http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java 
b/src/java/org/apache/cassandra/net/async/NettyFactory.java
new file mode 100644
index 0000000..13d8810
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -0,0 +1,375 @@
+package org.apache.cassandra.net.async;
+
+import java.net.InetSocketAddress;
+import java.util.zip.Checksum;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.compression.Lz4FrameDecoder;
+import io.netty.handler.codec.compression.Lz4FrameEncoder;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.OpenSsl;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+import io.netty.util.internal.logging.Slf4JLoggerFactory;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import 
org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.service.NativeTransportService;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A factory for building Netty {@link Channel}s. Channels here are set up with 
a pipeline to participate
+ * in the internode protocol handshake, either the inbound or outbound side as 
per the method invoked.
+ */
+public final class NettyFactory
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(NettyFactory.class);
+
+    /**
+     * The block size for use with netty's lz4 code.
+     */
+    private static final int COMPRESSION_BLOCK_SIZE = 1 << 16;
+
+    private static final int LZ4_HASH_SEED = 0x9747b28c;
+
+    public enum Mode { MESSAGING, STREAMING }
+
+    private static final String SSL_CHANNEL_HANDLER_NAME = "ssl";
+    static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor";
+    static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = 
"outboundCompressor";
+    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+
+    /** a useful addition for debugging; simply set to true to get more data 
in your logs */
+    private static final boolean WIRETRACE = false;
+    static
+    {
+        if (WIRETRACE)
+            
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
+    }
+
+    private static final boolean DEFAULT_USE_EPOLL = 
NativeTransportService.useEpoll();
+    static
+    {
+        if (!DEFAULT_USE_EPOLL)
+            logger.warn("epoll not availble {}", Epoll.unavailabilityCause());
+    }
+
+    /**
+     * The size of the receive queue for the outbound channels. As outbound 
channels do not receive data
+     * (outside of the internode messaging protocol's handshake), this value 
can be relatively small.
+     */
+    private static final int OUTBOUND_CHANNEL_RECEIVE_BUFFER_SIZE = 1 << 10;
+
+    /**
+     * The size of the send queue for the inbound channels. As inbound 
channels do not send data
+     * (outside of the internode messaging protocol's handshake), this value 
can be relatively small.
+     */
+    private static final int INBOUND_CHANNEL_SEND_BUFFER_SIZE = 1 << 10;
+
+    /**
+     * A factory instance that all normal, runtime code should use. Separate 
instances should only be used for testing.
+     */
+    public static final NettyFactory instance = new 
NettyFactory(DEFAULT_USE_EPOLL);
+
+    private final boolean useEpoll;
+    private final EventLoopGroup acceptGroup;
+
+    private final EventLoopGroup inboundGroup;
+    private final EventLoopGroup outboundGroup;
+
+    /**
+     * Constructor that allows modifying the {@link NettyFactory#useEpoll} for 
testing purposes. Otherwise, use the
+     * default {@link #instance}.
+     */
+    @VisibleForTesting
+    NettyFactory(boolean useEpoll)
+    {
+        this.useEpoll = useEpoll;
+        acceptGroup = getEventLoopGroup(useEpoll, 
determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption),
+                                        
"MessagingService-NettyAcceptor-Threads", false);
+        inboundGroup = getEventLoopGroup(useEpoll, 
FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Threads", 
false);
+        outboundGroup = getEventLoopGroup(useEpoll, 
FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Threads", 
true);
+    }
+
+    /**
+     * Determine the number of accept threads we need, which is based upon the 
number of listening sockets we will have.
+     * We'll have either 1 or 2 listen sockets, depending on if we use SSL or 
not in combination with non-SSL. If we have both,
+     * we'll have two sockets, and thus need two threads; else one socket and 
one thread.
+     *
+     * If the operator has configured multiple IP addresses (both {@link 
org.apache.cassandra.config.Config#broadcast_address}
+     * and {@link org.apache.cassandra.config.Config#listen_address} are 
configured), then we listen on another set of sockets
+     * - basically doubling the count. See CASSANDRA-9748 for more details.
+     */
+    static int determineAcceptGroupSize(InternodeEncryption 
internode_encryption)
+    {
+        int listenSocketCount = internode_encryption == InternodeEncryption.dc 
|| internode_encryption == InternodeEncryption.rack ? 2 : 1;
+
+        if (MessagingService.shouldListenOnBroadcastAddress())
+            listenSocketCount *= 2;
+
+        return listenSocketCount;
+    }
+
+    /**
+     * Create an {@link EventLoopGroup}, for epoll or nio. The {@code 
boostIoRatio} flag passes a hint to the netty
+     * event loop threads to optimize consuming all the tasks from the netty 
channel before checking for IO activity.
+     * By default, netty will process some maximum number of tasks off its 
queue before it will check for activity on
+     * any of the open FDs, which basically amounts to checking for any 
incoming data. If you have a class of event loops
+     * that do almost *no* inbound activity (like cassandra's outbound 
channels), then it behooves us to have the
+     * outbound netty channel consume as many tasks as it can before making 
the system calls to check up on the FDs,
+     * as we're not expecting any incoming data on those sockets, anyways. 
Thus, we pass the magic value {@code 100}
+     * to achieve the maximum consumption from the netty queue. (for 
implementation details, as of netty 4.1.8,
+     * see {@link io.netty.channel.epoll.EpollEventLoop#run()}.)
+     */
+    static EventLoopGroup getEventLoopGroup(boolean useEpoll, int threadCount, 
String threadNamePrefix, boolean boostIoRatio)
+    {
+        if (useEpoll)
+        {
+            logger.debug("using netty epoll event loop for pool prefix {}", 
threadNamePrefix);
+            EpollEventLoopGroup eventLoopGroup = new 
EpollEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix));
+            if (boostIoRatio)
+                eventLoopGroup.setIoRatio(100);
+            return eventLoopGroup;
+        }
+
+        logger.debug("using netty nio event loop for pool prefix {}", 
threadNamePrefix);
+        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(threadCount, 
new DefaultThreadFactory(threadNamePrefix));
+        if (boostIoRatio)
+            eventLoopGroup.setIoRatio(100);
+        return eventLoopGroup;
+    }
+
+    /**
+     * Create a {@link Channel} that listens on the {@code localAddr}. This 
method will block while trying to bind to the address,
+     * but it does not make a remote call.
+     */
+    public Channel createInboundChannel(InetSocketAddress localAddr, 
InboundInitializer initializer, int receiveBufferSize) throws 
ConfigurationException
+    {
+        String nic = FBUtilities.getNetworkInterface(localAddr.getAddress());
+        logger.info("Starting Messaging Service on {} {}, encryption: {}",
+                    localAddr, nic == null ? "" : String.format(" (%s)", nic), 
encryptionLogStatement(initializer.encryptionOptions));
+        Class<? extends ServerChannel> transport = useEpoll ? 
EpollServerSocketChannel.class : NioServerSocketChannel.class;
+        ServerBootstrap bootstrap = new ServerBootstrap().group(acceptGroup, 
inboundGroup)
+                                                         .channel(transport)
+                                                         
.option(ChannelOption.SO_BACKLOG, 500)
+                                                         
.childOption(ChannelOption.SO_KEEPALIVE, true)
+                                                         
.childOption(ChannelOption.TCP_NODELAY, true)
+                                                         
.childOption(ChannelOption.SO_REUSEADDR, true)
+                                                         
.childOption(ChannelOption.SO_SNDBUF, INBOUND_CHANNEL_SEND_BUFFER_SIZE)
+                                                         
.childHandler(initializer);
+
+        if (receiveBufferSize > 0)
+            bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize);
+
+        ChannelFuture channelFuture = bootstrap.bind(localAddr);
+
+        if (!channelFuture.awaitUninterruptibly().isSuccess())
+        {
+            if (channelFuture.channel().isOpen())
+                channelFuture.channel().close();
+
+            Throwable failedChannelCause = channelFuture.cause();
+
+            String causeString = "";
+            if (failedChannelCause != null && failedChannelCause.getMessage() 
!= null)
+                causeString = failedChannelCause.getMessage();
+
+            if (causeString.contains("in use"))
+            {
+                throw new ConfigurationException(localAddr + " is in use by 
another process.  Change listen_address:storage_port " +
+                                                 "in cassandra.yaml to values 
that do not conflict with other services");
+            }
+            // looking at the jdk source, solaris/windows bind failure messages 
both use the phrase "cannot assign requested address".
+            // windows message uses "Cannot" (with a capital 'C'), and solaris 
(a/k/a *nix) does not. hence we search for "annot" <sigh>
+            else if (causeString.contains("annot assign requested address"))
+            {
+                throw new ConfigurationException("Unable to bind to address " 
+ localAddr
+                                                 + ". Set listen_address in 
cassandra.yaml to an interface you can bind to, e.g., your private IP address 
on EC2");
+            }
+            else
+            {
+                throw new ConfigurationException("failed to bind to: " + 
localAddr, failedChannelCause);
+            }
+        }
+
+        return channelFuture.channel();
+    }
+
+    public static class InboundInitializer extends 
ChannelInitializer<SocketChannel>
+    {
+        private final IInternodeAuthenticator authenticator;
+        private final ServerEncryptionOptions encryptionOptions;
+        private final ChannelGroup channelGroup;
+
+        public InboundInitializer(IInternodeAuthenticator authenticator, 
ServerEncryptionOptions encryptionOptions, ChannelGroup channelGroup)
+        {
+            this.authenticator = authenticator;
+            this.encryptionOptions = encryptionOptions;
+            this.channelGroup = channelGroup;
+        }
+
+        /**
+         * Registers the accepted channel with the channel group and installs its handlers in the order
+         * ssl -> logger -> handshakeHandler. The ssl handler is only added when encryption options were
+         * supplied, and the logger only when WIRETRACE is enabled.
+         */
+        @Override
+        public void initChannel(SocketChannel channel) throws Exception
+        {
+            channelGroup.add(channel);
+            ChannelPipeline pipeline = channel.pipeline();
+
+            // order of handlers: ssl -> logger -> handshakeHandler
+            if (encryptionOptions != null)
+            {
+                SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true);
+                SslHandler sslHandler = sslContext.newHandler(channel.alloc());
+                logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
+                // addFirst keeps ssl ahead of every handler added below
+                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+            }
+
+            if (WIRETRACE)
+                pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
+
+            // reuse the pipeline fetched above rather than looking it up on the channel again
+            pipeline.addLast(HANDSHAKE_HANDLER_NAME, new InboundHandshakeHandler(authenticator));
+        }
+    }
+
+    private String encryptionLogStatement(ServerEncryptionOptions options)
+    {
+        if (options == null)
+            return "disabled";
+
+        String encryptionType = OpenSsl.isAvailable() ? "openssl" : "jdk";
+        return "enabled (" + encryptionType + ')';
+    }
+
+    /**
+     * Create the {@link Bootstrap} for connecting to a remote peer. This 
method does <b>not</b> attempt to connect to the peer,
+     * and thus does not block.
+     */
+    public Bootstrap createOutboundBootstrap(OutboundConnectionParams params)
+    {
+        logger.debug("creating outbound bootstrap to peer {}, compression: {}, 
encryption: {}, coalesce: {}", params.connectionId.connectionAddress(),
+                     params.compress, 
encryptionLogStatement(params.encryptionOptions),
+                     params.coalescingStrategy.isPresent() ? 
params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
+        Class<? extends Channel>  transport = useEpoll ? 
EpollSocketChannel.class : NioSocketChannel.class;
+        Bootstrap bootstrap = new Bootstrap().group(outboundGroup)
+                              .channel(transport)
+                              .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
2000)
+                              .option(ChannelOption.SO_KEEPALIVE, true)
+                              .option(ChannelOption.SO_REUSEADDR, true)
+                              .option(ChannelOption.SO_SNDBUF, 
params.sendBufferSize)
+                              .option(ChannelOption.SO_RCVBUF, 
OUTBOUND_CHANNEL_RECEIVE_BUFFER_SIZE)
+                              .option(ChannelOption.TCP_NODELAY, 
params.tcpNoDelay)
+                              .option(ChannelOption.WRITE_BUFFER_WATER_MARK, 
params.waterMark)
+                              .handler(new OutboundInitializer(params));
+        bootstrap.localAddress(params.connectionId.local(), 0);
+        bootstrap.remoteAddress(params.connectionId.connectionAddress());
+        return bootstrap;
+    }
+
+    public static class OutboundInitializer extends 
ChannelInitializer<SocketChannel>
+    {
+        private final OutboundConnectionParams params;
+
+        OutboundInitializer(OutboundConnectionParams params)
+        {
+            this.params = params;
+        }
+
+        /**
+         * Installs the outbound handlers in the order ssl -> logger -> handshakeHandler. The ssl handler
+         * is only added when {@code params.encryptionOptions} is set, and the logger only when WIRETRACE
+         * is enabled.
+         */
+        @Override
+        public void initChannel(SocketChannel channel) throws Exception
+        {
+            ChannelPipeline pipeline = channel.pipeline();
+
+            // order of handlers: ssl -> logger -> handshakeHandler
+            if (params.encryptionOptions != null)
+            {
+                SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false);
+
+                final SslHandler sslHandler;
+                if (params.encryptionOptions.require_endpoint_verification)
+                {
+                    // the handler is created with the peer's host and port, and the engine is then
+                    // switched to the "HTTPS" endpoint identification algorithm
+                    InetSocketAddress peer = params.connectionId.remoteAddress();
+                    sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
+                    SSLEngine engine = sslHandler.engine();
+                    SSLParameters sslParameters = engine.getSSLParameters();
+                    sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+                    engine.setSSLParameters(sslParameters);
+                }
+                else
+                {
+                    sslHandler = sslContext.newHandler(channel.alloc());
+                }
+
+                logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName());
+                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+            }
+
+            if (NettyFactory.WIRETRACE)
+                pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
+
+            pipeline.addLast(HANDSHAKE_HANDLER_NAME, new OutboundHandshakeHandler(params));
+        }
+    }
+
+    public void close()
+    {
+        acceptGroup.shutdownGracefully();
+        outboundGroup.shutdownGracefully();
+        inboundGroup.shutdownGracefully();
+    }
+
+    static Lz4FrameEncoder createLz4Encoder(int protocolVersion)
+    {
+        return new Lz4FrameEncoder(LZ4Factory.fastestInstance(), false, 
COMPRESSION_BLOCK_SIZE, checksumForFrameEncoders(protocolVersion));
+    }
+
+    private static Checksum checksumForFrameEncoders(int protocolVersion)
+    {
+        if (protocolVersion >= MessagingService.current_version)
+            return ChecksumType.CRC32.newInstance();
+        return 
XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum();
+    }
+
+    static Lz4FrameDecoder createLz4Decoder(int protocolVersion)
+    {
+        return new Lz4FrameDecoder(LZ4Factory.fastestInstance(), 
checksumForFrameEncoders(protocolVersion));
+    }
+
+    public static EventExecutor executorForChannelGroups()
+    {
+        return new DefaultEventExecutor();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java 
b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
new file mode 100644
index 0000000..24dc5ff
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Identifies an outbound messaging connection.
+ *
+ * This mainly holds the remote address and the type (small/large messages or 
gossip) of connection used, but with the
+ * additional detail that in some cases (typically public EC2 addresses across 
regions) the address to which we connect
+ * to the remote is different from the address by which the node is known by 
the rest of the C* cluster.
+ */
+public class OutboundConnectionIdentifier
+{
+    enum ConnectionType
+    {
+        GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE
+    }
+
+    /**
+     * Memoization of the local node's broadcast address.
+     */
+    private final InetSocketAddress localAddr;
+
+    /**
+     * The address by which the remote is identified. This may be different 
from {@link #remoteConnectionAddr} for
+     * something like EC2 public IP addresses which need to be used for 
communication between EC2 regions.
+     */
+    private final InetSocketAddress remoteAddr;
+
+    /**
+     * The address to which we're connecting to the node (often the same as 
{@link #remoteAddr} but not always).
+     */
+    private final InetSocketAddress remoteConnectionAddr;
+
+    private final ConnectionType connectionType;
+
+    private OutboundConnectionIdentifier(InetSocketAddress localAddr,
+                                         InetSocketAddress remoteAddr,
+                                         InetSocketAddress 
remoteConnectionAddr,
+                                         ConnectionType connectionType)
+    {
+        this.localAddr = localAddr;
+        this.remoteAddr = remoteAddr;
+        this.remoteConnectionAddr = remoteConnectionAddr;
+        this.connectionType = connectionType;
+    }
+
+    private OutboundConnectionIdentifier(InetSocketAddress localAddr,
+                                         InetSocketAddress remoteAddr,
+                                         ConnectionType connectionType)
+    {
+        this(localAddr, remoteAddr, remoteAddr, connectionType);
+    }
+
+    /**
+     * Creates an identifier for a small message connection and using the 
remote "identifying" address as its connection
+     * address.
+     */
+    public static OutboundConnectionIdentifier small(InetSocketAddress 
localAddr, InetSocketAddress remoteAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, 
ConnectionType.SMALL_MESSAGE);
+    }
+
+    /**
+     * Creates an identifier for a large message connection and using the 
remote "identifying" address as its connection
+     * address.
+     */
+    public static OutboundConnectionIdentifier large(InetSocketAddress 
localAddr, InetSocketAddress remoteAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, 
ConnectionType.LARGE_MESSAGE);
+    }
+
+    /**
+     * Creates an identifier for a gossip connection and using the remote 
"identifying" address as its connection
+     * address.
+     */
+    public static OutboundConnectionIdentifier gossip(InetSocketAddress 
localAddr, InetSocketAddress remoteAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, 
ConnectionType.GOSSIP);
+    }
+
+    /**
+     * Returns a newly created connection identifier to the same remote as 
this identifier, but using the provided
+     * address as connection address.
+     *
+     * @param remoteConnectionAddr the address to use for connection to the 
remote in the new identifier.
+     * @return a newly created connection identifier that differs from this 
one only by using {@code remoteConnectionAddr}
+     * as connection address to the remote.
+     */
+    OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress 
remoteConnectionAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, 
remoteConnectionAddr, connectionType);
+    }
+
+    /**
+     * The local node address.
+     */
+    InetAddress local()
+    {
+        return localAddr.getAddress();
+    }
+
+    /**
+     * The remote node identifying address (the one to use for anything else 
than connecting to the node).
+     */
+    InetSocketAddress remoteAddress()
+    {
+        return remoteAddr;
+    }
+
+    /**
+     * The IP address (without port) of the remote node identifying address (the 
one to use for anything else than connecting to the node).
+     */
+    InetAddress remote()
+    {
+        return remoteAddr.getAddress();
+    }
+
+    /**
+     * The remote node connection address (the one to use to actually connect 
to the remote, and only that).
+     */
+    InetSocketAddress connectionAddress()
+    {
+        return remoteConnectionAddr;
+    }
+
+    /**
+     * The type of this connection.
+     */
+    ConnectionType type()
+    {
+        return connectionType;
+    }
+
+    @Override
+    public String toString()
+    {
+        return remoteAddr.equals(remoteConnectionAddr)
+               ? String.format("%s (%s)", remoteAddr, connectionType)
+               : String.format("%s on %s (%s)", remoteAddr, 
remoteConnectionAddr, connectionType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java 
b/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
new file mode 100644
index 0000000..282480e
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionParams.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.base.Preconditions;
+
+import io.netty.channel.WriteBufferWaterMark;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+
+/**
+ * A collection of data points to be passed around for outbound connections.
+ */
+public class OutboundConnectionParams
+{
+    public static final int DEFAULT_SEND_BUFFER_SIZE = 1 << 16;
+
+    final OutboundConnectionIdentifier connectionId;
+    final Consumer<HandshakeResult> callback;
+    final ServerEncryptionOptions encryptionOptions;
+    final NettyFactory.Mode mode;
+    final boolean compress;
+    final Optional<CoalescingStrategy> coalescingStrategy;
+    final int sendBufferSize;
+    final boolean tcpNoDelay;
+    final Supplier<QueuedMessage> backlogSupplier;
+    final Consumer<MessageResult> messageResultConsumer;
+    final WriteBufferWaterMark waterMark;
+    final int protocolVersion;
+
+    private OutboundConnectionParams(OutboundConnectionIdentifier connectionId,
+                                     Consumer<HandshakeResult> callback,
+                                     ServerEncryptionOptions encryptionOptions,
+                                     NettyFactory.Mode mode,
+                                     boolean compress,
+                                     Optional<CoalescingStrategy> 
coalescingStrategy,
+                                     int sendBufferSize,
+                                     boolean tcpNoDelay,
+                                     Supplier<QueuedMessage> backlogSupplier,
+                                     Consumer<MessageResult> 
messageResultConsumer,
+                                     WriteBufferWaterMark waterMark,
+                                     int protocolVersion)
+    {
+        this.connectionId = connectionId;
+        this.callback = callback;
+        this.encryptionOptions = encryptionOptions;
+        this.mode = mode;
+        this.compress = compress;
+        this.coalescingStrategy = coalescingStrategy;
+        this.sendBufferSize = sendBufferSize;
+        this.tcpNoDelay = tcpNoDelay;
+        this.backlogSupplier = backlogSupplier;
+        this.messageResultConsumer = messageResultConsumer;
+        this.waterMark = waterMark;
+        this.protocolVersion = protocolVersion;
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static Builder builder(OutboundConnectionParams params)
+    {
+        return new Builder(params);
+    }
+    
+    public static class Builder
+    {
+        private OutboundConnectionIdentifier connectionId;
+        private Consumer<HandshakeResult> callback;
+        private ServerEncryptionOptions encryptionOptions;
+        private NettyFactory.Mode mode;
+        private boolean compress;
+        private Optional<CoalescingStrategy> coalescingStrategy = 
Optional.empty();
+        private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+        private boolean tcpNoDelay;
+        private Supplier<QueuedMessage> backlogSupplier;
+        private Consumer<MessageResult> messageResultConsumer;
+        private WriteBufferWaterMark waterMark = WriteBufferWaterMark.DEFAULT;
+        int protocolVersion;
+
+        private Builder()
+        {   }
+
+        /**
+         * Copy constructor: seeds this builder with every field of {@code params}, so a caller only
+         * has to override the values that differ before calling {@link #build()}.
+         */
+        private Builder(OutboundConnectionParams params)
+        {
+            this.connectionId = params.connectionId;
+            this.callback = params.callback;
+            this.encryptionOptions = params.encryptionOptions;
+            this.mode = params.mode;
+            this.compress = params.compress;
+            this.coalescingStrategy = params.coalescingStrategy;
+            this.sendBufferSize = params.sendBufferSize;
+            this.tcpNoDelay = params.tcpNoDelay;
+            this.backlogSupplier = params.backlogSupplier;
+            this.messageResultConsumer = params.messageResultConsumer;
+            // These two used to be dropped by the copy: waterMark fell back to WriteBufferWaterMark.DEFAULT
+            // and protocolVersion to 0, which build() rejects as an illegal protocol version.
+            this.waterMark = params.waterMark;
+            this.protocolVersion = params.protocolVersion;
+        }
+
+        public Builder connectionId(OutboundConnectionIdentifier connectionId)
+        {
+            this.connectionId = connectionId;
+            return this;
+        }
+
+        public Builder callback(Consumer<HandshakeResult> callback)
+        {
+            this.callback = callback;
+            return this;
+        }
+
+        public Builder encryptionOptions(ServerEncryptionOptions 
encryptionOptions)
+        {
+            this.encryptionOptions = encryptionOptions;
+            return this;
+        }
+
+        public Builder mode(NettyFactory.Mode mode)
+        {
+            this.mode = mode;
+            return this;
+        }
+
+        public Builder compress(boolean compress)
+        {
+            this.compress = compress;
+            return this;
+        }
+
+        public Builder coalescingStrategy(Optional<CoalescingStrategy> 
coalescingStrategy)
+        {
+            this.coalescingStrategy = coalescingStrategy;
+            return this;
+        }
+
+        public Builder sendBufferSize(int sendBufferSize)
+        {
+            this.sendBufferSize = sendBufferSize;
+            return this;
+        }
+
+        public Builder tcpNoDelay(boolean tcpNoDelay)
+        {
+            this.tcpNoDelay = tcpNoDelay;
+            return this;
+        }
+
+        public Builder backlogSupplier(Supplier<QueuedMessage> backlogSupplier)
+        {
+            this.backlogSupplier = backlogSupplier;
+            return this;
+        }
+
+        public Builder messageResultConsumer(Consumer<MessageResult> 
messageResultConsumer)
+        {
+            this.messageResultConsumer = messageResultConsumer;
+            return this;
+        }
+
+        public Builder waterMark(WriteBufferWaterMark waterMark)
+        {
+            this.waterMark = waterMark;
+            return this;
+        }
+
+        public Builder protocolVersion(int protocolVersion)
+        {
+            this.protocolVersion = protocolVersion;
+            return this;
+        }
+
+        public OutboundConnectionParams build()
+        {
+            Preconditions.checkArgument(protocolVersion > 0, "illegal protocol 
version: " + protocolVersion);
+            Preconditions.checkArgument(sendBufferSize > 0 && sendBufferSize < 
1 << 20, "illegal send buffer size: " + sendBufferSize);
+
+            return new OutboundConnectionParams(connectionId, callback, 
encryptionOptions, mode, compress, coalescingStrategy, sendBufferSize,
+                                                tcpNoDelay, backlogSupplier, 
messageResultConsumer, waterMark, protocolVersion);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java 
b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
new file mode 100644
index 0000000..703549a
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
+import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage;
+import org.apache.cassandra.net.async.HandshakeProtocol.ThirdHandshakeMessage;
+
+import static org.apache.cassandra.config.Config.PROPERTY_PREFIX;
+
+/**
+ * A {@link ChannelHandler} to execute the send-side of the internode 
communication handshake protocol.
+ * As soon as the handler is added to the channel via {@link 
#channelActive(ChannelHandlerContext)}
+ * (which is only invoked if the underlying TCP connection was properly 
established), the {@link FirstHandshakeMessage}
+ * of the internode messaging protocol is automatically sent out. See {@link 
HandshakeProtocol} for full details
+ * about the internode messaging handshake protocol.
+ * <p>
+ * Upon completion of the handshake (on success or fail), the {@link 
#callback} is invoked to let the listener
+ * know the result of the handshake. See {@link HandshakeResult} for details 
about the different result states.
+ * <p>
+ * This class extends {@link ByteToMessageDecoder}, which is a {@link 
ChannelInboundHandler}, because this handler
+ * waits for the peer's handshake response (the {@link SecondHandshakeMessage} 
of the internode messaging handshake protocol).
+ */
+public class OutboundHandshakeHandler extends ByteToMessageDecoder
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(OutboundHandshakeHandler.class);
+
+    /**
+     * The number of milliseconds to wait before closing a channel if there 
has been no progress (when there is
+     * data to be sent). See {@link IdleStateHandler} and {@link 
MessageOutHandler#userEventTriggered(ChannelHandlerContext, Object)}.
+     */
+    private static final long DEFAULT_WRITE_IDLE_MS = 
TimeUnit.SECONDS.toMillis(10);
+    private static final String WRITE_IDLE_PROPERTY = PROPERTY_PREFIX + 
"outbound_write_idle_ms";
+    private static final long WRITE_IDLE_MS = 
Long.getLong(WRITE_IDLE_PROPERTY, DEFAULT_WRITE_IDLE_MS);
+
+    private final OutboundConnectionIdentifier connectionId;
+
+    /**
+     * The expected messaging service version to use.
+     */
+    private final int messagingVersion;
+
+    /**
+     * A function to invoke upon completion of the attempt, success or 
failure, to connect to the peer.
+     */
+    private final Consumer<HandshakeResult> callback;
+    private final NettyFactory.Mode mode;
+    private final OutboundConnectionParams params;
+
+    OutboundHandshakeHandler(OutboundConnectionParams params)
+    {
+        this.params = params;
+        this.connectionId = params.connectionId;
+        this.messagingVersion = params.protocolVersion;
+        this.callback = params.callback;
+        this.mode = params.mode;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Invoked when the channel is made active, and sends out the {@link 
FirstHandshakeMessage}
+     */
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception
+    {
+        FirstHandshakeMessage msg = new 
FirstHandshakeMessage(messagingVersion, mode, params.compress);
+        logger.trace("starting handshake with peer {}, msg = {}", 
connectionId.connectionAddress(), msg);
+        ctx.writeAndFlush(msg.encode(ctx.alloc())).addListener(future -> 
firstHandshakeMessageListener(future, ctx));
+        ctx.fireChannelActive();
+    }
+
+    /**
+     * A simple listener to make sure we could send the {@link 
FirstHandshakeMessage} to the socket,
+     * and fail the handshake attempt if we could not (for example, maybe we 
could create the TCP socket, but then
+     * the connection gets closed for some reason).
+     */
+    void firstHandshakeMessageListener(Future<? super Void> future, 
ChannelHandlerContext ctx)
+    {
+        if (future.isSuccess())
+            return;
+
+        ChannelFuture channelFuture = (ChannelFuture)future;
+        exceptionCaught(ctx, channelFuture.cause());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Invoked when we get the response back from the peer, which should 
contain the second message of the internode messaging handshake.
+     * <p>
+     * If the peer's protocol version does not equal what we were expecting, 
immediately close the channel (and socket);
+     * do *not* send out the third message of the internode messaging 
handshake.
+     * We will reconnect on the appropriate protocol version.
+     */
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> 
out) throws Exception
+    {
+        SecondHandshakeMessage msg = SecondHandshakeMessage.maybeDecode(in);
+        if (msg == null)
+            return;
+
+        logger.trace("received second handshake message from peer {}, msg = 
{}", connectionId.connectionAddress(), msg);
+        final int peerMessagingVersion = msg.messagingVersion;
+
+        // we expected a higher protocol version, but it was actually lower
+        if (messagingVersion > peerMessagingVersion)
+        {
+            logger.trace("peer's max version is {}; will reconnect with that 
version", peerMessagingVersion);
+            try
+            {
+                if 
(DatabaseDescriptor.getSeeds().contains(connectionId.remote()))
+                    logger.warn("Seed gossip version is {}; will not connect 
with that version", peerMessagingVersion);
+            }
+            catch (Throwable e)
+            {
+                // If invalid yaml has been added to the config since startup, 
getSeeds() will throw an AssertionError
+                // Additionally, third party seed providers may throw 
exceptions if network is flakey.
+                // Regardless of what's thrown, we must catch it, disconnect, 
and try again
+                logger.warn("failed to reread yaml (on trying to connect to a 
seed): {}", e.getLocalizedMessage());
+            }
+            ctx.close();
+            callback.accept(HandshakeResult.disconnect(peerMessagingVersion));
+            return;
+        }
+        // we anticipate a version that is lower than what peer is actually 
running
+        else if (messagingVersion < peerMessagingVersion && messagingVersion < 
MessagingService.current_version)
+        {
+            logger.trace("peer has a higher max version than expected {} 
(previous value {})", peerMessagingVersion, messagingVersion);
+            ctx.close();
+            callback.accept(HandshakeResult.disconnect(peerMessagingVersion));
+            return;
+        }
+
+        try
+        {
+            ctx.writeAndFlush(new 
ThirdHandshakeMessage(MessagingService.current_version, 
connectionId.local()).encode(ctx.alloc()));
+            ChannelWriter channelWriter = setupPipeline(ctx.channel(), 
peerMessagingVersion);
+            callback.accept(HandshakeResult.success(channelWriter, 
peerMessagingVersion));
+        }
+        catch (Exception e)
+        {
+            logger.info("failed to finalize internode messaging handshake", e);
+            ctx.close();
+            callback.accept(HandshakeResult.failed());
+        }
+    }
+
+    @VisibleForTesting
+    ChannelWriter setupPipeline(Channel channel, int messagingVersion)
+    {
+        ChannelPipeline pipeline = channel.pipeline();
+        pipeline.addLast("idleWriteHandler", new IdleStateHandler(true, 0, 
WRITE_IDLE_MS, 0, TimeUnit.MILLISECONDS));
+        if (params.compress)
+            pipeline.addLast(NettyFactory.OUTBOUND_COMPRESSOR_HANDLER_NAME, 
NettyFactory.createLz4Encoder(messagingVersion));
+
+        ChannelWriter channelWriter = ChannelWriter.create(channel, 
params.messageResultConsumer, params.coalescingStrategy);
+        pipeline.addLast("messageOutHandler", new 
MessageOutHandler(connectionId, messagingVersion, channelWriter, 
params.backlogSupplier));
+        pipeline.remove(this);
+        return channelWriter;
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+    {
+        logger.error("Failed to properly handshake with peer {}. Closing the 
channel.", connectionId, cause);
+        ctx.close();
+        callback.accept(HandshakeResult.failed());
+    }
+
+    /**
+     * The result of the handshake. Handshake has 3 possible outcomes:
+     *  1) it can be successful, in which case the channel and version to use 
are returned in this result.
+     *  2) we may decide to disconnect to reconnect with another protocol 
version (namely, the version is passed in this result).
+     *  3) we can have a negotiation failure for an unknown reason. 
(#sadtrombone)
+     */
+    public static class HandshakeResult
+    {
+        static final int UNKNOWN_PROTOCOL_VERSION = -1;
+
+        /**
+         * Describes the result of receiving the response back from the peer 
(Message 2 of the handshake)
+         * and implies an action that should be taken.
+         */
+        enum Outcome
+        {
+            SUCCESS, DISCONNECT, NEGOTIATION_FAILURE
+        }
+
+        /** The channel for the connection, only set for successful handshake. 
*/
+        final ChannelWriter channelWriter;
+        /** The version negotiated with the peer. Set unless this is a {@link 
Outcome#NEGOTIATION_FAILURE}. */
+        final int negotiatedMessagingVersion;
+        /** The handshake {@link Outcome}. */
+        final Outcome outcome;
+
+        private HandshakeResult(ChannelWriter channelWriter, int 
negotiatedMessagingVersion, Outcome outcome)
+        {
+            this.channelWriter = channelWriter;
+            this.negotiatedMessagingVersion = negotiatedMessagingVersion;
+            this.outcome = outcome;
+        }
+
+        static HandshakeResult success(ChannelWriter channel, int 
negotiatedMessagingVersion)
+        {
+            return new HandshakeResult(channel, negotiatedMessagingVersion, 
Outcome.SUCCESS);
+        }
+
+        static HandshakeResult disconnect(int negotiatedMessagingVersion)
+        {
+            return new HandshakeResult(null, negotiatedMessagingVersion, 
Outcome.DISCONNECT);
+        }
+
+        static HandshakeResult failed()
+        {
+            return new HandshakeResult(null, UNKNOWN_PROTOCOL_VERSION, 
Outcome.NEGOTIATION_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java 
b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
new file mode 100644
index 0000000..6bda9cd
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.async.NettyFactory.Mode;
+import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Represents one connection to a peer, and handles the state transitions on 
the connection and the netty {@link Channel}
+ * The underlying socket is not opened until explicitly requested (by sending 
a message).
+ *
+ * The basic setup for the channel is like this: a message is requested to be 
sent via {@link #sendMessage(MessageOut, int)}.
+ * If the channel is not established, then we need to create it (obviously). 
To prevent multiple threads from creating
+ * independent connections, they attempt to update the {@link #state}; one 
thread will win the race and create the connection.
+ * Upon successfully setting up the connection/channel, the {@link #state} will 
be updated again (to {@link State#READY}),
+ * which indicates to other threads that the channel is ready for business and 
can be written to.
+ *
+ */
+public class OutboundMessagingConnection
+{
+    static final Logger logger = 
LoggerFactory.getLogger(OutboundMessagingConnection.class);
+    private static final NoSpamLogger errorLogger = 
NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS);
+
+    private static final String INTRADC_TCP_NODELAY_PROPERTY = 
Config.PROPERTY_PREFIX + "otc_intradc_tcp_nodelay";
+
+    /**
+     * Enabled/disable TCP_NODELAY for intradc connections. Defaults to 
enabled.
+     */
+    private static final boolean INTRADC_TCP_NODELAY = 
Boolean.parseBoolean(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true"));
+
+    /**
+     * Base number of milliseconds between connection retry attempts (scaled by the attempt count).
+     */
+    private static final int OPEN_RETRY_DELAY_MS = 100;
+
+    /**
+     * A minimum number of milliseconds to wait for a connection (TCP socket 
connect + handshake)
+     */
+    private static final int MINIMUM_CONNECT_TIMEOUT_MS = 2000;
+    private final IInternodeAuthenticator authenticator;
+
+    /**
+     * Describes this instance's ability to send messages into its Netty 
{@link Channel}.
+     */
+    enum State
+    {
+        /** waiting to create the connection */
+        NOT_READY,
+        /** we've started to create the connection/channel */
+        CREATING_CHANNEL,
+        /** channel is established and we can send messages */
+        READY,
+        /** a dead state which should not be transitioned away from */
+        CLOSED
+    }
+
+    /**
+     * Backlog to hold messages passed by upstream threads while the Netty 
{@link Channel} is being set up or recreated.
+     */
+    private final Queue<QueuedMessage> backlog;
+
+    /**
+     * Reference to a {@link ScheduledExecutorService} rather than directly 
depending on something like {@link ScheduledExecutors}.
+     */
+    private final ScheduledExecutorService scheduledExecutor;
+
+    final AtomicLong droppedMessageCount;
+    final AtomicLong completedMessageCount;
+
+    private volatile OutboundConnectionIdentifier connectionId;
+
+    private final ServerEncryptionOptions encryptionOptions;
+
+    /**
+     * A future for retrying connections. Bear in mind that this future does 
not execute in the
+     * netty event loop, so there are some races to be careful of.
+     */
+    private volatile ScheduledFuture<?> connectionRetryFuture;
+
+    /**
+     * A future for notifying when the timeout for creating the connection and 
negotiating the handshake has elapsed.
+     * It will be cancelled when the channel is established correctly. Bear in 
mind that this future does not execute in the
+     * netty event loop, so there are some races to be careful of.
+     */
+    private volatile ScheduledFuture<?> connectionTimeoutFuture;
+
+    private final AtomicReference<State> state;
+
+    private final Optional<CoalescingStrategy> coalescingStrategy;
+
+    /**
+     * A running count of the number of times we've tried to create a 
connection.
+     */
+    private volatile int connectAttemptCount;
+
+    /**
+     * The netty channel, once a socket connection is established; it won't be 
in its normal working state until the handshake is complete.
+     */
+    private volatile ChannelWriter channelWriter;
+
+    /**
+     * the target protocol version to communicate to the peer with, 
discovered/negotiated via handshaking
+     */
+    private int targetVersion;
+
+    OutboundMessagingConnection(OutboundConnectionIdentifier connectionId,
+                                ServerEncryptionOptions encryptionOptions,
+                                Optional<CoalescingStrategy> 
coalescingStrategy,
+                                IInternodeAuthenticator authenticator)
+    {
+        this(connectionId, encryptionOptions, coalescingStrategy, 
authenticator, ScheduledExecutors.scheduledFastTasks);
+    }
+
+    @VisibleForTesting
+    OutboundMessagingConnection(OutboundConnectionIdentifier connectionId,
+                                ServerEncryptionOptions encryptionOptions,
+                                Optional<CoalescingStrategy> 
coalescingStrategy,
+                                IInternodeAuthenticator authenticator,
+                                ScheduledExecutorService sceduledExecutor)
+    {
+        this.connectionId = connectionId;
+        this.encryptionOptions = encryptionOptions;
+        this.authenticator = authenticator;
+        backlog = new ConcurrentLinkedQueue<>();
+        droppedMessageCount = new AtomicLong(0);
+        completedMessageCount = new AtomicLong(0);
+        state = new AtomicReference<>(State.NOT_READY);
+        this.scheduledExecutor = sceduledExecutor;
+        this.coalescingStrategy = coalescingStrategy;
+
+        // We want to use the most precise protocol version we know because 
while there is version detection on connect(),
+        // the target version might be accessed by the pool (in 
getConnection()) before we actually connect (as we
+        // only connect when the first message is submitted). Note however 
that the only case where we'll connect
+        // without knowing the true version of a node is if that node is a 
seed (otherwise, we can't know a node
+        // unless it has been gossiped to us or it has connected to us, and in 
both cases that will set the version).
+        // In that case we won't rely on that targetVersion before we're 
actually connected and so the version
+        // detection in connect() will do its job.
+        targetVersion = 
MessagingService.instance().getVersion(connectionId.remote());
+    }
+
+    /**
+     * If the connection is set up and ready to use (the normal case), simply 
send the message to it and return.
+     * Otherwise, one lucky thread is selected to create the Channel, while 
other threads just add the {@code msg} to
+     * the backlog queue.
+     *
+     * @return true if the message was accepted by the {@link #channelWriter}, 
or was queued on the backlog while the connection is not yet {@link State#READY};
+     * false if the writer rejected it (it is then added to the backlog) or the 
connection is {@link State#CLOSED}. See documentation in {@link ChannelWriter} and
+     * {@link MessageOutHandler} for how the backlogged messages get consumed.
+     */
+    boolean sendMessage(MessageOut msg, int id)
+    {
+        return sendMessage(new QueuedMessage(msg, id));
+    }
+
+    boolean sendMessage(QueuedMessage queuedMessage)
+    {
+        State state = this.state.get();
+        if (state == State.READY)
+        {
+            if (channelWriter.write(queuedMessage, false))
+                return true;
+
+            backlog.add(queuedMessage);
+            return false;
+        }
+        else if (state == State.CLOSED)
+        {
+            errorLogger.warn("trying to write message to a closed connection");
+            return false;
+        }
+        else
+        {
+            backlog.add(queuedMessage);
+            connect();
+            return true;
+        }
+    }
+
+    /**
+     * Initiate all the actions required to establish a working, valid 
connection. This includes
+     * opening the socket, negotiating the internode messaging handshake, and 
setting up the working
+     * Netty {@link Channel}. However, this method will not block for all 
those actions: it will only
+     * kick off the connection attempt as everything is asynchronous.
+     * <p>
+     * Threads compete to update the {@link #state} field to {@link 
State#CREATING_CHANNEL} to ensure only one
+     * connection is attempted at a time.
+     *
+     * @return true if kicking off the connection attempt was started by this 
thread; else, false.
+     */
+    public boolean connect()
+    {
+        // try to be the winning thread to create the channel
+        if (!state.compareAndSet(State.NOT_READY, State.CREATING_CHANNEL))
+            return false;
+
+        // clean up any lingering connection attempts
+        if (connectionTimeoutFuture != null)
+        {
+            connectionTimeoutFuture.cancel(false);
+            connectionTimeoutFuture = null;
+        }
+
+        return tryConnect();
+    }
+
+    private boolean tryConnect()
+    {
+        if (state.get() != State.CREATING_CHANNEL)
+                return false;
+
+        logger.debug("connection attempt {} to {}", connectAttemptCount, 
connectionId);
+
+
+        InetSocketAddress remote = connectionId.remoteAddress();
+        if (!authenticator.authenticate(remote.getAddress(), remote.getPort()))
+        {
+            logger.warn("Internode auth failed connecting to {}", 
connectionId);
+            //Remove the connection pool and other thread so messages aren't 
queued
+            
MessagingService.instance().destroyConnectionPool(remote.getAddress());
+
+            // don't update the state field as destroyConnectionPool() 
*should* call OMC.close()
+            // on all the connections in the OMP for the remoteAddress
+            return false;
+        }
+
+        boolean compress = shouldCompressConnection(connectionId.local(), 
connectionId.remote());
+        Bootstrap bootstrap = buildBootstrap(compress);
+
+        ChannelFuture connectFuture = bootstrap.connect();
+        connectFuture.addListener(this::connectCallback);
+
+        long timeout = Math.max(MINIMUM_CONNECT_TIMEOUT_MS, 
DatabaseDescriptor.getRpcTimeout());
+        if (connectionTimeoutFuture == null || 
connectionTimeoutFuture.isDone())
+            connectionTimeoutFuture = scheduledExecutor.schedule(() -> 
connectionTimeout(connectFuture), timeout, TimeUnit.MILLISECONDS);
+        return true;
+    }
+
+    @VisibleForTesting
+    static boolean shouldCompressConnection(InetAddress localHost, InetAddress 
remoteHost)
+    {
+        return (DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.all)
+               || ((DatabaseDescriptor.internodeCompression() == 
Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost));
+    }
+
+    private Bootstrap buildBootstrap(boolean compress)
+    {
+        boolean tcpNoDelay = isLocalDC(connectionId.local(), 
connectionId.remote()) ? INTRADC_TCP_NODELAY : 
DatabaseDescriptor.getInterDCTcpNoDelay();
+        int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0
+                             ? DatabaseDescriptor.getInternodeSendBufferSize()
+                             : 
OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE;
+        OutboundConnectionParams params = OutboundConnectionParams.builder()
+                                                                  
.connectionId(connectionId)
+                                                                  
.callback(this::finishHandshake)
+                                                                  
.encryptionOptions(encryptionOptions)
+                                                                  
.mode(Mode.MESSAGING)
+                                                                  
.compress(compress)
+                                                                  
.coalescingStrategy(coalescingStrategy)
+                                                                  
.sendBufferSize(sendBufferSize)
+                                                                  
.tcpNoDelay(tcpNoDelay)
+                                                                  
.backlogSupplier(() -> nextBackloggedMessage())
+                                                                  
.messageResultConsumer(this::handleMessageResult)
+                                                                  
.protocolVersion(targetVersion)
+                                                                  .build();
+
+        return NettyFactory.instance.createOutboundBootstrap(params);
+    }
+
+    private QueuedMessage nextBackloggedMessage()
+    {
+        QueuedMessage msg = backlog.poll();
+        if (msg == null)
+            return null;
+
+        if (!msg.isTimedOut())
+            return msg;
+
+        if (msg.shouldRetry())
+            return msg.createRetry();
+
+        droppedMessageCount.incrementAndGet();
+        return null;
+    }
+
+    static boolean isLocalDC(InetAddress localHost, InetAddress remoteHost)
+    {
+        String remoteDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(remoteHost);
+        String localDC = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(localHost);
+        return remoteDC != null && remoteDC.equals(localDC);
+    }
+
+    /**
+     * Handles the callback of the TCP connection attempt (not including the 
handshake negotiation!), and really all
+     * we're handling here is the TCP connection failures. On failure, we 
close the channel (which should disconnect
+     * the socket, if connected). If there was an {@link IOException} while 
trying to connect, the connection will be
+     * retried after a short delay.
+     * <p>
+     * This method does not alter the {@link #state} as it's only evaluating 
the TCP connect, not TCP connect and handshake.
+     * Thus, {@link #finishHandshake(HandshakeResult)} will handle any 
necessary state updates.
+     * <p>
+     * Note: this method is called from the event loop, so be careful wrt 
thread visibility
+     *
+     * @return true iff the TCP connection was established and the {@link 
#state} is not {@link State#CLOSED}; else false.
+     */
+    @VisibleForTesting
+    boolean connectCallback(Future<? super Void> future)
+    {
+        ChannelFuture channelFuture = (ChannelFuture)future;
+
+        // make sure this instance is not (terminally) closed
+        if (state.get() == State.CLOSED)
+        {
+            channelFuture.channel().close();
+            return false;
+        }
+
+        // this is the success state
+        final Throwable cause = future.cause();
+        if (cause == null)
+        {
+            connectAttemptCount = 0;
+            return true;
+        }
+
+        setStateIfNotClosed(state, State.NOT_READY);
+        if (cause instanceof IOException)
+        {
+            logger.trace("unable to connect on attempt {} to {}", 
connectAttemptCount, connectionId, cause);
+            connectAttemptCount++;
+            connectionRetryFuture = scheduledExecutor.schedule(this::connect, 
OPEN_RETRY_DELAY_MS * connectAttemptCount, TimeUnit.MILLISECONDS);
+        }
+        else
+        {
+            JVMStabilityInspector.inspectThrowable(cause);
+            logger.error("non-IO error attempting to connect to {}", 
connectionId, cause);
+        }
+        return false;
+    }
+
+    /**
+     * A callback for handling timeouts when creating a connection/negotiating 
the handshake.
+     * <p>
+     * Note: this method is *not* invoked from the netty event loop,
+     * so there's an inherent race with {@link 
#finishHandshake(HandshakeResult)},
+     * as well as any possible connect() reattempts (a seemingly remote race 
condition, however).
+     * Therefore, this function tries to lose any races, as much as possible.
+     *
+     * @return true if there was a timeout on the connect/handshake; else 
false.
+     */
+    boolean connectionTimeout(ChannelFuture channelFuture)
+    {
+        if (connectionRetryFuture != null)
+        {
+            connectionRetryFuture.cancel(false);
+            connectionRetryFuture = null;
+        }
+        connectAttemptCount = 0;
+        State initialState = state.get();
+        if (initialState == State.CLOSED)
+            return true;
+
+        if (initialState != State.READY)
+        {
+            logger.debug("timed out while trying to connect to {}", 
connectionId);
+
+            channelFuture.channel().close();
+            // a last-ditch attempt to let finishHandshake() win the race
+            if (state.compareAndSet(initialState, State.NOT_READY))
+            {
+                backlog.clear();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Process the results of the handshake negotiation.
+     * <p>
+     * Pending timeout and retry futures are cancelled and the connect attempt counter is reset. When the
+     * peer's messaging version was negotiated it is recorded as the target version and published to
+     * {@code MessagingService}. The outcome then decides what happens next:
+     * <ul>
+     *   <li>{@code SUCCESS}: the new channel writer is installed and the backlog drained to it, unless the
+     *       connection was closed in the meantime, in which case the writer is closed and the backlog cleared;</li>
+     *   <li>{@code DISCONNECT}: a reconnect is attempted;</li>
+     *   <li>{@code NEGOTIATION_FAILURE}: the state goes back to {@code NOT_READY} (unless closed) and the
+     *       backlog is cleared.</li>
+     * </ul>
+     * <p>
+     * Note: this method will be invoked from the netty event loop,
+     * so there's an inherent race with {@link #connectionTimeout(ChannelFuture)}.
+     *
+     * @param result the outcome of the handshake
+     * @throws IllegalArgumentException if {@code result.outcome} is not one of the handled outcomes
+     */
+    void finishHandshake(HandshakeResult result)
+    {
+        // clean up the connector instances before changing the state
+        if (connectionTimeoutFuture != null)
+        {
+            connectionTimeoutFuture.cancel(false);
+            connectionTimeoutFuture = null;
+        }
+        if (connectionRetryFuture != null)
+        {
+            connectionRetryFuture.cancel(false);
+            connectionRetryFuture = null;
+        }
+        connectAttemptCount = 0;
+
+        // only overwrite the target version when the handshake actually produced one
+        if (result.negotiatedMessagingVersion != 
HandshakeResult.UNKNOWN_PROTOCOL_VERSION)
+        {
+            targetVersion = result.negotiatedMessagingVersion;
+            MessagingService.instance().setVersion(connectionId.remote(), 
targetVersion);
+        }
+
+        switch (result.outcome)
+        {
+            case SUCCESS:
+                assert result.channelWriter != null;
+                logger.debug("successfully connected to {}, conmpress = {}, 
coalescing = {}", connectionId,
+                             shouldCompressConnection(connectionId.local(), 
connectionId.remote()),
+                             coalescingStrategy.isPresent() ? 
coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
+                // closed while the handshake was in flight: discard the freshly negotiated writer
+                if (state.get() == State.CLOSED)
+                {
+                    result.channelWriter.close();
+                    backlog.clear();
+                    break;
+                }
+                channelWriter = result.channelWriter;
+                // drain the backlog to the channel
+                channelWriter.writeBacklog(backlog, true);
+                // change the state so newly incoming messages can be sent to the channel (without adding to the backlog)
+                setStateIfNotClosed(state, State.READY);
+                // ship out any stragglers that got added to the backlog
+                channelWriter.writeBacklog(backlog, true);
+                break;
+            case DISCONNECT:
+                reconnect();
+                break;
+            case NEGOTIATION_FAILURE:
+                setStateIfNotClosed(state, State.NOT_READY);
+                backlog.clear();
+                break;
+            default:
+                throw new IllegalArgumentException("unhandled result type: " + 
result.outcome);
+        }
+    }
+
+    /**
+     * Atomically moves {@code state} to {@code newState} unless it is already {@code CLOSED}.
+     * <p>
+     * A compare-and-set loop is used instead of a separate get followed by a set, so that a concurrent
+     * transition to {@code CLOSED} (for example from {@link #close(boolean)}) can never be overwritten.
+     *
+     * @param state the state reference to update
+     * @param newState the state to install
+     * @return true if {@code newState} was installed; false if the state was, or became, {@code CLOSED}
+     */
+    @VisibleForTesting
+    static boolean setStateIfNotClosed(AtomicReference<State> state, State newState)
+    {
+        State current;
+        do
+        {
+            current = state.get();
+            if (current == State.CLOSED)
+                return false;
+        }
+        // retry if another thread changed the state between the read and the swap
+        while (!state.compareAndSet(current, newState));
+        return true;
+    }
+
+    /**
+     * @return the current value of {@code targetVersion}; {@link #finishHandshake(HandshakeResult)} updates it
+     * to the negotiated messaging version whenever the handshake produced one.
+     */
+    int getTargetVersion()
+    {
+        return targetVersion;
+    }
+
+    /**
+     * Handles the result of each message sent.
+     * <p>
+     * Every invocation counts as a completed message. A successful send (no cause on the future) needs no
+     * further work. An {@code ExpiredException} bumps the dropped message count. An {@link IOException}
+     * (directly, or as the immediate cause) may purge the backlog, triggers at most one reconnect per
+     * channel writer, and re-enqueues the message if it is still retryable. A cancelled future is ignored,
+     * and anything else is logged as an error.
+     * <p>
+     * Note: this function is expected to be invoked on the netty event loop. Also, do not retain any state from
+     * the input {@code messageResult}.
+     *
+     * @param messageResult the outcome of a single message write
+     */
+    void handleMessageResult(MessageResult messageResult)
+    {
+        completedMessageCount.incrementAndGet();
+
+        // checking the cause() is an optimized way to tell if the operation was successful (as the cause will be null)
+        Throwable cause = messageResult.future.cause();
+        if (cause == null)
+            return;
+
+        // ExpiredException is just a marker for a timed-out message we're dropping; the only work here is to
+        // bump this connection's dropped message count.
+        // NOTE(review): an earlier comment stated the dropped count was already incremented in
+        // MessageOutHandler - confirm this increment does not double count.
+        if (cause instanceof ExpiredException)
+        {
+            droppedMessageCount.incrementAndGet();
+            return;
+        }
+
+        JVMStabilityInspector.inspectThrowable(cause);
+
+        if (cause instanceof IOException || cause.getCause() instanceof 
IOException)
+        {
+            ChannelWriter writer = messageResult.writer;
+            if (writer.shouldPurgeBacklog())
+                purgeBacklog();
+
+            // This writer needs to be closed and we need to trigger a reconnection. We really only want to do that
+            // once for this channel however (and again, no race because we're on the netty event loop).
+            if (!writer.isClosed() && messageResult.allowReconnect)
+            {
+                reconnect();
+                writer.close();
+            }
+
+            // give the message one more chance; createRetry() is what the retry is built from
+            QueuedMessage msg = messageResult.msg;
+            if (msg != null && msg.shouldRetry())
+            {
+                sendMessage(msg.createRetry());
+            }
+        }
+        else if (messageResult.future.isCancelled())
+        {
+            // Someone cancelled the future, which we assume meant it doesn't want the message to be sent if it hasn't
+            // yet. Just ignore.
+        }
+        else
+        {
+            // Non IO exceptions are likely a programming error so let's not silence them
+            logger.error("Unexpected error writing on " + connectionId, cause);
+        }
+    }
+
+    /**
+     * Change the IP address on which we connect to the peer. We will attempt 
to connect to the new address if there
+     * was a previous connection, and new incoming messages as well as 
existing {@link #backlog} messages will be sent there.
+     * Any outstanding messages in the existing channel will still be sent to 
the previous address (we won't/can't move them from
+     * one channel to another).
+     */
+    void reconnectWithNewIp(InetSocketAddress newAddr)
+    {
+        State currentState = state.get();
+
+        // if we're closed, ignore the request
+        if (currentState == State.CLOSED)
+            return;
+
+        // capture a reference to the current channel, in case it gets swapped 
out before we can call close() on it
+        ChannelWriter currentChannel = channelWriter;
+        connectionId = connectionId.withNewConnectionAddress(newAddr);
+
+        if (currentState != State.NOT_READY)
+            reconnect();
+
+        // lastly, push through anything remaining in the existing channel.
+        if (currentChannel != null)
+            currentChannel.softClose();
+    }
+
+    /**
+     * Sets the state properly so {@link #connect()} can attempt to reconnect.
+     */
+    void reconnect()
+    {
+        if (setStateIfNotClosed(state, State.NOT_READY))
+            connect();
+    }
+
+    /**
+     * Discards every message currently held in the backlog.
+     */
+    void purgeBacklog()
+    {
+        backlog.clear();
+    }
+
+    /**
+     * Closes this connection. The state becomes {@code CLOSED} and any pending connection timeout or
+     * scheduled reconnect attempt is cancelled, so neither can fire after the close.
+     * <p>
+     * If a channel writer exists it is detached from this connection; when no writer exists the backlog is
+     * left untouched.
+     *
+     * @param softClose if {@code true}, the backlog is first written to the current channel writer and the
+     *                  writer is soft-closed; otherwise the backlog is cleared and the writer is closed.
+     */
+    public void close(boolean softClose)
+    {
+        state.set(State.CLOSED);
+
+        if (connectionTimeoutFuture != null)
+        {
+            connectionTimeoutFuture.cancel(false);
+            connectionTimeoutFuture = null;
+        }
+
+        // a scheduled retry must not attempt to connect once we are closed
+        if (connectionRetryFuture != null)
+        {
+            connectionRetryFuture.cancel(false);
+            connectionRetryFuture = null;
+        }
+
+        // read the field once so a concurrent swap cannot make us act on two different writers
+        ChannelWriter writer = channelWriter;
+        if (writer != null)
+        {
+            if (softClose)
+            {
+                // drain the backlog before letting the writer finish up
+                writer.writeBacklog(backlog, false);
+                writer.softClose();
+            }
+            else
+            {
+                backlog.clear();
+                writer.close();
+            }
+
+            channelWriter = null;
+        }
+    }
+
+    /**
+     * @return the string form of this connection's identifier
+     */
+    @Override
+    public String toString()
+    {
+        return connectionId.toString();
+    }
+
+    /**
+     * @return the number of messages waiting in the backlog plus, when a channel writer exists, the
+     * writer's pending message count (narrowed to an int)
+     */
+    public Integer getPendingMessages()
+    {
+        final int queued = backlog.size();
+        // read the field once; it may be nulled out concurrently
+        final ChannelWriter writer = channelWriter;
+        if (writer == null)
+            return queued;
+
+        return queued + (int) writer.pendingMessageCount();
+    }
+
+    /**
+     * @return the value of the completed message counter, which {@link #handleMessageResult(MessageResult)}
+     * increments once per handled result
+     */
+    public Long getCompletedMessages()
+    {
+        return completedMessageCount.get();
+    }
+
+    /**
+     * @return the value of the dropped message counter, which {@link #handleMessageResult(MessageResult)}
+     * increments when a send failed with an {@code ExpiredException}
+     */
+    public Long getDroppedMessages()
+    {
+        return droppedMessageCount.get();
+    }
+
+    /*
+        methods specific to testing follow
+     */
+
+    /**
+     * @return the number of messages currently held in the backlog
+     */
+    @VisibleForTesting
+    int backlogSize()
+    {
+        return backlog.size();
+    }
+
+    /**
+     * Adds {@code msg} directly to the backlog.
+     */
+    @VisibleForTesting
+    void addToBacklog(QueuedMessage msg)
+    {
+        backlog.add(msg);
+    }
+
+    /**
+     * Replaces the channel writer used by this connection.
+     */
+    @VisibleForTesting
+    void setChannelWriter(ChannelWriter channelWriter)
+    {
+        this.channelWriter = channelWriter;
+    }
+
+    /**
+     * @return the current channel writer; may be {@code null}, as {@link #close(boolean)} resets it
+     */
+    @VisibleForTesting
+    ChannelWriter getChannelWriter()
+    {
+        return channelWriter;
+    }
+
+    /**
+     * Unconditionally overwrites the connection state, including when it is currently {@code CLOSED}.
+     */
+    @VisibleForTesting
+    void setState(State state)
+    {
+        this.state.set(state);
+    }
+
+    /**
+     * @return the current connection state
+     */
+    @VisibleForTesting
+    State getState()
+    {
+        return state.get();
+    }
+
+    /**
+     * Overrides the target version that {@link #getTargetVersion()} reports.
+     */
+    @VisibleForTesting
+    void setTargetVersion(int targetVersion)
+    {
+        this.targetVersion = targetVersion;
+    }
+
+    /**
+     * @return the current connection identifier; {@link #reconnectWithNewIp(InetSocketAddress)} replaces it
+     */
+    @VisibleForTesting
+    OutboundConnectionIdentifier getConnectionId()
+    {
+        return connectionId;
+    }
+
+    /**
+     * Replaces the future tracking the pending connection timeout.
+     */
+    @VisibleForTesting
+    void setConnectionTimeoutFuture(ScheduledFuture<?> connectionTimeoutFuture)
+    {
+        this.connectionTimeoutFuture = connectionTimeoutFuture;
+    }
+
+    /**
+     * @return the future tracking the pending connection timeout; {@code null} once it has been cancelled
+     * by {@link #finishHandshake(HandshakeResult)} or {@link #close(boolean)}
+     */
+    @VisibleForTesting
+    ScheduledFuture<?> getConnectionTimeoutFuture()
+    {
+        return connectionTimeoutFuture;
+    }
+
+    /**
+     * @return true only while the connection state is {@code READY}
+     */
+    public boolean isConnected()
+    {
+        return state.get() == State.READY;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java 
b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
new file mode 100644
index 0000000..0086da8
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.net.InetSocketAddress;
+import java.util.Optional;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.metrics.ConnectionMetrics;
+import org.apache.cassandra.net.BackPressureState;
+import org.apache.cassandra.net.MessageOut;
+import 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+
+/**
+ * Groups a set of outbound connections to a given peer, and routes outgoing 
messages to the appropriate connection
+ * (based upon message's type or size). Contains a {@link 
OutboundMessagingConnection} for each of the
+ * {@link ConnectionType} type.
+ */
+public class OutboundMessagingPool
+{
+    @VisibleForTesting
+    static final long LARGE_MESSAGE_THRESHOLD = 
Long.getLong(Config.PROPERTY_PREFIX + "otcp_large_message_threshold", 1024 * 
64);
+
+    private final ConnectionMetrics metrics;
+    private final BackPressureState backPressureState;
+
+    public OutboundMessagingConnection gossipChannel;
+    public OutboundMessagingConnection largeMessageChannel;
+    public OutboundMessagingConnection smallMessageChannel;
+
+    /**
+     * An override address on which to communicate with the peer. Typically 
used for something like EC2 public IP addresses
+     * which need to be used for communication between EC2 regions.
+     */
+    private InetSocketAddress preferredRemoteAddr;
+
+    public OutboundMessagingPool(InetSocketAddress remoteAddr, 
InetSocketAddress localAddr, ServerEncryptionOptions encryptionOptions,
+                                 BackPressureState backPressureState, 
IInternodeAuthenticator authenticator)
+    {
+        preferredRemoteAddr = remoteAddr;
+        this.backPressureState = backPressureState;
+        metrics = new ConnectionMetrics(localAddr.getAddress(), this);
+
+
+        smallMessageChannel = new 
OutboundMessagingConnection(OutboundConnectionIdentifier.small(localAddr, 
preferredRemoteAddr),
+                                                              
encryptionOptions, coalescingStrategy(remoteAddr), authenticator);
+        largeMessageChannel = new 
OutboundMessagingConnection(OutboundConnectionIdentifier.large(localAddr, 
preferredRemoteAddr),
+                                                              
encryptionOptions, coalescingStrategy(remoteAddr), authenticator);
+
+        // don't attempt coalesce the gossip messages, just ship them out asap 
(let's not anger the FD on any peer node by any artificial delays)
+        gossipChannel = new 
OutboundMessagingConnection(OutboundConnectionIdentifier.gossip(localAddr, 
preferredRemoteAddr),
+                                                        encryptionOptions, 
Optional.empty(), authenticator);
+    }
+
+    private static Optional<CoalescingStrategy> 
coalescingStrategy(InetSocketAddress remoteAddr)
+    {
+        String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy();
+        String displayName = remoteAddr.getAddress().getHostAddress();
+        return CoalescingStrategies.newCoalescingStrategy(strategyName,
+                                                          
DatabaseDescriptor.getOtcCoalescingWindow(),
+                                                          
OutboundMessagingConnection.logger,
+                                                          displayName);
+
+    }
+
+    public BackPressureState getBackPressureState()
+    {
+        return backPressureState;
+    }
+
+    public void sendMessage(MessageOut msg, int id)
+    {
+        getConnection(msg).sendMessage(msg, id);
+    }
+
+    @VisibleForTesting
+    public OutboundMessagingConnection getConnection(MessageOut msg)
+    {
+        // optimize for the common path (the small message channel)
+        if (Stage.GOSSIP != msg.getStage())
+        {
+            return msg.serializedSize(smallMessageChannel.getTargetVersion()) 
< LARGE_MESSAGE_THRESHOLD
+            ? smallMessageChannel
+            : largeMessageChannel;
+        }
+        return gossipChannel;
+    }
+
+    /**
+     * Reconnect to the peer using the given {@code addr}. Outstanding 
messages in each channel will be sent on the
+     * current channel. Typically this function is used for something like EC2 
public IP addresses which need to be used
+     * for communication between EC2 regions.
+     *
+     * @param addr IP Address to use (and prefer) going forward for connecting 
to the peer
+     */
+    public void reconnectWithNewIp(InetSocketAddress addr)
+    {
+        preferredRemoteAddr = addr;
+        gossipChannel.reconnectWithNewIp(addr);
+        largeMessageChannel.reconnectWithNewIp(addr);
+        smallMessageChannel.reconnectWithNewIp(addr);
+    }
+
+    /**
+     * Close each netty channel and it's socket.
+     *
+     * @param softClose {@code true} if existing messages in the queue should 
be sent before closing.
+     */
+    public void close(boolean softClose)
+    {
+        gossipChannel.close(softClose);
+        largeMessageChannel.close(softClose);
+        smallMessageChannel.close(softClose);
+    }
+
+    /**
+     * For testing purposes only.
+     */
+    @VisibleForTesting
+    OutboundMessagingConnection getConnection(ConnectionType connectionType)
+    {
+        switch (connectionType)
+        {
+            case GOSSIP:
+                return gossipChannel;
+            case LARGE_MESSAGE:
+                return largeMessageChannel;
+            case SMALL_MESSAGE:
+                return smallMessageChannel;
+            default:
+                throw new IllegalArgumentException("unsupported connection 
type: " + connectionType);
+        }
+    }
+
+    public void incrementTimeout()
+    {
+        metrics.timeouts.mark();
+    }
+
+    public long getTimeouts()
+    {
+        return metrics.timeouts.getCount();
+    }
+
+    public InetSocketAddress getPreferredRemoteAddr()
+    {
+        return preferredRemoteAddr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/QueuedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/QueuedMessage.java 
b/src/java/org/apache/cassandra/net/async/QueuedMessage.java
new file mode 100644
index 0000000..28e4ba4
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/QueuedMessage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.CoalescingStrategies;
+
+/**
+ *  A wrapper for outbound messages. All messages will be retried once.
+ */
+public class QueuedMessage implements CoalescingStrategies.Coalescable
+{
+    public final MessageOut<?> message;
+    public final int id;
+    public final long timestampNanos;
+    public final boolean droppable;
+    private final boolean retryable;
+
+    public QueuedMessage(MessageOut<?> message, int id)
+    {
+        this(message, id, System.nanoTime(), 
MessagingService.DROPPABLE_VERBS.contains(message.verb), true);
+    }
+
+    @VisibleForTesting
+    public QueuedMessage(MessageOut<?> message, int id, long timestampNanos, 
boolean droppable, boolean retryable)
+    {
+        this.message = message;
+        this.id = id;
+        this.timestampNanos = timestampNanos;
+        this.droppable = droppable;
+        this.retryable = retryable;
+    }
+
+    /** don't drop a non-droppable message just because it's timestamp is 
expired */
+    public boolean isTimedOut()
+    {
+        return droppable && timestampNanos < System.nanoTime() - 
TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+    }
+
+    public boolean shouldRetry()
+    {
+        return retryable;
+    }
+
+    public QueuedMessage createRetry()
+    {
+        return new QueuedMessage(message, id, System.nanoTime(), droppable, 
false);
+    }
+
+    public long timestampNanos()
+    {
+        return timestampNanos;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to