http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/security/SSLFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java 
b/src/java/org/apache/cassandra/security/SSLFactory.java
index 33c1ad6..3c1293f 100644
--- a/src/java/org/apache/cassandra/security/SSLFactory.java
+++ b/src/java/org/apache/cassandra/security/SSLFactory.java
@@ -17,63 +17,67 @@
  */
 package org.apache.cassandra.security;
 
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.io.InputStream;
+
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.security.KeyStore;
 import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.Enumeration;
 import java.util.List;
-
+import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
-import javax.net.ssl.SSLServerSocket;
 import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
 
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.io.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.config.EncryptionOptions;
 
 /**
- * A Factory for providing and setting up Client and Server SSL wrapped
- * Socket and ServerSocket
+ * A Factory for providing and setting up client {@link SSLSocket}s. Also 
provides
+ * methods for creating both JSSE {@link SSLContext} instances as well as 
netty {@link SslContext} instances.
+ *
+ * Netty {@link SslContext} instances are expensive to create (as well as to 
destroy) and consume a lof of resources
+ * (especially direct memory), but instances can be reused across connections 
(assuming the SSL params are the same).
+ * Hence we cache created instances in {@link #clientSslContext} and {@link 
#serverSslContext}.
  */
 public final class SSLFactory
 {
     private static final Logger logger = 
LoggerFactory.getLogger(SSLFactory.class);
-    private static boolean checkedExpiry = false;
 
-    public static SSLServerSocket getServerSocket(EncryptionOptions options, 
InetAddress address, int port) throws IOException
-    {
-        SSLContext ctx = createSSLContext(options, true);
-        SSLServerSocket serverSocket = 
(SSLServerSocket)ctx.getServerSocketFactory().createServerSocket();
-        try
-        {
-            serverSocket.setReuseAddress(true);
-            prepareSocket(serverSocket, options);
-            serverSocket.bind(new InetSocketAddress(address, port), 500);
-            return serverSocket;
-        }
-        catch (IllegalArgumentException | SecurityException | IOException e)
-        {
-            serverSocket.close();
-            throw e;
-        }
-    }
+    @VisibleForTesting
+    static volatile boolean checkedExpiry = false;
+
+    /**
+     * A cached reference of the {@link SslContext} for client-facing 
connections.
+     */
+    private static final AtomicReference<SslContext> clientSslContext = new 
AtomicReference<>();
+
+    /**
+     * A cached reference of the {@link SslContext} for peer-to-peer, 
internode messaging connections.
+     */
+    private static final AtomicReference<SslContext> serverSslContext = new 
AtomicReference<>();
 
     /** Create a socket and connect */
     public static SSLSocket getSocket(EncryptionOptions options, InetAddress 
address, int port, InetAddress localAddress, int localPort) throws IOException
@@ -109,37 +113,6 @@ public final class SSLFactory
         }
     }
 
-    /** Just create a socket */
-    public static SSLSocket getSocket(EncryptionOptions options) throws 
IOException
-    {
-        SSLContext ctx = createSSLContext(options, true);
-        SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket();
-        try
-        {
-            prepareSocket(socket, options);
-            return socket;
-        }
-        catch (IllegalArgumentException e)
-        {
-            socket.close();
-            throw e;
-        }
-    }
-
-    /** Sets relevant socket options specified in encryption settings */
-    private static void prepareSocket(SSLServerSocket serverSocket, 
EncryptionOptions options)
-    {
-        String[] suites = 
filterCipherSuites(serverSocket.getSupportedCipherSuites(), 
options.cipher_suites);
-        if(options.require_endpoint_verification)
-        {
-            SSLParameters sslParameters = serverSocket.getSSLParameters();
-            sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
-            serverSocket.setSSLParameters(sslParameters);
-        }
-        serverSocket.setEnabledCipherSuites(suites);
-        serverSocket.setNeedClientAuth(options.require_client_auth);
-    }
-
     /** Sets relevant socket options specified in encryption settings */
     private static void prepareSocket(SSLSocket socket, EncryptionOptions 
options)
     {
@@ -153,28 +126,50 @@ public final class SSLFactory
         socket.setEnabledCipherSuites(suites);
     }
 
+    /**
+     * Create a JSSE {@link SSLContext}.
+     */
     @SuppressWarnings("resource")
     public static SSLContext createSSLContext(EncryptionOptions options, 
boolean buildTruststore) throws IOException
     {
-        InputStream tsf = null;
-        InputStream ksf = null;
-        SSLContext ctx;
+        TrustManager[] trustManagers = null;
+        if (buildTruststore)
+            trustManagers = 
buildTrustManagerFactory(options).getTrustManagers();
+
+        KeyManagerFactory kmf = buildKeyManagerFactory(options);
+
         try
         {
-            ctx = SSLContext.getInstance(options.protocol);
-            TrustManager[] trustManagers = null;
+            SSLContext ctx = SSLContext.getInstance(options.protocol);
+            ctx.init(kmf.getKeyManagers(), trustManagers, null);
+            return ctx;
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Error creating/initializing the SSL 
Context", e);
+        }
+    }
 
-            if(buildTruststore)
-            {
-                tsf = Files.newInputStream(Paths.get(options.truststore));
-                TrustManagerFactory tmf = 
TrustManagerFactory.getInstance(options.algorithm);
-                KeyStore ts = KeyStore.getInstance(options.store_type);
-                ts.load(tsf, options.truststore_password.toCharArray());
-                tmf.init(ts);
-                trustManagers = tmf.getTrustManagers();
-            }
+    static TrustManagerFactory buildTrustManagerFactory(EncryptionOptions 
options) throws IOException
+    {
+        try (InputStream tsf = 
Files.newInputStream(Paths.get(options.truststore)))
+        {
+            TrustManagerFactory tmf = 
TrustManagerFactory.getInstance(options.algorithm);
+            KeyStore ts = KeyStore.getInstance(options.store_type);
+            ts.load(tsf, options.truststore_password.toCharArray());
+            tmf.init(ts);
+            return tmf;
+        }
+        catch (Exception e)
+        {
+            throw new IOException("failed to build trust manager store for 
secure connections", e);
+        }
+    }
 
-            ksf = Files.newInputStream(Paths.get((options.keystore)));
+    static KeyManagerFactory buildKeyManagerFactory(EncryptionOptions options) 
throws IOException
+    {
+        try (InputStream ksf = 
Files.newInputStream(Paths.get(options.keystore)))
+        {
             KeyManagerFactory kmf = 
KeyManagerFactory.getInstance(options.algorithm);
             KeyStore ks = KeyStore.getInstance(options.store_type);
             ks.load(ksf, options.keystore_password.toCharArray());
@@ -193,20 +188,12 @@ public final class SSLFactory
                 checkedExpiry = true;
             }
             kmf.init(ks, options.keystore_password.toCharArray());
-
-            ctx.init(kmf.getKeyManagers(), trustManagers, null);
-
+            return kmf;
         }
         catch (Exception e)
         {
-            throw new IOException("Error creating the initializing the SSL 
Context", e);
+            throw new IOException("failed to build trust manager store for 
secure connections", e);
         }
-        finally
-        {
-            FileUtils.closeQuietly(tsf);
-            FileUtils.closeQuietly(ksf);
-        }
-        return ctx;
     }
 
     public static String[] filterCipherSuites(String[] supported, String[] 
desired)
@@ -223,4 +210,65 @@ public final class SSLFactory
         }
         return ret;
     }
+
+    /**
+     * get a netty {@link SslContext} instance
+     */
+    public static SslContext getSslContext(EncryptionOptions options, boolean 
buildTruststore, boolean forServer) throws IOException
+    {
+        return getSslContext(options, buildTruststore, forServer, 
OpenSsl.isAvailable());
+    }
+
+    /**
+     * Get a netty {@link SslContext} instance.
+     */
+    @VisibleForTesting
+    static SslContext getSslContext(EncryptionOptions options, boolean 
buildTruststore, boolean forServer, boolean useOpenSsl) throws IOException
+    {
+        if (forServer && serverSslContext.get() != null)
+            return serverSslContext.get();
+        if (!forServer && clientSslContext.get() != null)
+            return clientSslContext.get();
+
+        /*
+            There is a case where the netty/openssl combo might not support 
using KeyManagerFactory. specifically,
+            I've seen this with the netty-tcnative dynamic openssl 
implementation. using the netty-tcnative static-boringssl
+            works fine with KeyManagerFactory. If we want to support all of 
the netty-tcnative options, we would need
+            to fall back to passing in a file reference for both a x509 and 
PKCS#8 private key file in PEM format (see
+            {@link SslContextBuilder#forServer(File, File, String)}). However, 
we are not supporting that now to keep
+            the config/yaml API simple.
+         */
+        KeyManagerFactory kmf = null;
+        if (forServer || options.require_client_auth)
+            kmf = buildKeyManagerFactory(options);
+
+        SslContextBuilder builder;
+        if (forServer)
+        {
+            builder = SslContextBuilder.forServer(kmf);
+            builder.clientAuth(options.require_client_auth ? 
ClientAuth.REQUIRE : ClientAuth.NONE);
+        }
+        else
+        {
+            builder = SslContextBuilder.forClient().keyManager(kmf);
+        }
+
+        builder.sslProvider(useOpenSsl ? SslProvider.OPENSSL : 
SslProvider.JDK);
+
+        // only set the cipher suites if the opertor has explicity configured 
values for it; else, use the default
+        // for each ssl implemention (jdk or openssl)
+        if (options.cipher_suites != null && options.cipher_suites.length > 0)
+            builder.ciphers(Arrays.asList(options.cipher_suites), 
SupportedCipherSuiteFilter.INSTANCE);
+
+        if (buildTruststore)
+            builder.trustManager(buildTrustManagerFactory(options));
+
+        SslContext ctx = builder.build();
+        AtomicReference<SslContext> ref = forServer ? serverSslContext : 
clientSslContext;
+        if (ref.compareAndSet(null, ctx))
+            return ctx;
+
+        ReferenceCountUtil.release(ctx);
+        return ref.get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java 
b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
index a2ad66c..d88d63c 100644
--- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.streaming;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.nio.channels.SocketChannel;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.OutboundTcpConnectionPool;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class DefaultConnectionFactory implements StreamConnectionFactory
 {
@@ -47,20 +52,15 @@ public class DefaultConnectionFactory implements 
StreamConnectionFactory
         int attempts = 0;
         while (true)
         {
-            Socket socket = null;
             try
             {
-                socket = OutboundTcpConnectionPool.newSocket(peer);
+                Socket socket = newSocket(peer);
                 
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
                 socket.setKeepAlive(true);
                 return socket;
             }
             catch (IOException e)
             {
-                if (socket != null)
-                {
-                    socket.close();
-                }
                 if (++attempts >= MAX_CONNECT_ATTEMPTS)
                     throw e;
 
@@ -77,4 +77,21 @@ public class DefaultConnectionFactory implements 
StreamConnectionFactory
             }
         }
     }
+
+    // TODO this is deliberately copied from (the now former) 
OutboundTcpConnectionPool, for CASSANDRA-8457.
+    // to be replaced in CASSANDRA-12229 (make streaming use 8457)
+    public static Socket newSocket(InetAddress endpoint) throws IOException
+    {
+        // zero means 'bind on any available port.'
+        if (MessagingService.isEncryptedConnection(endpoint))
+        {
+            return 
SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endpoint, 
DatabaseDescriptor.getSSLStoragePort());
+        }
+        else
+        {
+            SocketChannel channel = SocketChannel.open();
+            channel.connect(new InetSocketAddress(endpoint, 
DatabaseDescriptor.getStoragePort()));
+            return channel.socket();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java 
b/src/java/org/apache/cassandra/tracing/Tracing.java
index 33e1967..4cdddba 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -316,8 +316,7 @@ public abstract class Tracing implements 
ExecutorLocal<TraceState>
     }
 
     /**
-     * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for 
non-local traces (traces
-     * that are not initiated by local node == coordinator).
+     * Called for non-local traces (traces that are not initiated by local 
node == coordinator).
      */
     public abstract void trace(ByteBuffer sessionId, String message, int ttl);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/tracing/TracingImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TracingImpl.java 
b/src/java/org/apache/cassandra/tracing/TracingImpl.java
index d774abb..789216e 100644
--- a/src/java/org/apache/cassandra/tracing/TracingImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TracingImpl.java
@@ -99,8 +99,7 @@ class TracingImpl extends Tracing
     }
 
     /**
-     * Called from {@link org.apache.cassandra.net.OutboundTcpConnection} for 
non-local traces (traces
-     * that are not initiated by local node == coordinator).
+     * Called for non-local traces (traces that are not initiated by local 
node == coordinator).
      */
     public void trace(final ByteBuffer sessionId, final String message, final 
int ttl)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 881ee81..1afe910 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -610,7 +610,9 @@ public abstract class Message
                 message = "Unexpected exception during request; channel = 
<unprintable>";
             }
 
-            if (!alwaysLogAtError && exception instanceof IOException)
+            // netty wraps SSL errors in a CodecExcpetion
+            boolean isIOException = exception instanceof IOException || 
(exception.getCause() instanceof IOException);
+            if (!alwaysLogAtError && isIOException)
             {
                 if (ioExceptionsAtDebugLevel.contains(exception.getMessage()))
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index 28f99e8..9408a3a 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.*;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.epoll.EpollServerSocketChannel;
@@ -40,6 +41,7 @@ import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.Version;
 import io.netty.util.concurrent.EventExecutor;
@@ -343,31 +345,18 @@ public class Server implements CassandraDaemon.Server
 
     protected abstract static class AbstractSecureIntializer extends 
Initializer
     {
-        private final SSLContext sslContext;
         private final EncryptionOptions encryptionOptions;
 
         protected AbstractSecureIntializer(Server server, EncryptionOptions 
encryptionOptions)
         {
             super(server);
             this.encryptionOptions = encryptionOptions;
-            try
-            {
-                this.sslContext = 
SSLFactory.createSSLContext(encryptionOptions, 
encryptionOptions.require_client_auth);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException("Failed to setup secure pipeline", 
e);
-            }
         }
 
-        protected final SslHandler createSslHandler()
+        protected final SslHandler createSslHandler(ByteBufAllocator 
allocator) throws IOException
         {
-            SSLEngine sslEngine = sslContext.createSSLEngine();
-            sslEngine.setUseClientMode(false);
-            String[] suites = 
SSLFactory.filterCipherSuites(sslEngine.getSupportedCipherSuites(), 
encryptionOptions.cipher_suites);
-            sslEngine.setEnabledCipherSuites(suites);
-            sslEngine.setNeedClientAuth(encryptionOptions.require_client_auth);
-            return new SslHandler(sslEngine);
+            SslContext sslContext = 
SSLFactory.getSslContext(encryptionOptions, 
encryptionOptions.require_client_auth, true);
+            return sslContext.newHandler(allocator);
         }
     }
 
@@ -396,7 +385,7 @@ public class Server implements CassandraDaemon.Server
                     {
                         // Connection uses SSL/TLS, replace the detection 
handler with a SslHandler and so use
                         // encryption.
-                        SslHandler sslHandler = createSslHandler();
+                        SslHandler sslHandler = 
createSslHandler(channel.alloc());
                         channelHandlerContext.pipeline().replace(this, "ssl", 
sslHandler);
                     }
                     else
@@ -419,7 +408,7 @@ public class Server implements CassandraDaemon.Server
 
         protected void initChannel(Channel channel) throws Exception
         {
-            SslHandler sslHandler = createSslHandler();
+            SslHandler sslHandler = createSslHandler(channel.alloc());
             super.initChannel(channel);
             channel.pipeline().addFirst("ssl", sslHandler);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 13cd9bd..c72d6e9 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -28,8 +28,6 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +38,7 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.ssl.SslContext;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -58,7 +57,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
-import io.netty.handler.ssl.SslHandler;
 import static 
org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 
 public class SimpleClient implements Closeable
@@ -287,21 +285,11 @@ public class SimpleClient implements Closeable
 
     private class SecureInitializer extends Initializer
     {
-        private final SSLContext sslContext;
-
-        public SecureInitializer() throws IOException
-        {
-            this.sslContext = SSLFactory.createSSLContext(encryptionOptions, 
true);
-        }
-
         protected void initChannel(Channel channel) throws Exception
         {
             super.initChannel(channel);
-            SSLEngine sslEngine = sslContext.createSSLEngine();
-            sslEngine.setUseClientMode(true);
-            String[] suites = 
SSLFactory.filterCipherSuites(sslEngine.getSupportedCipherSuites(), 
encryptionOptions.cipher_suites);
-            sslEngine.setEnabledCipherSuites(suites);
-            channel.pipeline().addFirst("ssl", new SslHandler(sslEngine));
+            SslContext sslContext = 
SSLFactory.getSslContext(encryptionOptions, 
encryptionOptions.require_client_auth, true);
+            channel.pipeline().addFirst("ssl", 
sslContext.newHandler(channel.alloc()));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java 
b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 9f3b118..6ff91e3 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -17,13 +17,10 @@
  */
 package org.apache.cassandra.utils;
 
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.FileUtils;
-
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.RandomAccessFile;
@@ -32,19 +29,17 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel.MapMode;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
 import java.util.Locale;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
+/**
+ * Groups strategies to coalesce messages.
+ */
 public class CoalescingStrategies
 {
-    static protected final Logger logger = 
LoggerFactory.getLogger(CoalescingStrategies.class);
-
     /*
      * Log debug information at info level about what the average is and when 
coalescing is enabled/disabled
      */
@@ -54,6 +49,8 @@ public class CoalescingStrategies
     private static final String DEBUG_COALESCING_PATH_PROPERTY = 
Config.PROPERTY_PREFIX + "coalescing_debug_path";
     private static final String DEBUG_COALESCING_PATH = 
System.getProperty(DEBUG_COALESCING_PATH_PROPERTY, "/tmp/coleascing_debug");
 
+    public enum Strategy { MOVINGAVERAGE, FIXED, TIMEHORIZON, DISABLED }
+
     static
     {
         if (DEBUG_COALESCING)
@@ -68,98 +65,83 @@ public class CoalescingStrategies
         }
     }
 
-    @VisibleForTesting
-    interface Clock
-    {
-        long nanoTime();
-    }
-
-    @VisibleForTesting
-    static Clock CLOCK = new Clock()
-    {
-        public long nanoTime()
-        {
-            return System.nanoTime();
-        }
-    };
-
     public static interface Coalescable
     {
         long timestampNanos();
     }
 
     @VisibleForTesting
-    static void parkLoop(long nanos)
-    {
-        long now = System.nanoTime();
-        final long timer = now + nanos;
-        // We shouldn't loop if it's within a few % of the target sleep time 
if on a second iteration.
-        // See CASSANDRA-8692.
-        final long limit = timer - nanos / 16;
-        do
-        {
-            LockSupport.parkNanos(timer - now);
-            now = System.nanoTime();
-        }
-        while (now < limit);
-    }
-
-    private static boolean maybeSleep(int messages, long averageGap, long 
maxCoalesceWindow, Parker parker)
+    static long determineCoalescingTime(long averageGap, long 
maxCoalesceWindow)
     {
-        // Do not sleep if there are still items in the backlog 
(CASSANDRA-13090).
-        if (messages >= 
DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages())
-            return false;
+        // Don't bother waiting at all if we're unlikely to get any new 
message within our max window
+        if (averageGap > maxCoalesceWindow)
+            return -1;
 
-        // only sleep if we can expect to double the number of messages we're 
sending in the time interval
-        long sleep = messages * averageGap;
-        if (sleep <= 0 || sleep > maxCoalesceWindow)
-            return false;
+        // avoid the degenerate case of zero (very unlikely, but let's be safe)
+        if (averageGap <= 0)
+            return maxCoalesceWindow;
 
         // assume we receive as many messages as we expect; apply the same 
logic to the future batch:
         // expect twice as many messages to consider sleeping for "another" 
interval; this basically translates
-        // to doubling our sleep period until we exceed our max sleep window
+        // to doubling our sleep period until we exceed our max sleep window.
+        long sleep = averageGap;
         while (sleep * 2 < maxCoalesceWindow)
             sleep *= 2;
-        parker.park(sleep);
-        return true;
+        return sleep;
     }
 
-    public static abstract class CoalescingStrategy
+    /**
+     * A coalescing strategy, that decides when to coalesce messages.
+     * <p>
+     * The general principle is that, when asked, the strategy returns the 
time delay we want to wait for more messages
+     * to arrive before sending so message can be coalesced. For that, the 
strategy must be fed new messages through
+     * the {@link #newArrival(Coalescable)} method (the only assumption we 
make on messages is that they have an associated
+     * timestamp). The strategy can then be queried for the time to wait for 
coalescing through
+     * {@link #currentCoalescingTimeNanos()}.
+     * <p>
+     * Note that it is expected that a call {@link 
#currentCoalescingTimeNanos()} will come just after a call to
+     * {@link #newArrival(Coalescable))}, as the intent of the value returned 
by the former method is "Given a new message, how much
+     * time should I wait for more messages to arrive and be coalesced with 
that message". But both calls are separated
+     * as one may not want to call {@link #currentCoalescingTimeNanos()} after 
every call to {@link #newArrival(Coalescable)}
+     * and we thus save processing. How arrivals influence the coalescing time 
is however entirely up to the strategy and some
+     * strategy may ignore arrivals completely and return a constant 
coalescing time.
+     */
+    public interface CoalescingStrategy
+    {
+        /**
+         * Inform the strategy of a new message to consider.
+         *
+         * @param message the message to consider.
+         */
+        void newArrival(Coalescable message);
+
+        /**
+         * The current time to wait for the purpose of coalescing messages.
+         *
+         * @return the coalescing time. A negative value can be returned if no 
coalescing should be done (which can be a
+         * transient thing).
+         */
+        long currentCoalescingTimeNanos();
+    }
+
+    public static abstract class AbstractCoalescingStrategy implements 
CoalescingStrategy
     {
-        protected final Parker parker;
         protected final Logger logger;
         protected volatile boolean shouldLogAverage = false;
         protected final ByteBuffer logBuffer;
         private RandomAccessFile ras;
         private final String displayName;
 
-        protected CoalescingStrategy(Parker parker, Logger logger, String 
displayName)
+        protected AbstractCoalescingStrategy(Logger logger, String displayName)
         {
-            this.parker = parker;
             this.logger = logger;
             this.displayName = displayName;
-            if (DEBUG_COALESCING)
-            {
-                NamedThreadFactory.createThread(() ->
-                {
-                    while (true)
-                    {
-                        try
-                        {
-                            Thread.sleep(5000);
-                        }
-                        catch (InterruptedException e)
-                        {
-                            throw new AssertionError();
-                        }
-                        shouldLogAverage = true;
-                    }
-                }, displayName + " debug thread").start();
-            }
+
             RandomAccessFile rasTemp = null;
             ByteBuffer logBufferTemp = null;
             if (DEBUG_COALESCING)
             {
+                
ScheduledExecutors.scheduledFastTasks.scheduleWithFixedDelay(() -> 
shouldLogAverage = true, 5, 5, TimeUnit.SECONDS);
                 try
                 {
                     File outFile = File.createTempFile("coalescing_" + 
this.displayName + "_", ".log", new File(DEBUG_COALESCING_PATH));
@@ -214,44 +196,10 @@ public class CoalescingStrategies
                 }
             }
         }
-
-        /**
-         * Drain from the input blocking queue to the output list up to 
maxItems elements.
-         *
-         * The coalescing strategy may choose to park the current thread if it 
thinks it will
-         * be able to produce an output list with more elements.
-         *
-         * @param input Blocking queue to retrieve elements from
-         * @param out Output list to place retrieved elements in. Must be 
empty.
-         * @param maxItems Maximum number of elements to place in the output 
list
-         */
-        public <C extends Coalescable> void coalesce(BlockingQueue<C> input, 
List<C> out, int maxItems) throws InterruptedException
-        {
-            Preconditions.checkArgument(out.isEmpty(), "out list should be 
empty");
-            coalesceInternal(input, out, maxItems);
-        }
-
-        protected abstract <C extends Coalescable> void 
coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws 
InterruptedException;
-
     }
 
     @VisibleForTesting
-    interface Parker
-    {
-        void park(long nanos);
-    }
-
-    private static final Parker PARKER = new Parker()
-    {
-        @Override
-        public void park(long nanos)
-        {
-            parkLoop(nanos);
-        }
-    };
-
-    @VisibleForTesting
-    static class TimeHorizonMovingAverageCoalescingStrategy extends 
CoalescingStrategy
+    static class TimeHorizonMovingAverageCoalescingStrategy extends 
AbstractCoalescingStrategy
     {
         // for now we'll just use 64ms per bucket; this can be made 
configurable, but results in ~1s for 16 samples
         private static final int INDEX_SHIFT = 26;
@@ -261,7 +209,7 @@ public class CoalescingStrategies
         private static final long MEASURED_INTERVAL = BUCKET_INTERVAL * 
(BUCKET_COUNT - 1);
 
         // the minimum timestamp we will now accept updates for; only moves 
forwards, never backwards
-        private long epoch = CLOCK.nanoTime();
+        private long epoch;
         // the buckets, each following on from epoch; the measurements run 
from ix(epoch) to ix(epoch - 1)
         // ix(epoch-1) is a partial result, that is never actually part of the 
calculation, and most updates
         // are expected to hit this bucket
@@ -269,31 +217,12 @@ public class CoalescingStrategies
         private long sum = 0;
         private final long maxCoalesceWindow;
 
-        public TimeHorizonMovingAverageCoalescingStrategy(int 
maxCoalesceWindow, Parker parker, Logger logger, String displayName)
+        public TimeHorizonMovingAverageCoalescingStrategy(int 
maxCoalesceWindow, Logger logger, String displayName, long initialEpoch)
         {
-            super(parker, logger, displayName);
+            super(logger, displayName);
             this.maxCoalesceWindow = 
TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
             sum = 0;
-        }
-
-        private void logSample(long nanos)
-        {
-            debugTimestamp(nanos);
-            long epoch = this.epoch;
-            long delta = nanos - epoch;
-            if (delta < 0)
-                // have to simply ignore, but would be a bit crazy to get such 
reordering
-                return;
-
-            if (delta > INTERVAL)
-                epoch = rollepoch(delta, epoch, nanos);
-
-            int ix = ix(nanos);
-            samples[ix]++;
-
-            // if we've updated an old bucket, we need to update the sum to 
match
-            if (ix != ix(epoch - 1))
-                sum++;
+            epoch = initialEpoch;
         }
 
         private long averageGap()
@@ -304,7 +233,7 @@ public class CoalescingStrategies
         }
 
         // this sample extends past the end of the range we cover, so rollover
-        private long rollepoch(long delta, long epoch, long nanos)
+        private long rollEpoch(long delta, long epoch, long nanos)
         {
             if (delta > 2 * INTERVAL)
             {
@@ -341,30 +270,32 @@ public class CoalescingStrategies
             return (int) ((nanos >>> INDEX_SHIFT) & 15);
         }
 
-        @Override
-        protected <C extends Coalescable> void 
coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws 
InterruptedException
+        public void newArrival(Coalescable message)
         {
-            if (input.drainTo(out, maxItems) == 0)
-            {
-                out.add(input.take());
-                input.drainTo(out, maxItems - out.size());
-            }
+            final long timestamp = message.timestampNanos();
+            debugTimestamp(timestamp);
+            long epoch = this.epoch;
+            long delta = timestamp - epoch;
+            if (delta < 0)
+                // have to simply ignore, but would be a bit unlucky to get 
such reordering
+                return;
 
-            for (Coalescable qm : out)
-                logSample(qm.timestampNanos());
+            if (delta > INTERVAL)
+                epoch = rollEpoch(delta, epoch, timestamp);
+
+            int ix = ix(timestamp);
+            samples[ix]++;
 
+            // if we've updated an old bucket, we need to update the sum to 
match
+            if (ix != ix(epoch - 1))
+                sum++;
+        }
+
+        public long currentCoalescingTimeNanos()
+        {
             long averageGap = averageGap();
             debugGap(averageGap);
-
-            int count = out.size();
-            if (maybeSleep(count, averageGap, maxCoalesceWindow, parker))
-            {
-                input.drainTo(out, maxItems - out.size());
-                int prevCount = count;
-                count = out.size();
-                for (int  i = prevCount; i < count; i++)
-                    logSample(out.get(i).timestampNanos());
-            }
+            return determineCoalescingTime(averageGap, maxCoalesceWindow);
         }
 
         @Override
@@ -374,25 +305,27 @@ public class CoalescingStrategies
         }
     }
 
-    /*
+    /**
      * Start coalescing by sleeping if the moving average is < the requested 
window.
      * The actual time spent waiting to coalesce will be the min( window, 
moving average * 2)
      * The actual amount of time spent waiting can be greater then the window. 
For instance
      * observed time spent coalescing was 400 microseconds with the window set 
to 200 in one benchmark.
      */
     @VisibleForTesting
-    static class MovingAverageCoalescingStrategy extends CoalescingStrategy
+    static class MovingAverageCoalescingStrategy extends 
AbstractCoalescingStrategy
     {
-        private final int samples[] = new int[16];
+        static final int SAMPLE_SIZE = 16;
+        private final int samples[] = new int[SAMPLE_SIZE];
+        private final long maxCoalesceWindow;
+
         private long lastSample = 0;
         private int index = 0;
         private long sum = 0;
+        private long currentGap;
 
-        private final long maxCoalesceWindow;
-
-        public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker 
parker, Logger logger, String displayName)
+        public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Logger 
logger, String displayName)
         {
-            super(parker, logger, displayName);
+            super(logger, displayName);
             this.maxCoalesceWindow = 
TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
             for (int ii = 0; ii < samples.length; ii++)
                 samples[ii] = Integer.MAX_VALUE;
@@ -406,42 +339,29 @@ public class CoalescingStrategies
             samples[index] = value;
             index++;
             index = index & ((1 << 4) - 1);
-            return sum / 16;
+            return sum / SAMPLE_SIZE;
         }
 
-        private long notifyOfSample(long sample)
+        public void newArrival(Coalescable message)
         {
-            debugTimestamp(sample);
-            if (sample > lastSample)
+            final long timestamp = message.timestampNanos();
+            debugTimestamp(timestamp);
+            if (timestamp > lastSample)
             {
-                final int delta = (int)(Math.min(Integer.MAX_VALUE, sample - 
lastSample));
-                lastSample = sample;
-                return logSample(delta);
+                final int delta = (int)(Math.min(Integer.MAX_VALUE, timestamp 
- lastSample));
+                lastSample = timestamp;
+                currentGap = logSample(delta);
             }
             else
             {
-                return logSample(1);
+                currentGap = logSample(1);
             }
         }
 
-        @Override
-        protected <C extends Coalescable> void 
coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws 
InterruptedException
+        public long currentCoalescingTimeNanos()
         {
-            if (input.drainTo(out, maxItems) == 0)
-            {
-                out.add(input.take());
-                input.drainTo(out, maxItems - out.size());
-            }
-
-            long average = notifyOfSample(out.get(0).timestampNanos());
-            debugGap(average);
-
-            if (maybeSleep(out.size(), average, maxCoalesceWindow, parker)) {
-                input.drainTo(out, maxItems - out.size());
-            }
-
-            for (int ii = 1; ii < out.size(); ii++)
-                notifyOfSample(out.get(ii).timestampNanos());
+            debugGap(currentGap);
+            return determineCoalescingTime(currentGap, maxCoalesceWindow);
         }
 
         @Override
@@ -451,35 +371,28 @@ public class CoalescingStrategies
         }
     }
 
-    /*
+    /**
      * A fixed strategy as a backup in case MovingAverage or 
TimeHorizongMovingAverage fails in some scenario
      */
     @VisibleForTesting
-    static class FixedCoalescingStrategy extends CoalescingStrategy
+    static class FixedCoalescingStrategy extends AbstractCoalescingStrategy
     {
         private final long coalesceWindow;
 
-        public FixedCoalescingStrategy(int coalesceWindowMicros, Parker 
parker, Logger logger, String displayName)
+        public FixedCoalescingStrategy(int coalesceWindowMicros, Logger 
logger, String displayName)
         {
-            super(parker, logger, displayName);
+            super(logger, displayName);
             coalesceWindow = 
TimeUnit.MICROSECONDS.toNanos(coalesceWindowMicros);
         }
 
-        @Override
-        protected <C extends Coalescable> void 
coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws 
InterruptedException
+        public void newArrival(Coalescable message)
         {
-            int enough = 
DatabaseDescriptor.getOtcCoalescingEnoughCoalescedMessages();
+            debugTimestamp(message.timestampNanos());
+        }
 
-            if (input.drainTo(out, maxItems) == 0)
-            {
-                out.add(input.take());
-                input.drainTo(out, maxItems - out.size());
-                if (out.size() < enough) {
-                    parker.park(coalesceWindow);
-                    input.drainTo(out, maxItems - out.size());
-                }
-            }
-            debugTimestamps(out);
+        public long currentCoalescingTimeNanos()
+        {
+            return coalesceWindow;
         }
 
         @Override
@@ -489,84 +402,43 @@ public class CoalescingStrategies
         }
     }
 
-    /*
-     * A coalesscing strategy that just returns all currently available 
elements
-     */
-    @VisibleForTesting
-    static class DisabledCoalescingStrategy extends CoalescingStrategy
+    public static Optional<CoalescingStrategy> newCoalescingStrategy(String 
strategy, int coalesceWindow, Logger logger, String displayName)
     {
+        String strategyCleaned = strategy.trim().toUpperCase(Locale.ENGLISH);
 
-        public DisabledCoalescingStrategy(int coalesceWindowMicros, Parker 
parker, Logger logger, String displayName)
-        {
-            super(parker, logger, displayName);
-        }
-
-        @Override
-        protected <C extends Coalescable> void 
coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws 
InterruptedException
+        try
         {
-            if (input.drainTo(out, maxItems) == 0)
+            switch (Enum.valueOf(Strategy.class, strategyCleaned))
             {
-                out.add(input.take());
-                input.drainTo(out, maxItems - 1);
+                case MOVINGAVERAGE:
+                    return Optional.of(new 
MovingAverageCoalescingStrategy(coalesceWindow, logger, displayName));
+                case FIXED:
+                    return Optional.of(new 
FixedCoalescingStrategy(coalesceWindow, logger, displayName));
+                case TIMEHORIZON:
+                    long initialEpoch = System.nanoTime();
+                    return Optional.of(new 
TimeHorizonMovingAverageCoalescingStrategy(coalesceWindow, logger, displayName, 
initialEpoch));
+                case DISABLED:
+                    return Optional.empty();
+                default:
+                    throw new IllegalArgumentException("supported coalese 
strategy");
             }
-            debugTimestamps(out);
         }
-
-        @Override
-        public String toString()
+        catch (IllegalArgumentException iae)
         {
-            return "Disabled";
-        }
-    }
-
-    @VisibleForTesting
-    static CoalescingStrategy newCoalescingStrategy(String strategy,
-                                                    int coalesceWindow,
-                                                    Parker parker,
-                                                    Logger logger,
-                                                    String displayName)
-    {
-        String classname = null;
-        String strategyCleaned = strategy.trim().toUpperCase(Locale.ENGLISH);
-        switch(strategyCleaned)
-        {
-        case "MOVINGAVERAGE":
-            classname = MovingAverageCoalescingStrategy.class.getName();
-            break;
-        case "FIXED":
-            classname = FixedCoalescingStrategy.class.getName();
-            break;
-        case "TIMEHORIZON":
-            classname = 
TimeHorizonMovingAverageCoalescingStrategy.class.getName();
-            break;
-        case "DISABLED":
-            classname = DisabledCoalescingStrategy.class.getName();
-            break;
-        default:
-            classname = strategy;
-        }
+            try
+            {
+                Class<?> clazz = Class.forName(strategy);
 
-        try
-        {
-            Class<?> clazz = Class.forName(classname);
+                if (!CoalescingStrategy.class.isAssignableFrom(clazz))
+                    throw new RuntimeException(strategy + " is not an instance 
of CoalescingStrategy");
 
-            if (!CoalescingStrategy.class.isAssignableFrom(clazz))
+                Constructor<?> constructor = clazz.getConstructor(int.class, 
Logger.class, String.class);
+                return 
Optional.of((CoalescingStrategy)constructor.newInstance(coalesceWindow, logger, 
displayName));
+            }
+            catch (Exception e)
             {
-                throw new RuntimeException(classname + " is not an instance of 
CoalescingStrategy");
+                throw new RuntimeException(e);
             }
-
-            Constructor<?> constructor = clazz.getConstructor(int.class, 
Parker.class, Logger.class, String.class);
-
-            return (CoalescingStrategy)constructor.newInstance(coalesceWindow, 
parker, logger, displayName);
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
         }
     }
-
-    public static CoalescingStrategy newCoalescingStrategy(String strategy, 
int coalesceWindow, Logger logger, String displayName)
-    {
-        return newCoalescingStrategy(strategy, coalesceWindow, PARKER, logger, 
displayName);
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java 
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 3fa64b3..58c3371 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -147,6 +147,13 @@ public class FBUtilities
         return broadcastInetAddress;
     }
 
+    /**
+     * <b>THIS IS FOR TESTING ONLY!!</b>
+     */
+    public static void setBroadcastInetAddress(InetAddress addr)
+    {
+        broadcastInetAddress = addr;
+    }
 
     public static InetAddress getBroadcastRpcAddress()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/utils/NativeLibrary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeLibrary.java 
b/src/java/org/apache/cassandra/utils/NativeLibrary.java
index 735d51a..7d54791 100644
--- a/src/java/org/apache/cassandra/utils/NativeLibrary.java
+++ b/src/java/org/apache/cassandra/utils/NativeLibrary.java
@@ -49,7 +49,7 @@ public final class NativeLibrary
         OTHER;
     }
 
-    private static final OSType osType;
+    public static final OSType osType;
 
     private static final int MCL_CURRENT;
     private static final int MCL_FUTURE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/conf/cassandra_ssl_test.keystore
----------------------------------------------------------------------
diff --git a/test/conf/cassandra_ssl_test.keystore 
b/test/conf/cassandra_ssl_test.keystore
new file mode 100644
index 0000000..8b2b218
Binary files /dev/null and b/test/conf/cassandra_ssl_test.keystore differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/conf/cassandra_ssl_test.truststore
----------------------------------------------------------------------
diff --git a/test/conf/cassandra_ssl_test.truststore 
b/test/conf/cassandra_ssl_test.truststore
new file mode 100644
index 0000000..49cf332
Binary files /dev/null and b/test/conf/cassandra_ssl_test.truststore differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java 
b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 249d780..056089e 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -18,10 +18,13 @@
 
 package org.apache.cassandra.db;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -46,6 +49,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableMetadata;
@@ -306,4 +310,33 @@ public class ReadCommandTest
             assertEquals(expectedRows.length, i);
         }
     }
+
+    public void serializerTest() throws IOException
+    {
+        ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
+
+        new RowUpdateBuilder(cfs.metadata.get(), 0, 
ByteBufferUtil.bytes("key"))
+        .clustering("dd")
+        .add("a", ByteBufferUtil.bytes("abcd"))
+        .build()
+        .apply();
+
+        ReadCommand readCommand = Util.cmd(cfs, 
Util.dk("key")).includeRow("dd").build();
+        int messagingVersion = MessagingService.current_version;
+        long size = ReadCommand.serializer.serializedSize(readCommand, 
messagingVersion);
+
+        FakeOutputStream out = new FakeOutputStream();
+        ReadCommand.serializer.serialize(readCommand, new 
WrappedDataOutputStreamPlus(out), messagingVersion);
+        Assert.assertEquals(size, out.count);
+    }
+
+    static class FakeOutputStream extends OutputStream
+    {
+        long count;
+
+        public void write(int b) throws IOException
+        {
+            count++;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java 
b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
index 5e99523..09973a8 100644
--- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
@@ -21,12 +21,10 @@ package org.apache.cassandra.locator;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.EnumMap;
 import java.util.Map;
 
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -37,8 +35,6 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.OutboundTcpConnectionPool;
 import org.apache.cassandra.service.StorageService;
 
 import static org.junit.Assert.assertEquals;
@@ -102,22 +98,6 @@ public class EC2SnitchTest
         assertEquals("2d", snitch.getRack(local));
     }
 
-    @Test
-    public void testEc2MRSnitch() throws UnknownHostException
-    {
-        InetAddress me = InetAddress.getByName("127.0.0.2");
-        InetAddress com_ip = InetAddress.getByName("127.0.0.3");
-
-        OutboundTcpConnectionPool pool = 
MessagingService.instance().getConnectionPool(me);
-        Assert.assertEquals(me, pool.endPoint());
-        pool.reset(com_ip);
-        Assert.assertEquals(com_ip, pool.endPoint());
-
-        MessagingService.instance().destroyConnectionPool(me);
-        pool = MessagingService.instance().getConnectionPool(me);
-        Assert.assertEquals(com_ip, pool.endPoint());
-    }
-
     @AfterClass
     public static void tearDown()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java 
b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 7cb3cfd..a082d56 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -20,10 +20,9 @@
  */
 package org.apache.cassandra.net;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -31,23 +30,31 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.*;
 import java.util.regex.Matcher;
 
 import com.google.common.collect.Iterables;
+
 import com.codahale.metrics.Timer;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+import org.apache.cassandra.net.MessagingService.ServerChannel;
+import org.apache.cassandra.net.async.NettyFactory;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.net.async.OutboundConnectionParams;
+import org.apache.cassandra.net.async.OutboundMessagingPool;
+import org.apache.cassandra.utils.FBUtilities;
 import org.caffinitas.ohc.histo.EstimatedHistogram;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -93,6 +100,12 @@ public class MessagingServiceTest
         
messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3"));
     }
 
+    @After
+    public void replaceAuthenticator()
+    {
+        DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
+    }
+
     @Test
     public void testDroppedMessages()
     {
@@ -197,7 +210,7 @@ public class MessagingServiceTest
     @Test
     public void 
testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws 
UnknownHostException
     {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
         IAsyncCallback bpCallback = new BackPressureCallback();
         IAsyncCallback noCallback = new NoBackPressureCallback();
         MessageOut<?> ignored = null;
@@ -218,7 +231,7 @@ public class MessagingServiceTest
     @Test
     public void 
testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws 
UnknownHostException
     {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
         IAsyncCallback bpCallback = new BackPressureCallback();
         IAsyncCallback noCallback = new NoBackPressureCallback();
         boolean timeout = false;
@@ -242,7 +255,7 @@ public class MessagingServiceTest
     @Test
     public void 
testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws 
UnknownHostException
     {
-        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState();
+        MockBackPressureStrategy.MockBackPressureState backPressureState = 
(MockBackPressureStrategy.MockBackPressureState) 
messagingService.getBackPressureState(InetAddress.getByName("127.0.0.2"));
         IAsyncCallback bpCallback = new BackPressureCallback();
         IAsyncCallback noCallback = new NoBackPressureCallback();
         boolean timeout = true;
@@ -285,13 +298,7 @@ public class MessagingServiceTest
 
     private static void addDCLatency(long sentAt, long nowTime) throws 
IOException
     {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
-        {
-            out.writeInt((int) sentAt);
-        }
-        DataInputStreamPlus in = new DataInputStreamPlus(new 
ByteArrayInputStream(baos.toByteArray()));
-        MessageIn.readConstructionTime(InetAddress.getLocalHost(), in, 
nowTime);
+        MessageIn.deriveConstructionTime(InetAddress.getLocalHost(), 
(int)sentAt, nowTime);
     }
 
     public static class MockBackPressureStrategy implements 
BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>
@@ -414,32 +421,83 @@ public class MessagingServiceTest
         InetAddress address = InetAddress.getByName("127.0.0.250");
 
         //Should return null
-        assertNull(ms.getConnectionPool(address));
-        assertNull(ms.getConnection(address, new 
MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK)));
+        MessageOut messageOut = new 
MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK);
+        assertFalse(ms.isConnected(address, messageOut));
 
         //Should tolerate null
         ms.convict(address);
-        ms.sendOneWay(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 
address);
+        ms.sendOneWay(messageOut, address);
     }
 
     @Test
-    public void testOutboundTcpConnectionCleansUp() throws Exception
+    public void testOutboundMessagingConnectionCleansUp() throws Exception
     {
         MessagingService ms = MessagingService.instance();
-        
DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
-        InetAddress address = InetAddress.getByName("127.0.0.250");
-        OutboundTcpConnectionPool pool = new 
OutboundTcpConnectionPool(address, new 
MockBackPressureStrategy(null).newState(address));
-        ms.connectionManagers.put(address, pool);
-        pool.smallMessages.start();
-        pool.smallMessages.enqueue(new 
MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0);
-        pool.smallMessages.join();
-        assertFalse(ms.connectionManagers.containsKey(address));
+        InetSocketAddress local = new InetSocketAddress("127.0.0.1", 9876);
+        InetSocketAddress remote = new InetSocketAddress("127.0.0.2", 9876);
+
+        OutboundMessagingPool pool = new OutboundMessagingPool(remote, local, 
null, new MockBackPressureStrategy(null).newState(remote.getAddress()), 
ALLOW_NOTHING_AUTHENTICATOR);
+        ms.channelManagers.put(remote.getAddress(), pool);
+        pool.sendMessage(new 
MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), 0);
+        assertFalse(ms.channelManagers.containsKey(remote.getAddress()));
     }
 
-    @After
-    public void replaceAuthenticator()
+    @Test
+    public void reconnectWithNewIp() throws UnknownHostException
     {
-        DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
+        InetAddress publicIp = InetAddress.getByName("127.0.0.2");
+        InetAddress privateIp = InetAddress.getByName("127.0.0.3");
+
+        // reset the preferred IP value, for good test hygene
+        SystemKeyspace.updatePreferredIP(publicIp, publicIp);
+
+        // create pool/conn with public addr
+        Assert.assertEquals(publicIp, 
messagingService.getCurrentEndpoint(publicIp));
+        messagingService.reconnectWithNewIp(publicIp, privateIp);
+        Assert.assertEquals(privateIp, 
messagingService.getCurrentEndpoint(publicIp));
+
+        messagingService.destroyConnectionPool(publicIp);
+
+        // recreate the pool/conn, and make sure the preferred ip addr is used
+        Assert.assertEquals(privateIp, 
messagingService.getCurrentEndpoint(publicIp));
+    }
+
+    @Test
+    public void testCloseInboundConnections() throws UnknownHostException, 
InterruptedException
+    {
+        messagingService.listen();
+        Assert.assertTrue(messagingService.isListening());
+        Assert.assertTrue(messagingService.serverChannels.size() > 0);
+        for (ServerChannel serverChannel : messagingService.serverChannels)
+            Assert.assertEquals(0, serverChannel.size());
+
+        // now, create a connection and make sure it's in a channel group
+        InetSocketAddress server = new 
InetSocketAddress(FBUtilities.getBroadcastAddress(), 
DatabaseDescriptor.getStoragePort());
+        OutboundConnectionIdentifier id = 
OutboundConnectionIdentifier.small(new 
InetSocketAddress(InetAddress.getByName("127.0.0.2"), 0), server);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        OutboundConnectionParams params = OutboundConnectionParams.builder()
+                                                                  
.mode(NettyFactory.Mode.MESSAGING)
+                                                                  
.sendBufferSize(1 << 10)
+                                                                  
.connectionId(id)
+                                                                  
.callback(handshakeResult -> latch.countDown())
+                                                                  
.protocolVersion(MessagingService.current_version)
+                                                                  .build();
+        Bootstrap bootstrap = 
NettyFactory.instance.createOutboundBootstrap(params);
+        Channel channel = bootstrap.connect().awaitUninterruptibly().channel();
+        Assert.assertNotNull(channel);
+        latch.await(1, TimeUnit.SECONDS); // allow the netty pipeline/c* 
handshake to get set up
+
+        int connectCount = 0;
+        for (ServerChannel serverChannel : messagingService.serverChannels)
+            connectCount += serverChannel.size();
+        Assert.assertTrue(connectCount > 0);
+
+        // last, shutdown the MS and make sure connections are removed
+        messagingService.shutdown();
+        for (ServerChannel serverChannel : messagingService.serverChannels)
+            Assert.assertEquals(0, serverChannel.size());
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java 
b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
deleted file mode 100644
index e3b6817..0000000
--- a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.MessagingService.Verb;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * The tests check whether Queue expiration in the OutboundTcpConnection 
behaves properly for droppable and
- * non-droppable messages.
- */
-public class OutboundTcpConnectionTest
-{
-    AtomicInteger messageId = new AtomicInteger(0);
-
-    final static Verb VERB_DROPPABLE = Verb.MUTATION; // Droppable, 2s timeout
-    final static Verb VERB_NONDROPPABLE = Verb.GOSSIP_DIGEST_ACK; // Not 
droppable
-    
-    final static long NANOS_FOR_TIMEOUT;
-
-    static
-    {
-        DatabaseDescriptor.daemonInitialization();
-        NANOS_FOR_TIMEOUT = 
TimeUnit.MILLISECONDS.toNanos(VERB_DROPPABLE.getTimeout()*2);
-    }
-    
-    /**
-     * Verifies our assumptions whether a Verb can be dropped or not. The 
tests make use of droppabilty, and
-     * may produce wrong test results if their droppabilty is changed. 
-     */
-    @BeforeClass
-    public static void assertDroppability()
-    {
-        if (!MessagingService.DROPPABLE_VERBS.contains(VERB_DROPPABLE))
-            throw new AssertionError("Expected " + VERB_DROPPABLE + " to be 
droppable");
-        if (MessagingService.DROPPABLE_VERBS.contains(VERB_NONDROPPABLE))
-            throw new AssertionError("Expected " + VERB_NONDROPPABLE + " not 
to be droppable");
-    }
-
-    /**
-     * Tests that non-droppable messages are never expired
-     */
-    @Test
-    public void testNondroppable() throws UnknownHostException
-    {
-        OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
-        long nanoTimeBeforeEnqueue = System.nanoTime();
-
-        assertFalse("Fresh OutboundTcpConnection contains expired messages",
-                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
-
-        fillToPurgeSize(otc, VERB_NONDROPPABLE);
-        fillToPurgeSize(otc, VERB_NONDROPPABLE);
-        otc.expireMessages(expirationTimeNanos());
-
-        assertFalse("OutboundTcpConnection with non-droppable verbs should not 
expire",
-                otc.backlogContainsExpiredMessages(expirationTimeNanos()));
-    }
-
-    /**
-     * Tests that droppable messages will be dropped after they expire, but 
not before.
-     * 
-     * @throws UnknownHostException
-     */
-    @Test
-    public void testDroppable() throws UnknownHostException
-    {
-        OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
-        long nanoTimeBeforeEnqueue = System.nanoTime();
-
-        initialFill(otc, VERB_DROPPABLE);
-        assertFalse("OutboundTcpConnection with droppable verbs should not 
expire immediately",
-                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
-
-        otc.expireMessages(nanoTimeBeforeEnqueue);
-        assertFalse("OutboundTcpConnection with droppable verbs should not 
expire with enqueue-time expiration",
-                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
-
-        // Lets presume, expiration time have passed => At that time there 
shall be expired messages in the Queue
-        long nanoTimeWhenExpired = expirationTimeNanos();
-        assertTrue("OutboundTcpConnection with droppable verbs should have 
expired",
-                otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
-
-        // Using the same timestamp, lets expire them and check whether they 
have gone
-        otc.expireMessages(nanoTimeWhenExpired);
-        assertFalse("OutboundTcpConnection should not have expired entries",
-                otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
-
-        // Actually the previous test can be done in a harder way: As 
expireMessages() has run, we cannot have
-        // ANY expired values, thus lets test also against 
nanoTimeBeforeEnqueue
-        assertFalse("OutboundTcpConnection should not have any expired 
entries",
-                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
-
-    }
-
-    /**
-     * Fills the given OutboundTcpConnection with (1 + BACKLOG_PURGE_SIZE), 
elements. The first
-     * BACKLOG_PURGE_SIZE elements are non-droppable, the last one is a 
message with the given Verb and can be
-     * droppable or non-droppable.
-     */
-    private void initialFill(OutboundTcpConnection otc, Verb verb)
-    {
-        assertFalse("Fresh OutboundTcpConnection contains expired messages",
-                otc.backlogContainsExpiredMessages(System.nanoTime()));
-
-        fillToPurgeSize(otc, VERB_NONDROPPABLE);
-        MessageOut<?> messageDroppable10s = new MessageOut<>(verb);
-        otc.enqueue(messageDroppable10s, nextMessageId());
-        otc.expireMessages(System.nanoTime());
-    }
-
-    /**
-     * Returns a nano timestamp in the far future, when expiration should have 
been performed for VERB_DROPPABLE.
-     * The offset is chosen as 2 times of the expiration time of 
VERB_DROPPABLE.
-     * 
-     * @return The future nano timestamp
-     */
-    private long expirationTimeNanos()
-    {
-        return System.nanoTime() + NANOS_FOR_TIMEOUT;
-    }
-
-    private int nextMessageId()
-    {
-        return messageId.incrementAndGet();
-    }
-
-    /**
-     * Adds BACKLOG_PURGE_SIZE messages to the queue. Hint: At 
BACKLOG_PURGE_SIZE expiration starts to work.
-     * 
-     * @param otc
-     *            The OutboundTcpConnection
-     * @param verb
-     *            The verb that defines the message type
-     */
-    private void fillToPurgeSize(OutboundTcpConnection otc, Verb verb)
-    {
-        for (int i = 0; i < OutboundTcpConnection.BACKLOG_PURGE_SIZE; i++)
-        {
-            otc.enqueue(new MessageOut<>(verb), nextMessageId());
-        }
-    }
-
-    private OutboundTcpConnection getOutboundTcpConnectionForLocalhost() 
throws UnknownHostException
-    {
-        InetAddress lo = InetAddress.getByName("127.0.0.1");
-        OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo, 
null);
-        OutboundTcpConnection otc = new OutboundTcpConnection(otcPool, 
"lo-OutboundTcpConnectionTest");
-        return otc;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/test/unit/org/apache/cassandra/net/async/ByteBufDataOutputPlusTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/net/async/ByteBufDataOutputPlusTest.java 
b/test/unit/org/apache/cassandra/net/async/ByteBufDataOutputPlusTest.java
new file mode 100644
index 0000000..959c37a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/async/ByteBufDataOutputPlusTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.SafeMemory;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+public class ByteBufDataOutputPlusTest
+{
+    private static final String KEYSPACE1 = "NettyPipilineTest";
+    private static final String STANDARD1 = "Standard1";
+    private static final int columnCount = 128;
+
+    private ByteBuf buf;
+
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, 
STANDARD1, columnCount, AsciiType.instance, BytesType.instance));
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (buf != null)
+            buf.release();
+    }
+
+    @Test
+    public void compareBufferSizes() throws IOException
+    {
+        final int currentFrameSize = 
getMessage().message.serializedSize(MessagingService.current_version);
+
+        ByteBuffer buffer = ByteBuffer.allocateDirect(currentFrameSize); 
//bufferedOut.nioBuffer(0, bufferedOut.writableBytes());
+        getMessage().message.serialize(new DataOutputBuffer(buffer), 
MessagingService.current_version);
+        Assert.assertFalse(buffer.hasRemaining());
+        Assert.assertEquals(buffer.capacity(), buffer.position());
+
+        ByteBuf bbosOut = 
PooledByteBufAllocator.DEFAULT.ioBuffer(currentFrameSize, currentFrameSize);
+        try
+        {
+            getMessage().message.serialize(new ByteBufDataOutputPlus(bbosOut), 
MessagingService.current_version);
+
+            Assert.assertFalse(bbosOut.isWritable());
+            Assert.assertEquals(bbosOut.capacity(), bbosOut.writerIndex());
+
+            Assert.assertEquals(buffer.position(), bbosOut.writerIndex());
+            for (int i = 0; i < currentFrameSize; i++)
+            {
+                Assert.assertEquals(buffer.get(i), bbosOut.getByte(i));
+            }
+        }
+        finally
+        {
+            bbosOut.release();
+        }
+    }
+
+    private QueuedMessage getMessage()
+    {
+        ColumnFamilyStore cfs1 = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+        ByteBuffer buf = ByteBuffer.allocate(1 << 10);
+        RowUpdateBuilder rowUpdateBuilder = new 
RowUpdateBuilder(cfs1.metadata.get(), 0, "k")
+                                            .clustering("bytes");
+        for (int i = 0; i < columnCount; i++)
+            rowUpdateBuilder.add("val" + i, buf);
+
+        Mutation mutation = rowUpdateBuilder.build();
+        return new QueuedMessage(mutation.createMessage(), 42);
+    }
+
+    @Test
+    public void compareDOS() throws IOException
+    {
+        buf = PooledByteBufAllocator.DEFAULT.ioBuffer(1024, 1024);
+        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
+
+        ByteBufDataOutputPlus byteBufDataOutputPlus = new 
ByteBufDataOutputPlus(buf);
+        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer(buffer);
+
+        write(byteBufDataOutputPlus);
+        write(dataOutputBuffer);
+
+        Assert.assertEquals(buffer.position(), buf.writerIndex());
+        for (int i = 0; i < buffer.position(); i++)
+        {
+            Assert.assertEquals(buffer.get(i), buf.getByte(i));
+        }
+    }
+
+    private void write(DataOutputPlus out) throws IOException
+    {
+        ByteBuffer b = ByteBuffer.allocate(8);
+        b.putLong(29811134237462734L);
+        out.write(b);
+        b = ByteBuffer.allocateDirect(8);
+        b.putDouble(92367.4253647890626);
+        out.write(b);
+
+        out.writeInt(29319236);
+
+        byte[] array = new byte[17];
+        for (int i = 0; i < array.length; i++)
+            array[i] = (byte)i;
+        out.write(array, 0 , array.length);
+
+        out.write(42);
+        out.writeUTF("This is a great string!!");
+        out.writeByte(-100);
+        out.writeUnsignedVInt(3247634L);
+        out.writeVInt(12313695L);
+        out.writeBoolean(true);
+        out.writeShort(4371);
+        out.writeChar('j');
+        out.writeLong(472348263487234L);
+        out.writeFloat(34534.12623F);
+        out.writeDouble(0.2384253D);
+        out.writeBytes("Write my bytes");
+        out.writeChars("These are some swell chars");
+
+        Memory memory = new SafeMemory(8);
+        memory.setLong(0, -21365123651231L);
+        out.write(memory, 0, memory.size());
+        memory.close();
+    }
+
+    @Test (expected = UnsupportedOperationException.class)
+    public void applyToChannel() throws IOException
+    {
+        ByteBufDataOutputPlus out = new 
ByteBufDataOutputPlus(Unpooled.wrappedBuffer(new byte[0]));
+        out.applyToChannel(null);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to