http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 41771e7..6caada1 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -17,45 +17,63 @@
  */
 package org.apache.cassandra.net;
 
-import java.io.*;
+import java.io.IOError;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.*;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ServerSocketChannel;
-import java.util.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-import javax.net.ssl.SSLHandshakeException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.carrotsearch.hppc.IntObjectMap;
 import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.concurrent.ExecutorLocals;
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.concurrent.LocalAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.batchlog.Batch;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.SnapshotCommand;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.TruncateResponse;
+import org.apache.cassandra.db.Truncation;
+import org.apache.cassandra.db.WriteResponse;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.dht.IPartitioner;
@@ -70,22 +88,32 @@ import org.apache.cassandra.hints.HintResponse;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
 import org.apache.cassandra.metrics.MessagingMetrics;
+import org.apache.cassandra.net.async.OutboundMessagingPool;
+import org.apache.cassandra.net.async.NettyFactory;
+import org.apache.cassandra.net.async.NettyFactory.InboundInitializer;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.security.SSLFactory;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PrepareResponse;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.BooleanSerializer;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.NativeLibrary;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.StatusLogger;
+import org.apache.cassandra.utils.UUIDSerializer;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 public final class MessagingService implements MessagingServiceMBean
@@ -323,7 +351,7 @@ public final class MessagingService implements 
MessagingServiceMBean
     }};
 
     /**
-     * Messages we receive in IncomingTcpConnection have a Verb that tells us 
what kind of message it is.
+     * Messages we receive from peers have a Verb that tells us what kind of 
message it is.
      * Most of the time, this is enough to determine how to deserialize the 
message payload.
      * The exception is the REQUEST_RESPONSE verb, which just means "a reply 
to something you told me to do."
      * Traditionally, this was fine since each VerbHandler knew what type of 
payload it expected, and
@@ -418,12 +446,12 @@ public final class MessagingService implements 
MessagingServiceMBean
     private final Map<Verb, IVerbHandler> verbHandlers;
 
     @VisibleForTesting
-    final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> 
connectionManagers = new NonBlockingHashMap<>();
+    public final ConcurrentMap<InetAddress, OutboundMessagingPool> 
channelManagers = new NonBlockingHashMap<>();
+    final List<ServerChannel> serverChannels = Lists.newArrayList();
 
     private static final Logger logger = 
LoggerFactory.getLogger(MessagingService.class);
     private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
 
-    private final List<SocketThread> socketThreads = Lists.newArrayList();
     private final SimpleCondition listenGate;
 
     /**
@@ -533,9 +561,7 @@ public final class MessagingService implements 
MessagingServiceMBean
                 maybeAddLatency(expiredCallbackInfo.callback, 
expiredCallbackInfo.target, pair.right.timeout);
 
                 ConnectionMetrics.totalTimeouts.mark();
-                OutboundTcpConnectionPool cp = 
getConnectionPool(expiredCallbackInfo.target);
-                if (cp != null)
-                    cp.incrementTimeout();
+                markTimeout(expiredCallbackInfo.target);
 
                 if (expiredCallbackInfo.callback.supportsBackPressure())
                 {
@@ -606,12 +632,9 @@ public final class MessagingService implements 
MessagingServiceMBean
     {
         if (DatabaseDescriptor.backPressureEnabled() && 
callback.supportsBackPressure())
         {
-            OutboundTcpConnectionPool cp = getConnectionPool(host);
-            if (cp != null)
-            {
-                BackPressureState backPressureState = 
cp.getBackPressureState();
+            BackPressureState backPressureState = getBackPressureState(host);
+            if (backPressureState != null)
                 backPressureState.onMessageSent(message);
-            }
         }
     }
 
@@ -626,15 +649,13 @@ public final class MessagingService implements 
MessagingServiceMBean
     {
         if (DatabaseDescriptor.backPressureEnabled() && 
callback.supportsBackPressure())
         {
-            OutboundTcpConnectionPool cp = getConnectionPool(host);
-            if (cp != null)
-            {
-                BackPressureState backPressureState = 
cp.getBackPressureState();
-                if (!timeout)
-                    backPressureState.onResponseReceived();
-                else
-                    backPressureState.onResponseTimeout();
-            }
+            BackPressureState backPressureState = getBackPressureState(host);
+            if (backPressureState == null)
+                return;
+            if (!timeout)
+                backPressureState.onResponseReceived();
+            else
+                backPressureState.onResponseTimeout();
         }
     }
 
@@ -656,14 +677,27 @@ public final class MessagingService implements 
MessagingServiceMBean
             {
                 if (host.equals(FBUtilities.getBroadcastAddress()))
                     continue;
-                OutboundTcpConnectionPool cp = getConnectionPool(host);
-                if (cp != null)
-                    states.add(cp.getBackPressureState());
+                OutboundMessagingPool pool = getMessagingConnection(host);
+                if (pool != null)
+                    states.add(pool.getBackPressureState());
             }
             backPressure.apply(states, timeoutInNanos, TimeUnit.NANOSECONDS);
         }
     }
 
+    BackPressureState getBackPressureState(InetAddress host)
+    {
+        OutboundMessagingPool messagingConnection = 
getMessagingConnection(host);
+        return messagingConnection != null ? 
messagingConnection.getBackPressureState() : null;
+    }
+
+    void markTimeout(InetAddress addr)
+    {
+        OutboundMessagingPool conn = channelManagers.get(addr);
+        if (conn != null)
+            conn.incrementTimeout();
+    }
+
     /**
      * Track latency information for the dynamic snitch
      *
@@ -688,30 +722,25 @@ public final class MessagingService implements 
MessagingServiceMBean
      */
     public void convict(InetAddress ep)
     {
-        OutboundTcpConnectionPool cp = getConnectionPool(ep);
-        if (cp != null)
-        {
-            logger.trace("Resetting pool for {}", ep);
-            cp.reset();
-        }
-        else
-        {
-            logger.debug("Not resetting pool for {} because internode 
authenticator said not to connect", ep);
-        }
+        logger.trace("Resetting pool for {}", ep);
+        reset(ep);
     }
 
     public void listen()
     {
         callbacks.reset(); // hack to allow tests to stop/restart MS
         listen(FBUtilities.getLocalAddress());
-        if (DatabaseDescriptor.shouldListenOnBroadcastAddress()
-            && 
!FBUtilities.getLocalAddress().equals(FBUtilities.getBroadcastAddress()))
-        {
+        if (shouldListenOnBroadcastAddress())
             listen(FBUtilities.getBroadcastAddress());
-        }
         listenGate.signalAll();
     }
 
+    public static boolean shouldListenOnBroadcastAddress()
+    {
+        return DatabaseDescriptor.shouldListenOnBroadcastAddress()
+               && 
!FBUtilities.getLocalAddress().equals(FBUtilities.getBroadcastAddress());
+    }
+
     /**
      * Listen on the specified port.
      *
@@ -719,80 +748,64 @@ public final class MessagingService implements 
MessagingServiceMBean
      */
     private void listen(InetAddress localEp) throws ConfigurationException
     {
-        for (ServerSocket ss : getServerSockets(localEp))
+        IInternodeAuthenticator authenticator = 
DatabaseDescriptor.getInternodeAuthenticator();
+        int receiveBufferSize = 
DatabaseDescriptor.getInternodeRecvBufferSize();
+
+        if 
(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != 
ServerEncryptionOptions.InternodeEncryption.none)
         {
-            SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
-            th.start();
-            socketThreads.add(th);
+            InetSocketAddress localAddr = new InetSocketAddress(localEp, 
DatabaseDescriptor.getSSLStoragePort());
+            ChannelGroup channelGroup = new 
DefaultChannelGroup("EncryptedInternodeMessagingGroup", 
NettyFactory.executorForChannelGroups());
+            InboundInitializer initializer = new 
InboundInitializer(authenticator, 
DatabaseDescriptor.getServerEncryptionOptions(), channelGroup);
+            Channel encryptedChannel = 
NettyFactory.instance.createInboundChannel(localAddr, initializer, 
receiveBufferSize);
+            serverChannels.add(new ServerChannel(encryptedChannel, 
channelGroup));
         }
+
+        if 
(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != 
ServerEncryptionOptions.InternodeEncryption.all)
+        {
+            InetSocketAddress localAddr = new InetSocketAddress(localEp, 
DatabaseDescriptor.getStoragePort());
+            ChannelGroup channelGroup = new 
DefaultChannelGroup("InternodeMessagingGroup", 
NettyFactory.executorForChannelGroups());
+            InboundInitializer initializer = new 
InboundInitializer(authenticator, null, channelGroup);
+            Channel channel = 
NettyFactory.instance.createInboundChannel(localAddr, initializer, 
receiveBufferSize);
+            serverChannels.add(new ServerChannel(channel, channelGroup));
+        }
+
+        if (serverChannels.isEmpty())
+            throw new IllegalStateException("no listening channels set up in 
MessagingService!");
     }
 
-    @SuppressWarnings("resource")
-    private List<ServerSocket> getServerSockets(InetAddress localEp) throws 
ConfigurationException
+    /**
+     * A simple struct to wrap up the components needed for each listening 
socket.
+     */
+    @VisibleForTesting
+    static class ServerChannel
     {
-        final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
-        if 
(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != 
ServerEncryptionOptions.InternodeEncryption.none)
+        /**
+         * The base {@link Channel} that is doing the socket listen/accept.
+         */
+        private final Channel channel;
+
+        /**
+         * A group of the open, inbound {@link Channel}s connected to this 
node. This is mostly interesting so that all of
+         * the inbound connections/channels can be closed when the listening 
socket itself is being closed.
+         */
+        private final ChannelGroup connectedChannels;
+
+        private ServerChannel(Channel channel, ChannelGroup channelGroup)
         {
-            try
-            {
-                
ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getServerEncryptionOptions(),
 localEp, DatabaseDescriptor.getSSLStoragePort()));
-            }
-            catch (IOException e)
-            {
-                throw new ConfigurationException("Unable to create ssl 
socket", e);
-            }
-            // setReuseAddress happens in the factory.
-            logger.info("Starting Encrypted Messaging Service on SSL port {}", 
DatabaseDescriptor.getSSLStoragePort());
+            this.channel = channel;
+            this.connectedChannels = channelGroup;
         }
 
-        if 
(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != 
ServerEncryptionOptions.InternodeEncryption.all)
+        void close()
         {
-            ServerSocketChannel serverChannel = null;
-            try
-            {
-                serverChannel = ServerSocketChannel.open();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            ServerSocket socket = serverChannel.socket();
-            try
-            {
-                socket.setReuseAddress(true);
-            }
-            catch (SocketException e)
-            {
-                FileUtils.closeQuietly(socket);
-                throw new ConfigurationException("Insufficient permissions to 
setReuseAddress", e);
-            }
-            InetSocketAddress address = new InetSocketAddress(localEp, 
DatabaseDescriptor.getStoragePort());
-            try
-            {
-                socket.bind(address,500);
-            }
-            catch (BindException e)
-            {
-                FileUtils.closeQuietly(socket);
-                if (e.getMessage().contains("in use"))
-                    throw new ConfigurationException(address + " is in use by 
another process.  Change listen_address:storage_port in cassandra.yaml to 
values that do not conflict with other services");
-                else if (e.getMessage().contains("Cannot assign requested 
address"))
-                    throw new ConfigurationException("Unable to bind to 
address " + address
-                                                     + ". Set listen_address 
in cassandra.yaml to an interface you can bind to, e.g., your private IP 
address on EC2");
-                else
-                    throw new RuntimeException(e);
-            }
-            catch (IOException e)
-            {
-                FileUtils.closeQuietly(socket);
-                throw new RuntimeException(e);
-            }
-            String nic = FBUtilities.getNetworkInterface(localEp);
-            logger.info("Starting Messaging Service on {}:{}{}", localEp, 
DatabaseDescriptor.getStoragePort(),
-                        nic == null? "" : String.format(" (%s)", nic));
-            ss.add(socket);
+            channel.close().syncUninterruptibly();
+            connectedChannels.close().syncUninterruptibly();
+        }
+        int size()
+
+        {
+            return connectedChannels.size();
         }
-        return ss;
     }
 
     public void waitUntilListening()
@@ -812,53 +825,42 @@ public final class MessagingService implements 
MessagingServiceMBean
         return listenGate.isSignaled();
     }
 
+
     public void destroyConnectionPool(InetAddress to)
     {
-        OutboundTcpConnectionPool cp = connectionManagers.get(to);
-        if (cp == null)
-            return;
-        cp.close();
-        connectionManagers.remove(to);
+        OutboundMessagingPool pool = channelManagers.remove(to);
+        if (pool != null)
+            pool.close(true);
     }
 
     /**
-     * Get a connection pool to the specified endpoint. Constructs one if none 
exists.
+     * Reconnect to the peer using the given {@code preferredAddress}. Outstanding 
messages in each channel will be sent on the
+     * current channel. Typically this function is used for something like EC2 
public IP addresses which need to be used
+     * for communication between EC2 regions.
      *
-     * Can return null if the InternodeAuthenticator fails to authenticate the 
node.
-     * @param to
-     * @return The connection pool or null if internode authenticator says not 
to
+     * @param address IP Address to identify the peer
+     * @param preferredAddress IP Address to use (and prefer) going forward 
for connecting to the peer
      */
-    public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
+    public void reconnectWithNewIp(InetAddress address, InetAddress 
preferredAddress)
     {
-        OutboundTcpConnectionPool cp = connectionManagers.get(to);
-        if (cp == null)
-        {
-            //Don't attempt to connect to nodes that won't (or shouldn't) 
authenticate anyways
-            if 
(!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, 
OutboundTcpConnectionPool.portFor(to)))
-                return null;
+        SystemKeyspace.updatePreferredIP(address, preferredAddress);
 
-            cp = new OutboundTcpConnectionPool(to, backPressure.newState(to));
-            OutboundTcpConnectionPool existingPool = 
connectionManagers.putIfAbsent(to, cp);
-            if (existingPool != null)
-                cp = existingPool;
-            else
-                cp.start();
-        }
-        cp.waitForStarted();
-        return cp;
+        OutboundMessagingPool messagingPool = channelManagers.get(address);
+        if (messagingPool != null)
+            messagingPool.reconnectWithNewIp(new 
InetSocketAddress(preferredAddress, portFor(address)));
     }
 
-    /**
-     * Get a connection for a message to a specific endpoint. Constructs one 
if none exists.
-     *
-     * Can return null if the InternodeAuthenticator fails to authenticate the 
node.
-     * @param to
-     * @return The connection or null if internode authenticator says not to
-     */
-    public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg)
+    private void reset(InetAddress address)
     {
-        OutboundTcpConnectionPool cp = getConnectionPool(to);
-        return cp == null ? null : cp.getConnection(msg);
+        OutboundMessagingPool messagingPool = channelManagers.remove(address);
+        if (messagingPool != null)
+            messagingPool.close(false);
+    }
+
+    public InetAddress getCurrentEndpoint(InetAddress publicAddress)
+    {
+        OutboundMessagingPool messagingPool = 
getMessagingConnection(publicAddress);
+        return messagingPool != null ? 
messagingPool.getPreferredRemoteAddr().getAddress() : null;
     }
 
     /**
@@ -1008,12 +1010,9 @@ public final class MessagingService implements 
MessagingServiceMBean
             if (!ms.allowOutgoingMessage(message, id, to))
                 return;
 
-        // get pooled connection (really, connection queue)
-        OutboundTcpConnection connection = getConnection(to, message);
-
-        // write it
-        if (connection != null)
-            connection.enqueue(message, id);
+        OutboundMessagingPool outboundMessagingPool = 
getMessagingConnection(to);
+        if (outboundMessagingPool != null)
+            outboundMessagingPool.sendMessage(message, id);
     }
 
     public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to)
@@ -1049,18 +1048,17 @@ public final class MessagingService implements 
MessagingServiceMBean
         // attempt to humor tests that try to stop and restart MS
         try
         {
-            for (SocketThread th : socketThreads)
-                try
-                {
-                    th.close();
-                }
-                catch (IOException e)
-                {
-                    // see 
https://issues.apache.org/jira/browse/CASSANDRA-10545
-                    handleIOExceptionOnClose(e);
-                }
+            // first close the receive channels
+            for (ServerChannel serverChannel : serverChannels)
+                serverChannel.close();
+
+            // now close the send channels
+            for (OutboundMessagingPool pool : channelManagers.values())
+                pool.close(false);
+
+            NettyFactory.instance.close();
         }
-        catch (IOException e)
+        catch (Exception e)
         {
             throw new IOError(e);
         }
@@ -1281,109 +1279,13 @@ public final class MessagingService implements 
MessagingServiceMBean
         return ret;
     }
 
-    @VisibleForTesting
-    public static class SocketThread extends Thread
-    {
-        private final ServerSocket server;
-        @VisibleForTesting
-        public final Set<Closeable> connections = Sets.newConcurrentHashSet();
-
-        SocketThread(ServerSocket server, String name)
-        {
-            super(name);
-            this.server = server;
-        }
-
-        @SuppressWarnings("resource")
-        public void run()
-        {
-            while (!server.isClosed())
-            {
-                Socket socket = null;
-                try
-                {
-                    socket = server.accept();
-                    if (!authenticate(socket))
-                    {
-                        logger.trace("remote failed to authenticate");
-                        socket.close();
-                        continue;
-                    }
-
-                    socket.setKeepAlive(true);
-                    socket.setSoTimeout(2 * 
OutboundTcpConnection.WAIT_FOR_VERSION_MAX_TIME);
-                    // determine the connection type to decide whether to 
buffer
-                    DataInputStream in = new 
DataInputStream(socket.getInputStream());
-                    MessagingService.validateMagic(in.readInt());
-                    int header = in.readInt();
-                    boolean isStream = MessagingService.getBits(header, 3, 1) 
== 1;
-                    int version = MessagingService.getBits(header, 15, 8);
-                    logger.trace("Connection version {} from {}", version, 
socket.getInetAddress());
-                    socket.setSoTimeout(0);
-
-                    Thread thread = isStream
-                                  ? new IncomingStreamingConnection(version, 
socket, connections)
-                                  : new IncomingTcpConnection(version, 
MessagingService.getBits(header, 2, 1) == 1, socket, connections);
-                    thread.start();
-                    connections.add((Closeable) thread);
-                }
-                catch (AsynchronousCloseException e)
-                {
-                    // this happens when another thread calls close().
-                    logger.trace("Asynchronous close seen by server thread");
-                    break;
-                }
-                catch (ClosedChannelException e)
-                {
-                    logger.trace("MessagingService server thread already 
closed");
-                    break;
-                }
-                catch (SSLHandshakeException e)
-                {
-                    logger.error("SSL handshake error for inbound connection 
from " + socket, e);
-                    FileUtils.closeQuietly(socket);
-                }
-                catch (Throwable t)
-                {
-                    logger.trace("Error reading the socket {}", socket, t);
-                    FileUtils.closeQuietly(socket);
-                }
-            }
-            logger.info("MessagingService has terminated the accept() thread");
-        }
-
-        void close() throws IOException
-        {
-            logger.trace("Closing accept() thread");
-
-            try
-            {
-                server.close();
-            }
-            catch (IOException e)
-            {
-                // see https://issues.apache.org/jira/browse/CASSANDRA-8220
-                // see https://issues.apache.org/jira/browse/CASSANDRA-12513
-                handleIOExceptionOnClose(e);
-            }
-            for (Closeable connection : connections)
-            {
-                connection.close();
-            }
-        }
-
-        private boolean authenticate(Socket socket)
-        {
-            return 
DatabaseDescriptor.getInternodeAuthenticator().authenticate(socket.getInetAddress(),
 socket.getPort());
-        }
-    }
 
     private static void handleIOExceptionOnClose(IOException e) throws 
IOException
     {
         // dirty hack for clean shutdown on OSX w/ Java >= 1.8.0_20
         // see https://bugs.openjdk.java.net/browse/JDK-8050499;
         // also CASSANDRA-12513
-        if ("Mac OS X".equals(System.getProperty("os.name")))
+        if (NativeLibrary.osType == NativeLibrary.OSType.MAC)
         {
             switch (e.getMessage())
             {
@@ -1398,79 +1300,73 @@ public final class MessagingService implements 
MessagingServiceMBean
 
     public Map<String, Integer> getLargeMessagePendingTasks()
     {
-        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessages.getPendingMessages());
+        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessageChannel.getPendingMessages());
         return pendingTasks;
     }
 
-    public int getLargeMessagePendingTasks(InetAddress address)
-    {
-        OutboundTcpConnectionPool connection = connectionManagers.get(address);
-        return connection == null ? 0 : 
connection.largeMessages.getPendingMessages();
-    }
-
     public Map<String, Long> getLargeMessageCompletedTasks()
     {
-        Map<String, Long> completedTasks = new HashMap<String, 
Long>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessages.getCompletedMesssages());
+        Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessageChannel.getCompletedMessages());
         return completedTasks;
     }
 
     public Map<String, Long> getLargeMessageDroppedTasks()
     {
-        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessages.getDroppedMessages());
+        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().largeMessageChannel.getDroppedMessages());
         return droppedTasks;
     }
 
     public Map<String, Integer> getSmallMessagePendingTasks()
     {
-        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessages.getPendingMessages());
+        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessageChannel.getPendingMessages());
         return pendingTasks;
     }
 
     public Map<String, Long> getSmallMessageCompletedTasks()
     {
-        Map<String, Long> completedTasks = new HashMap<String, 
Long>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessages.getCompletedMesssages());
+        Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessageChannel.getCompletedMessages());
         return completedTasks;
     }
 
     public Map<String, Long> getSmallMessageDroppedTasks()
     {
-        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessages.getDroppedMessages());
+        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().smallMessageChannel.getDroppedMessages());
         return droppedTasks;
     }
 
     public Map<String, Integer> getGossipMessagePendingTasks()
     {
-        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipMessages.getPendingMessages());
+        Map<String, Integer> pendingTasks = new HashMap<String, 
Integer>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            pendingTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipChannel.getPendingMessages());
         return pendingTasks;
     }
 
     public Map<String, Long> getGossipMessageCompletedTasks()
     {
-        Map<String, Long> completedTasks = new HashMap<String, 
Long>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipMessages.getCompletedMesssages());
+        Map<String, Long> completedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            completedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipChannel.getCompletedMessages());
         return completedTasks;
     }
 
     public Map<String, Long> getGossipMessageDroppedTasks()
     {
-        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
-            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipMessages.getDroppedMessages());
+        Map<String, Long> droppedTasks = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
+            droppedTasks.put(entry.getKey().getHostAddress(), 
entry.getValue().gossipChannel.getDroppedMessages());
         return droppedTasks;
     }
 
@@ -1490,8 +1386,8 @@ public final class MessagingService implements 
MessagingServiceMBean
 
     public Map<String, Long> getTimeoutsPerHost()
     {
-        Map<String, Long> result = new HashMap<String, 
Long>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: 
connectionManagers.entrySet())
+        Map<String, Long> result = new HashMap<String, 
Long>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
         {
             String ip = entry.getKey().getHostAddress();
             long recent = entry.getValue().getTimeouts();
@@ -1502,8 +1398,8 @@ public final class MessagingService implements 
MessagingServiceMBean
 
     public Map<String, Double> getBackPressurePerHost()
     {
-        Map<String, Double> map = new HashMap<>(connectionManagers.size());
-        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : 
connectionManagers.entrySet())
+        Map<String, Double> map = new HashMap<>(channelManagers.size());
+        for (Map.Entry<InetAddress, OutboundMessagingPool> entry : 
channelManagers.entrySet())
             map.put(entry.getKey().getHostAddress(), 
entry.getValue().getBackPressureState().getBackPressureRateLimit());
 
         return map;
@@ -1540,9 +1436,72 @@ public final class MessagingService implements 
MessagingServiceMBean
                                                    
bounds.left.getPartitioner().getClass().getName()));
     }
 
+    /**
+     * Looks up the outbound messaging pool for a peer, creating and registering one on first use.
+     *
+     * @param to the peer the pool is for
+     * @return the pool held in {@code channelManagers} for {@code to}, or {@code null} when no pool exists
+     *         yet and the internode authenticator rejects the peer on the selected port
+     */
+    private OutboundMessagingPool getMessagingConnection(InetAddress to)
+    {
+        OutboundMessagingPool known = channelManagers.get(to);
+        if (known != null)
+            return known;
+
+        final boolean secure = isEncryptedConnection(to);
+        final int port = portFor(secure);
+        if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, port))
+            return null;
+
+        InetSocketAddress preferredRemote = new InetSocketAddress(SystemKeyspace.getPreferredIP(to), port);
+        InetSocketAddress local = new InetSocketAddress(FBUtilities.getLocalAddress(), 0);
+        // encryption options are only handed to the pool when the link is to be encrypted
+        ServerEncryptionOptions encryptionOptions = null;
+        if (secure)
+            encryptionOptions = DatabaseDescriptor.getServerEncryptionOptions();
+        IInternodeAuthenticator authenticator = DatabaseDescriptor.getInternodeAuthenticator();
+
+        OutboundMessagingPool created = new OutboundMessagingPool(preferredRemote, local, encryptionOptions, backPressure.newState(to), authenticator);
+        OutboundMessagingPool winner = channelManagers.putIfAbsent(to, created);
+        if (winner == null)
+            return created;
+
+        // a pool for this peer was registered in the meantime: close ours and hand back the registered one
+        created.close(false);
+        return winner;
+    }
+
+    /**
+     * Resolves the storage port to use for a peer.
+     *
+     * @param addr the peer address
+     * @return the port {@link #portFor(boolean)} selects for the outcome of
+     *         {@link #isEncryptedConnection(InetAddress)} on {@code addr}
+     */
+    public static int portFor(InetAddress addr)
+    {
+        return portFor(isEncryptedConnection(addr));
+    }
+
+    /**
+     * @param secure whether the connection is to be encrypted
+     * @return the configured SSL storage port when {@code secure}, otherwise the configured storage port
+     */
+    private static int portFor(boolean secure)
+    {
+        if (secure)
+            return DatabaseDescriptor.getSSLStoragePort();
+        return DatabaseDescriptor.getStoragePort();
+    }
+
     @VisibleForTesting
-    public List<SocketThread> getSocketThreads()
+    boolean isConnected(InetAddress address, MessageOut messageOut)
     {
-        return socketThreads;
+        // with no pool registered for the address there is nothing that could be connected
+        OutboundMessagingPool candidate = channelManagers.get(address);
+        return candidate != null && candidate.getConnection(messageOut).isConnected();
+    }
+
+    /**
+     * Decides whether traffic to a peer is encrypted under the configured {@code internode_encryption} mode.
+     *
+     * @param address the peer address
+     * @return {@code false} for mode {@code none}, for mode {@code dc} when the snitch places the peer in the
+     *         same datacenter as this node's broadcast address, and for mode {@code rack} when both rack and
+     *         datacenter match; {@code true} in every other case
+     */
+    public static boolean isEncryptedConnection(InetAddress address)
+    {
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        switch (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption)
+        {
+            case none:
+                // if nothing needs to be encrypted then return immediately.
+                return false;
+            case dc:
+                // same datacenter as our broadcast address: leave unencrypted
+                return !snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress()));
+            case rack:
+                // unencrypted only when the rack matches and the DC's are the same as well
+                return !(snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddress()))
+                         && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())));
+            case all:
+            default:
+                return true;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
deleted file mode 100644
index 42abbe6..0000000
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ /dev/null
@@ -1,693 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.Checksum;
-
-import javax.net.ssl.SSLHandshakeException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.util.concurrent.FastThreadLocalThread;
-import net.jpountz.lz4.LZ4BlockOutputStream;
-import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.xxhash.XXHashFactory;
-
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
-import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.CoalescingStrategies;
-import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
-import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Uninterruptibles;
-
-public class OutboundTcpConnection extends FastThreadLocalThread
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(OutboundTcpConnection.class);
-
-    private static final String PREFIX = Config.PROPERTY_PREFIX;
-
-    /*
-     * Enabled/disable TCP_NODELAY for intradc connections. Defaults to 
enabled.
-     */
-    private static final String INTRADC_TCP_NODELAY_PROPERTY = PREFIX + 
"otc_intradc_tcp_nodelay";
-    private static final boolean INTRADC_TCP_NODELAY = 
Boolean.parseBoolean(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true"));
-
-    /*
-     * Size of buffer in output stream
-     */
-    private static final String BUFFER_SIZE_PROPERTY = PREFIX + 
"otc_buffer_size";
-    private static final int BUFFER_SIZE = 
Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
-
-    //Size of 3 elements added to every message
-    private static final int PROTOCOL_MAGIC_ID_TIMESTAMP_SIZE = 12;
-
-    public static final int MAX_COALESCED_MESSAGES = 128;
-
-    private static CoalescingStrategy newCoalescingStrategy(String displayName)
-    {
-        return 
CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
-                                                          
DatabaseDescriptor.getOtcCoalescingWindow(),
-                                                          logger,
-                                                          displayName);
-    }
-
-    static
-    {
-        String strategy = DatabaseDescriptor.getOtcCoalescingStrategy();
-        switch (strategy)
-        {
-        case "TIMEHORIZON":
-            break;
-        case "MOVINGAVERAGE":
-        case "FIXED":
-        case "DISABLED":
-            logger.info("OutboundTcpConnection using coalescing strategy {}", 
strategy);
-            break;
-            default:
-                //Check that it can be loaded
-                newCoalescingStrategy("dummy");
-        }
-
-        int coalescingWindow = DatabaseDescriptor.getOtcCoalescingWindow();
-        if (coalescingWindow != Config.otc_coalescing_window_us_default)
-            logger.info("OutboundTcpConnection coalescing window set to 
{}μs", coalescingWindow);
-
-        if (coalescingWindow < 0)
-            throw new ExceptionInInitializerError(
-                    "Value provided for coalescing window must be greater than 
0: " + coalescingWindow);
-
-        int otc_backlog_expiration_interval_in_ms = 
DatabaseDescriptor.getOtcBacklogExpirationInterval();
-        if (otc_backlog_expiration_interval_in_ms != 
Config.otc_backlog_expiration_interval_ms_default)
-            logger.info("OutboundTcpConnection backlog expiration interval set 
to to {}ms", otc_backlog_expiration_interval_in_ms);
-    }
-
-    private static final MessageOut<?> CLOSE_SENTINEL = new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
-    private volatile boolean isStopped = false;
-
-    private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-    public static final int WAIT_FOR_VERSION_MAX_TIME = 5000;
-    private static final int NO_VERSION = Integer.MIN_VALUE;
-
-    static final int LZ4_HASH_SEED = 0x9747b28c;
-
-    private final BlockingQueue<QueuedMessage> backlog = new 
LinkedBlockingQueue<>();
-    private static final String BACKLOG_PURGE_SIZE_PROPERTY = PREFIX + 
"otc_backlog_purge_size";
-    @VisibleForTesting
-    static final int BACKLOG_PURGE_SIZE = 
Integer.getInteger(BACKLOG_PURGE_SIZE_PROPERTY, 1024);
-    private final AtomicBoolean backlogExpirationActive = new 
AtomicBoolean(false);
-    private volatile long backlogNextExpirationTime;
-
-    private final OutboundTcpConnectionPool poolReference;
-
-    private final CoalescingStrategy cs;
-    private DataOutputStreamPlus out;
-    private Socket socket;
-    private volatile long completed;
-    private final AtomicLong dropped = new AtomicLong();
-    private volatile int currentMsgBufferCount = 0;
-    private volatile int targetVersion;
-
-    public OutboundTcpConnection(OutboundTcpConnectionPool pool, String name)
-    {
-        super("MessagingService-Outgoing-" + pool.endPoint() + "-" + name);
-        this.poolReference = pool;
-        cs = newCoalescingStrategy(pool.endPoint().getHostAddress());
-
-        // We want to use the most precise version we know because while there 
is version detection on connect(),
-        // the target version might be accessed by the pool (in 
getConnection()) before we actually connect (as we
-        // connect when the first message is submitted). Note however that the 
only case where we'll connect
-        // without knowing the true version of a node is if that node is a 
seed (otherwise, we can't know a node
-        // unless it has been gossiped to us or it has connected to us and in 
both case this sets the version) and
-        // in that case we won't rely on that targetVersion before we're 
actually connected and so the version
-        // detection in connect() will do its job.
-        targetVersion = 
MessagingService.instance().getVersion(pool.endPoint());
-    }
-
-    private static boolean isLocalDC(InetAddress targetHost)
-    {
-        String remoteDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
-        String localDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-        return remoteDC.equals(localDC);
-    }
-
-    public void enqueue(MessageOut<?> message, int id)
-    {
-        long nanoTime = System.nanoTime();
-        expireMessages(nanoTime);
-        try
-        {
-            backlog.put(new QueuedMessage(message, id, nanoTime));
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    /**
-     * This is a helper method for unit testing. Disclaimer: Do not use this 
method outside unit tests, as
-     * this method is iterating the queue which can be an expensive operation 
(CPU time, queue locking).
-     *
-     * @return true, if the queue contains at least one expired element
-     */
-    @VisibleForTesting // (otherwise = VisibleForTesting.NONE)
-    boolean backlogContainsExpiredMessages(long nowNanos)
-    {
-        return backlog.stream().anyMatch(entry -> entry.isTimedOut(nowNanos));
-    }
-
-    void closeSocket(boolean destroyThread)
-    {
-        logger.debug("Enqueuing socket close for {}", 
poolReference.endPoint());
-        isStopped = destroyThread; // Exit loop to stop the thread
-        backlog.clear();
-        // in the "destroyThread = true" case, enqueuing the sentinel is 
important mostly to unblock the backlog.take()
-        // (via the CoalescingStrategy) in case there's a data race between 
this method enqueuing the sentinel
-        // and run() clearing the backlog on connection failure.
-        enqueue(CLOSE_SENTINEL, -1);
-    }
-
-    void softCloseSocket()
-    {
-        enqueue(CLOSE_SENTINEL, -1);
-    }
-
-    public int getTargetVersion()
-    {
-        return targetVersion;
-    }
-
-    public void run()
-    {
-        final int drainedMessageSize = MAX_COALESCED_MESSAGES;
-        // keeping list (batch) size small for now; that way we don't have an 
unbounded array (that we never resize)
-        final List<QueuedMessage> drainedMessages = new 
ArrayList<>(drainedMessageSize);
-
-        outer:
-        while (!isStopped)
-        {
-            try
-            {
-                cs.coalesce(backlog, drainedMessages, drainedMessageSize);
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-
-            int count = currentMsgBufferCount = drainedMessages.size();
-
-            //The timestamp of the first message has already been provided to 
the coalescing strategy
-            //so skip logging it.
-            inner:
-            for (QueuedMessage qm : drainedMessages)
-            {
-                try
-                {
-                    MessageOut<?> m = qm.message;
-                    if (m == CLOSE_SENTINEL)
-                    {
-                        disconnect();
-                        if (isStopped)
-                            break outer;
-                        continue;
-                    }
-
-                    if (qm.isTimedOut(System.nanoTime()))
-                        dropped.incrementAndGet();
-                    else if (socket != null || connect())
-                        writeConnected(qm, count == 1 && backlog.isEmpty());
-                    else
-                    {
-                        // Not connected! Clear out the queue, else gossip 
messages back up. Update dropped
-                        // statistics accordingly. Hint: The statistics may be 
slightly too low, if messages
-                        // are added between the calls of backlog.size() and 
backlog.clear()
-                        dropped.addAndGet(backlog.size());
-                        backlog.clear();
-                        break inner;
-                    }
-                }
-                catch (InternodeAuthFailed e)
-                {
-                    logger.warn("Internode auth failed connecting to {}", 
poolReference.endPoint());
-                    //Remove the connection pool and other thread so messages 
aren't queued
-                    
MessagingService.instance().destroyConnectionPool(poolReference.endPoint());
-                }
-                catch (Exception e)
-                {
-                    JVMStabilityInspector.inspectThrowable(e);
-                    // really shouldn't get here, as exception handling in 
writeConnected() is reasonably robust
-                    // but we want to catch anything bad we don't drop the 
messages in the current batch
-                    logger.error("error processing a message intended for {}", 
poolReference.endPoint(), e);
-                }
-                currentMsgBufferCount = --count;
-            }
-            // Update dropped statistics by the number of unprocessed 
drainedMessages
-            dropped.addAndGet(currentMsgBufferCount);
-            drainedMessages.clear();
-        }
-    }
-
-    public int getPendingMessages()
-    {
-        return backlog.size() + currentMsgBufferCount;
-    }
-
-    public long getCompletedMesssages()
-    {
-        return completed;
-    }
-
-    public long getDroppedMessages()
-    {
-        return dropped.get();
-    }
-
-    private boolean shouldCompressConnection()
-    {
-        // assumes version >= 1.2
-        return DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.all
-               || (DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint()));
-    }
-
-    private void writeConnected(QueuedMessage qm, boolean flush)
-    {
-        try
-        {
-            byte[] sessionBytes = 
qm.message.parameters.get(Tracing.TRACE_HEADER);
-            if (sessionBytes != null)
-            {
-                UUID sessionId = 
UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
-                TraceState state = Tracing.instance.get(sessionId);
-                String message = String.format("Sending %s message to %s 
message size %d bytes", qm.message.verb,
-                                               poolReference.endPoint(),
-                                               
qm.message.serializedSize(targetVersion) + PROTOCOL_MAGIC_ID_TIMESTAMP_SIZE);
-                // session may have already finished; see CASSANDRA-5668
-                if (state == null)
-                {
-                    byte[] traceTypeBytes = 
qm.message.parameters.get(Tracing.TRACE_TYPE);
-                    Tracing.TraceType traceType = traceTypeBytes == null ? 
Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
-                    Tracing.instance.trace(ByteBuffer.wrap(sessionBytes), 
message, traceType.getTTL());
-                }
-                else
-                {
-                    state.trace(message);
-                    if (qm.message.verb == 
MessagingService.Verb.REQUEST_RESPONSE)
-                        Tracing.instance.doneWithNonLocalSession(state);
-                }
-            }
-
-            long timestampMillis = 
NanoTimeToCurrentTimeMillis.convert(qm.timestampNanos);
-            writeInternal(qm.message, qm.id, timestampMillis);
-
-            completed++;
-            if (flush)
-                out.flush();
-        }
-        catch (Throwable e)
-        {
-            JVMStabilityInspector.inspectThrowable(e);
-            disconnect();
-            if (e instanceof IOException || e.getCause() instanceof 
IOException)
-            {
-                logger.debug("Error writing to {}", poolReference.endPoint(), 
e);
-
-                // If we haven't retried this message yet, put it back on the 
queue to retry after re-connecting.
-                // See CASSANDRA-5393 and CASSANDRA-12192.
-                if (qm.shouldRetry())
-                {
-                    try
-                    {
-                        backlog.put(new RetriedQueuedMessage(qm));
-                    }
-                    catch (InterruptedException e1)
-                    {
-                        throw new AssertionError(e1);
-                    }
-                }
-            }
-            else
-            {
-                // Non IO exceptions are likely a programming error so let's 
not silence them
-                logger.error("error writing to {}", poolReference.endPoint(), 
e);
-            }
-        }
-    }
-
-    private void writeInternal(MessageOut<?> message, int id, long timestamp) 
throws IOException
-    {
-        //If you add/remove fields before the message don't forget to update 
PROTOCOL_MAGIC_ID_TIMESTAMP_SIZE
-        out.writeInt(MessagingService.PROTOCOL_MAGIC);
-        out.writeInt(id);
-
-        // int cast cuts off the high-order half of the timestamp, which we 
can assume remains
-        // the same between now and when the recipient reconstructs it.
-        out.writeInt((int) timestamp);
-        message.serialize(out, targetVersion);
-    }
-
-    private static void writeHeader(DataOutput out, int version, boolean 
compressionEnabled) throws IOException
-    {
-        // 2 bits: unused.  used to be "serializer type," which was always 
Binary
-        // 1 bit: compression
-        // 1 bit: streaming mode
-        // 3 bits: unused
-        // 8 bits: version
-        // 15 bits: unused
-        int header = 0;
-        if (compressionEnabled)
-            header |= 4;
-        header |= (version << 8);
-        out.writeInt(header);
-    }
-
-    private void disconnect()
-    {
-        if (socket != null)
-        {
-            try
-            {
-                socket.close();
-                logger.debug("Socket to {} closed", poolReference.endPoint());
-            }
-            catch (IOException e)
-            {
-                logger.debug("Exception closing connection to {}", 
poolReference.endPoint(), e);
-            }
-            out = null;
-            socket = null;
-        }
-    }
-
-    @SuppressWarnings("resource")
-    private boolean connect() throws InternodeAuthFailed
-    {
-        InetAddress endpoint = poolReference.endPoint();
-        if 
(!DatabaseDescriptor.getInternodeAuthenticator().authenticate(endpoint, 
poolReference.portFor(endpoint)))
-        {
-            throw new InternodeAuthFailed();
-        }
-
-        logger.debug("Attempting to connect to {}", endpoint);
-
-
-        long start = System.nanoTime();
-        long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
-        while (System.nanoTime() - start < timeout)
-        {
-            targetVersion = MessagingService.instance().getVersion(endpoint);
-            try
-            {
-                socket = poolReference.newSocket();
-                socket.setKeepAlive(true);
-                if (isLocalDC(endpoint))
-                {
-                    socket.setTcpNoDelay(INTRADC_TCP_NODELAY);
-                }
-                else
-                {
-                    
socket.setTcpNoDelay(DatabaseDescriptor.getInterDCTcpNoDelay());
-                }
-                if (DatabaseDescriptor.getInternodeSendBufferSize() > 0)
-                {
-                    try
-                    {
-                        
socket.setSendBufferSize(DatabaseDescriptor.getInternodeSendBufferSize());
-                    }
-                    catch (SocketException se)
-                    {
-                        logger.warn("Failed to set send buffer size on 
internode socket.", se);
-                    }
-                }
-
-                // SocketChannel may be null when using SSL
-                WritableByteChannel ch = socket.getChannel();
-                out = new BufferedDataOutputStreamPlus(ch != null ? ch : 
Channels.newChannel(socket.getOutputStream()), BUFFER_SIZE);
-
-                out.writeInt(MessagingService.PROTOCOL_MAGIC);
-                writeHeader(out, targetVersion, shouldCompressConnection());
-                out.flush();
-
-                DataInputStream in = new 
DataInputStream(socket.getInputStream());
-                int maxTargetVersion = handshakeVersion(in);
-                if (maxTargetVersion == NO_VERSION)
-                {
-                    // no version is returned, so disconnect an try again
-                    logger.trace("Target max version is {}; no version 
information yet, will retry", maxTargetVersion);
-                    disconnect();
-                    continue;
-                }
-                else
-                {
-                    MessagingService.instance().setVersion(endpoint, 
maxTargetVersion);
-                }
-
-                if (targetVersion > maxTargetVersion)
-                {
-                    logger.trace("Target max version is {}; will reconnect 
with that version", maxTargetVersion);
-                    try
-                    {
-                        if (DatabaseDescriptor.getSeeds().contains(endpoint))
-                            logger.warn("Seed gossip version is {}; will not 
connect with that version", maxTargetVersion);
-                    }
-                    catch (Throwable e)
-                    {
-                        // If invalid yaml has been added to the config since 
startup, getSeeds() will throw an AssertionError
-                        // Additionally, third party seed providers may throw 
exceptions if network is flakey
-                        // Regardless of what's thrown, we must catch it, 
disconnect, and try again
-                        JVMStabilityInspector.inspectThrowable(e);
-                        logger.warn("Configuration error prevented outbound 
connection: {}", e.getLocalizedMessage());
-                    }
-                    finally
-                    {
-                        disconnect();
-                        return false;
-                    }
-                }
-
-                if (targetVersion < maxTargetVersion && targetVersion < 
MessagingService.current_version)
-                {
-                    logger.trace("Detected higher max version {} (using {}); 
will reconnect when queued messages are done",
-                                 maxTargetVersion, targetVersion);
-                    softCloseSocket();
-                }
-
-                out.writeInt(MessagingService.current_version);
-                
CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), 
out);
-                if (shouldCompressConnection())
-                {
-                    out.flush();
-                    logger.trace("Upgrading OutputStream to {} to be 
compressed", endpoint);
-
-                    // TODO: custom LZ4 OS that supports BB write methods
-                    LZ4Compressor compressor = 
LZ4Factory.fastestInstance().fastCompressor();
-                    Checksum checksum = 
XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
-                    out = new WrappedDataOutputStreamPlus(new 
LZ4BlockOutputStream(socket.getOutputStream(),
-                                                                        1 << 
14,  // 16k block size
-                                                                        
compressor,
-                                                                        
checksum,
-                                                                        
true)); // no async flushing
-                }
-                logger.debug("Done connecting to {}", endpoint);
-                return true;
-            }
-            catch (SSLHandshakeException e)
-            {
-                logger.error("SSL handshake error for outbound connection to " 
+ socket, e);
-                socket = null;
-                // SSL errors won't be recoverable within timeout period so 
we'll just abort
-                return false;
-            }
-            catch (IOException e)
-            {
-                socket = null;
-                logger.debug("Unable to connect to {}", endpoint, e);
-                Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, 
TimeUnit.MILLISECONDS);
-            }
-        }
-        return false;
-    }
-
-    private int handshakeVersion(final DataInputStream inputStream)
-    {
-        final AtomicInteger version = new AtomicInteger(NO_VERSION);
-        final CountDownLatch versionLatch = new CountDownLatch(1);
-        NamedThreadFactory.createThread(() ->
-        {
-            try
-            {
-                logger.info("Handshaking version with {}", 
poolReference.endPoint());
-                version.set(inputStream.readInt());
-            }
-            catch (IOException ex)
-            {
-                final String msg = "Cannot handshake version with " + 
poolReference.endPoint();
-                if (logger.isTraceEnabled())
-                    logger.trace(msg, ex);
-                else
-                    logger.info(msg);
-            }
-            finally
-            {
-                //unblock the waiting thread on either success or fail
-                versionLatch.countDown();
-            }
-        }, "HANDSHAKE-" + poolReference.endPoint()).start();
-
-        try
-        {
-            versionLatch.await(WAIT_FOR_VERSION_MAX_TIME, 
TimeUnit.MILLISECONDS);
-        }
-        catch (InterruptedException ex)
-        {
-            throw new AssertionError(ex);
-        }
-        return version.get();
-    }
-
-    /**
-     * Expire elements from the queue if the queue is pretty full and 
expiration is not already in progress.
-     * This method will only remove droppable expired entries. If no such 
element exists, nothing is removed from the queue.
-     *
-     * @param timestampNanos The current time as from System.nanoTime()
-     */
-    @VisibleForTesting
-    void expireMessages(long timestampNanos)
-    {
-        if (backlog.size() <= BACKLOG_PURGE_SIZE)
-            return; // Plenty of space
-
-        if (backlogNextExpirationTime - timestampNanos > 0)
-            return; // Expiration is not due.
-
-        /**
-         * Expiration is an expensive process. Iterating the queue locks the 
queue for both writes and
-         * reads during iter.next() and iter.remove(). Thus letting only a 
single Thread do expiration.
-         */
-        if (backlogExpirationActive.compareAndSet(false, true))
-        {
-            try
-            {
-                Iterator<QueuedMessage> iter = backlog.iterator();
-                while (iter.hasNext())
-                {
-                    QueuedMessage qm = iter.next();
-                    if (!qm.droppable)
-                        continue;
-                    if (!qm.isTimedOut(timestampNanos))
-                        continue;
-                    iter.remove();
-                    dropped.incrementAndGet();
-                }
-
-                if (logger.isTraceEnabled())
-                {
-                    long duration = 
TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - timestampNanos);
-                    logger.trace("Expiration of {} took {}μs", getName(), 
duration);
-                }
-            }
-            finally
-            {
-                long backlogExpirationIntervalNanos = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getOtcBacklogExpirationInterval());
-                backlogNextExpirationTime = timestampNanos + 
backlogExpirationIntervalNanos;
-                backlogExpirationActive.set(false);
-            }
-        }
-    }
-
-    /** messages that have not been retried yet */
-    private static class QueuedMessage implements Coalescable
-    {
-        final MessageOut<?> message;
-        final int id;
-        final long timestampNanos;
-        final boolean droppable;
-
-        QueuedMessage(MessageOut<?> message, int id, long timestampNanos)
-        {
-            this.message = message;
-            this.id = id;
-            this.timestampNanos = timestampNanos;
-            this.droppable = 
MessagingService.DROPPABLE_VERBS.contains(message.verb);
-        }
-
-        /** don't drop a non-droppable message just because it's timestamp is 
expired */
-        boolean isTimedOut(long nowNanos)
-        {
-            long messageTimeoutNanos = 
TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
-            return droppable && nowNanos - timestampNanos  > 
messageTimeoutNanos;
-        }
-
-        boolean shouldRetry()
-        {
-            // retry all messages once
-            return true;
-        }
-
-        public long timestampNanos()
-        {
-            return timestampNanos;
-        }
-    }
-
-    private static class RetriedQueuedMessage extends QueuedMessage
-    {
-        RetriedQueuedMessage(QueuedMessage msg)
-        {
-            super(msg.message, msg.id, msg.timestampNanos);
-        }
-
-        boolean shouldRetry()
-        {
-            return false;
-        }
-    }
-
-    private static class InternodeAuthFailed extends Exception {}
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java 
b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
deleted file mode 100644
index 20a8da6..0000000
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.metrics.ConnectionMetrics;
-import org.apache.cassandra.security.SSLFactory;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class OutboundTcpConnectionPool
-{
-    public static final long LARGE_MESSAGE_THRESHOLD =
-            Long.getLong(Config.PROPERTY_PREFIX + "otcp_large_message_threshold", 1024 * 64);
-
-    // pointer for the real Address.
-    private final InetAddress id;
-    private final CountDownLatch started;
-    public final OutboundTcpConnection smallMessages;
-    public final OutboundTcpConnection largeMessages;
-    public final OutboundTcpConnection gossipMessages;
-
-    // pointer to the reset Address.
-    private InetAddress resetEndpoint;
-    private ConnectionMetrics metrics;
-
-    // back-pressure state linked to this connection:
-    private final BackPressureState backPressureState;
-
-    OutboundTcpConnectionPool(InetAddress remoteEp, BackPressureState backPressureState)
-    {
-        id = remoteEp;
-        resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
-        started = new CountDownLatch(1);
-
-        smallMessages = new OutboundTcpConnection(this, "Small");
-        largeMessages = new OutboundTcpConnection(this, "Large");
-        gossipMessages = new OutboundTcpConnection(this, "Gossip");
-
-        this.backPressureState = backPressureState;
-    }
-
-    /**
-     * returns the appropriate connection based on message type.
-     * returns null if a connection could not be established.
-     */
-    OutboundTcpConnection getConnection(MessageOut msg)
-    {
-        if (Stage.GOSSIP == msg.getStage())
-            return gossipMessages;
-        return msg.payloadSize(smallMessages.getTargetVersion()) > LARGE_MESSAGE_THRESHOLD
-               ? largeMessages
-               : smallMessages;
-    }
-
-    public BackPressureState getBackPressureState()
-    {
-        return backPressureState;
-    }
-
-    void reset()
-    {
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
-            conn.closeSocket(false);
-    }
-
-    public void resetToNewerVersion(int version)
-    {
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
-        {
-            if (version > conn.getTargetVersion())
-                conn.softCloseSocket();
-        }
-    }
-
-    /**
-     * reconnect to @param remoteEP (after the current message backlog is exhausted).
-     * Used by Ec2MultiRegionSnitch to force nodes in the same region to communicate over their private IPs.
-     * @param remoteEP
-     */
-    public void reset(InetAddress remoteEP)
-    {
-        SystemKeyspace.updatePreferredIP(id, remoteEP);
-        resetEndpoint = remoteEP;
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
-            conn.softCloseSocket();
-
-        // release previous metrics and create new one with reset address
-        metrics.release();
-        metrics = new ConnectionMetrics(resetEndpoint, this);
-    }
-
-    public long getTimeouts()
-    {
-       return metrics.timeouts.getCount();
-    }
-
-
-    public void incrementTimeout()
-    {
-        metrics.timeouts.mark();
-    }
-
-    public Socket newSocket() throws IOException
-    {
-        return newSocket(endPoint());
-    }
-
-    @SuppressWarnings("resource") // Closing the socket will close the underlying channel.
-    public static Socket newSocket(InetAddress endpoint) throws IOException
-    {
-        // zero means 'bind on any available port.'
-        if (isEncryptedChannel(endpoint))
-        {
-            return SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endpoint, DatabaseDescriptor.getSSLStoragePort());
-        }
-        else
-        {
-            SocketChannel channel = SocketChannel.open();
-            channel.connect(new InetSocketAddress(endpoint, DatabaseDescriptor.getStoragePort()));
-            return channel.socket();
-        }
-    }
-
-    public static int portFor(InetAddress endpoint)
-    {
-        return isEncryptedChannel(endpoint) ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort();
-    }
-
-    public InetAddress endPoint()
-    {
-        if (id.equals(FBUtilities.getBroadcastAddress()))
-            return FBUtilities.getLocalAddress();
-        return resetEndpoint;
-    }
-
-    public static boolean isEncryptedChannel(InetAddress address)
-    {
-        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        switch (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption)
-        {
-            case none:
-                return false; // if nothing needs to be encrypted then return immediately.
-            case all:
-                break;
-            case dc:
-                if (snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
-                    return false;
-                break;
-            case rack:
-                // for rack then check if the DC's are the same.
-                if (snitch.getRack(address).equals(snitch.getRack(FBUtilities.getBroadcastAddress()))
-                        && snitch.getDatacenter(address).equals(snitch.getDatacenter(FBUtilities.getBroadcastAddress())))
-                    return false;
-                break;
-        }
-        return true;
-    }
-
-    public void start()
-    {
-        smallMessages.start();
-        largeMessages.start();
-        gossipMessages.start();
-
-        metrics = new ConnectionMetrics(id, this);
-
-        started.countDown();
-    }
-
-    public void waitForStarted()
-    {
-        if (started.getCount() == 0)
-            return;
-
-        boolean error = false;
-        try
-        {
-            if (!started.await(1, TimeUnit.MINUTES))
-                error = true;
-        }
-        catch (InterruptedException e)
-        {
-            Thread.currentThread().interrupt();
-            error = true;
-        }
-        if (error)
-            throw new IllegalStateException(String.format("Connections to %s are not started!", id.getHostAddress()));
-    }
-
-    public void close()
-    {
-        // these null guards are simply for tests
-        if (largeMessages != null)
-            largeMessages.closeSocket(true);
-        if (smallMessages != null)
-            smallMessages.closeSocket(true);
-        if (gossipMessages != null)
-            gossipMessages.closeSocket(true);
-        if (metrics != null)
-            metrics.release();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
new file mode 100644
index 0000000..f9fa07a
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInputPlus
+{
+    public ByteBufDataInputPlus(ByteBuf buffer)
+    {
+        super(buffer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
new file mode 100644
index 0000000..0473465
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import com.google.common.base.Function;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+import org.apache.cassandra.utils.vint.VIntCoding;
+
+/**
+ * A {@link DataOutputPlus} that uses a {@link ByteBuf} as a backing buffer. This class is completely thread unsafe and
+ * it is expected that the backing buffer is sized correctly for all the writes you want to do (or the buffer needs
+ * to be growable).
+ */
+public class ByteBufDataOutputPlus extends ByteBufOutputStream implements DataOutputPlus
+{
+    private final ByteBuf buffer;
+
+    /**
+     * ByteBuffer to use for defensive copies of direct {@link ByteBuffer}s - see {@link #write(ByteBuffer)}.
+     */
+    private final ByteBuffer hollowBuffer = MemoryUtil.getHollowDirectByteBuffer();
+
+    public ByteBufDataOutputPlus(ByteBuf buffer)
+    {
+        super(buffer);
+        this.buffer = buffer;
+    }
+
+    /**
+     * {@inheritDoc} - "write the buffer without modifying its position"
+     *
+     * Unfortunately, netty's {@link ByteBuf#writeBytes(ByteBuffer)} modifies the byteBuffer's position,
+     * and that is unsafe in our world wrt multithreading. Hence we need to be careful: reference the backing array
+     * on heap ByteBuffers, and use a reusable "hollow" ByteBuffer ({@link #hollowBuffer}) for direct ByteBuffers.
+     */
+    @Override
+    public void write(ByteBuffer byteBuffer) throws IOException
+    {
+        if (byteBuffer.hasArray())
+        {
+            write(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining());
+        }
+        else
+        {
+            assert byteBuffer.isDirect();
+            MemoryUtil.duplicateDirectByteBuffer(byteBuffer, hollowBuffer);
+            buffer.writeBytes(hollowBuffer);
+        }
+    }
+
+    @Override
+    public void write(Memory memory, long offset, long length) throws IOException
+    {
+        for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
+            write(buffer);
+    }
+
+    @Override
+    public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void writeVInt(long v) throws IOException
+    {
+        writeUnsignedVInt(VIntCoding.encodeZigZag64(v));
+    }
+
+    @Override
+    public void writeUnsignedVInt(long v) throws IOException
+    {
+        int size = VIntCoding.computeUnsignedVIntSize(v);
+        if (size == 1)
+        {
+            buffer.writeByte((byte) (v & 0xFF));
+            return;
+        }
+
+        buffer.writeBytes(VIntCoding.encodeVInt(v, size), 0, size);
+    }
+
+    @Override
+    public void write(int b) throws IOException
+    {
+        buffer.writeByte((byte) (b & 0xFF));
+    }
+
+    @Override
+    public void writeByte(int v) throws IOException
+    {
+        buffer.writeByte((byte) (v & 0xFF));
+    }
+
+    @Override
+    public void writeBytes(String s) throws IOException
+    {
+        for (int index = 0; index < s.length(); index++)
+            buffer.writeByte(s.charAt(index) & 0xFF);
+    }
+
+    @Override
+    public void writeChars(String s) throws IOException
+    {
+        for (int index = 0; index < s.length(); index++)
+            buffer.writeChar(s.charAt(index));
+    }
+
+    @Override
+    public void writeUTF(String s) throws IOException
+    {
+        UnbufferedDataOutputStreamPlus.writeUTF(s, this);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to