move streaming to use netty

patch by jasobrown, reviewed by aweisberg for CASSANDRA-12229


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc92db2b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc92db2b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc92db2b

Branch: refs/heads/trunk
Commit: fc92db2b9b56c143516026ba29cecdec37e286bb
Parents: 356dc3c
Author: Jason Brown <jasedbr...@gmail.com>
Authored: Mon Apr 11 05:26:18 2016 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Tue Aug 22 13:54:44 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 lib/compress-lzf-0.8.4.jar                      | Bin 25490 -> 0 bytes
 .../org/apache/cassandra/config/Config.java     |   6 -
 .../cassandra/config/DatabaseDescriptor.java    |  15 -
 .../exceptions/ChecksumMismatchException.java   |  34 ++
 .../io/compress/CompressionMetadata.java        |   4 +-
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../io/util/DataIntegrityMetadata.java          |  26 +
 .../net/IncomingStreamingConnection.java        | 104 ----
 .../net/async/ByteBufDataInputPlus.java         |  12 +
 .../net/async/ByteBufDataOutputStreamPlus.java  | 191 +++++++
 .../net/async/InboundHandshakeHandler.java      |  36 +-
 .../cassandra/net/async/NettyFactory.java       |  24 +-
 .../net/async/OutboundConnectionIdentifier.java |  21 +-
 .../net/async/OutboundHandshakeHandler.java     |   9 +-
 .../async/RebufferingByteBufDataInputPlus.java  | 250 +++++++++
 .../apache/cassandra/security/SSLFactory.java   |  49 --
 .../cassandra/service/StorageService.java       |  11 -
 .../cassandra/service/StorageServiceMBean.java  |   3 -
 .../cassandra/streaming/ConnectionHandler.java  | 428 ----------------
 .../streaming/DefaultConnectionFactory.java     | 122 +++--
 .../streaming/StreamConnectionFactory.java      |  11 +-
 .../cassandra/streaming/StreamCoordinator.java  |  22 +-
 .../cassandra/streaming/StreamManager.java      |  24 +-
 .../apache/cassandra/streaming/StreamPlan.java  |   2 +-
 .../cassandra/streaming/StreamReader.java       |  25 +-
 .../streaming/StreamReceiveException.java       |  36 ++
 .../cassandra/streaming/StreamReceiveTask.java  |   1 +
 .../cassandra/streaming/StreamResultFuture.java |  32 +-
 .../cassandra/streaming/StreamSession.java      | 396 ++++++++-------
 .../cassandra/streaming/StreamTransferTask.java |  10 +-
 .../cassandra/streaming/StreamWriter.java       | 115 +++--
 .../streaming/StreamingMessageSender.java       |  34 ++
 .../async/NettyStreamingMessageSender.java      | 508 +++++++++++++++++++
 .../async/StreamCompressionSerializer.java      | 133 +++++
 .../async/StreamingInboundHandler.java          | 268 ++++++++++
 .../cassandra/streaming/async/package-info.java |  71 +++
 .../ByteBufCompressionDataOutputStreamPlus.java |  76 +++
 .../compress/CompressedInputStream.java         | 225 ++++----
 .../compress/CompressedStreamReader.java        |  17 +-
 .../compress/CompressedStreamWriter.java        |  25 +-
 .../compress/StreamCompressionInputStream.java  |  78 +++
 .../streaming/messages/CompleteMessage.java     |  10 +-
 .../streaming/messages/FileMessageHeader.java   |  38 +-
 .../streaming/messages/IncomingFileMessage.java |  30 +-
 .../streaming/messages/KeepAliveMessage.java    |   9 +-
 .../streaming/messages/OutgoingFileMessage.java |  28 +-
 .../streaming/messages/PrepareAckMessage.java   |  57 +++
 .../streaming/messages/PrepareMessage.java      |  93 ----
 .../messages/PrepareSynAckMessage.java          |  80 +++
 .../streaming/messages/PrepareSynMessage.java   |  98 ++++
 .../streaming/messages/ReceivedMessage.java     |  11 +-
 .../streaming/messages/RetryMessage.java        |  71 ---
 .../messages/SessionFailedMessage.java          |  10 +-
 .../streaming/messages/StreamInitMessage.java   |  73 +--
 .../streaming/messages/StreamMessage.java       |  58 +--
 .../tools/BulkLoadConnectionFactory.java        |  32 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   7 -
 .../cassandra/tools/nodetool/GetTimeout.java    |   2 +-
 .../org/apache/cassandra/utils/UUIDGen.java     |   8 +-
 .../cassandra/streaming/LongStreamingTest.java  |  34 +-
 .../cassandra/cql3/PreparedStatementsTest.java  |   2 +-
 .../util/RewindableDataInputStreamPlusTest.java |   2 +-
 .../net/async/HandshakeHandlersTest.java        |   4 +-
 .../net/async/InboundHandshakeHandlerTest.java  |   8 +-
 .../async/OutboundMessagingConnectionTest.java  |  45 --
 .../RebufferingByteBufDataInputPlusTest.java    | 126 +++++
 .../net/async/TestScheduledFuture.java          |  66 +++
 .../apache/cassandra/service/RemoveTest.java    |   3 +-
 .../streaming/StreamTransferTaskTest.java       |   7 +-
 .../streaming/StreamingTransferTest.java        |  28 -
 .../async/NettyStreamingMessageSenderTest.java  | 202 ++++++++
 .../async/StreamCompressionSerializerTest.java  | 135 +++++
 .../async/StreamingInboundHandlerTest.java      | 168 ++++++
 .../compression/CompressedInputStreamTest.java  |  10 +-
 75 files changed, 3508 insertions(+), 1504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f2e643e..a14e390 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * use netty for streaming (CASSANDRA-12229)
  * Use netty for internode messaging (CASSANDRA-8457)
  * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
  * Don't delete incremental repair sessions if they still have sstables 
(CASSANDRA-13758)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/lib/compress-lzf-0.8.4.jar
----------------------------------------------------------------------
diff --git a/lib/compress-lzf-0.8.4.jar b/lib/compress-lzf-0.8.4.jar
deleted file mode 100644
index a712c24..0000000
Binary files a/lib/compress-lzf-0.8.4.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 77d5bf4..537cf39 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -97,12 +97,6 @@ public class Config
 
     public volatile long truncate_request_timeout_in_ms = 60000L;
 
-    /**
-     * @deprecated use {@link #streaming_keep_alive_period_in_secs} instead
-     */
-    @Deprecated
-    public int streaming_socket_timeout_in_ms = 86400000; //24 hours
-
     public Integer streaming_connections_per_host = 1;
     public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 53bac93..302a528 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2060,21 +2060,6 @@ public class DatabaseDescriptor
         conf.counter_cache_keys_to_save = counterCacheKeysToSave;
     }
 
-    public static void setStreamingSocketTimeout(int value)
-    {
-        conf.streaming_socket_timeout_in_ms = value;
-    }
-
-    /**
-     * @deprecated use {@link #getStreamingKeepAlivePeriod()} instead
-     * @return streaming_socket_timeout_in_ms property
-     */
-    @Deprecated
-    public static int getStreamingSocketTimeout()
-    {
-        return conf.streaming_socket_timeout_in_ms;
-    }
-
     public static int getStreamingKeepAlivePeriod()
     {
         return conf.streaming_keep_alive_period_in_secs;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/exceptions/ChecksumMismatchException.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/exceptions/ChecksumMismatchException.java 
b/src/java/org/apache/cassandra/exceptions/ChecksumMismatchException.java
new file mode 100644
index 0000000..a76c46c
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/ChecksumMismatchException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.exceptions;
+
+import java.io.IOException;
+
+public class ChecksumMismatchException extends IOException
+{
+    public ChecksumMismatchException()
+    {
+        super();
+    }
+
+    public ChecksumMismatchException(String s)
+    {
+        super(s);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java 
b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 6c1849f..8ac6589 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -145,7 +145,9 @@ public class CompressionMetadata
         this.chunkOffsetsSize = chunkOffsets.size();
     }
 
-    private CompressionMetadata(String filePath, CompressionParams parameters, 
SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength)
+    // do not call this constructor directly, unless used in testing
+    @VisibleForTesting
+    public CompressionMetadata(String filePath, CompressionParams parameters, 
Memory offsets, long offsetsSize, long dataLength, long compressedLength)
     {
         this.indexFilePath = filePath;
         this.parameters = parameters;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index ff47bec..dc56520 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -208,7 +208,7 @@ public class SSTableLoader implements StreamEventHandler
         for (SSTableReader sstable : sstables)
         {
             sstable.selfRef().release();
-            assert sstable.selfRef().globalCount() == 0;
+            assert sstable.selfRef().globalCount() == 0 : String.format("for 
sstable = %s, ref count = %d", sstable, sstable.selfRef().globalCount());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java 
b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 91b189d..277b359 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.io.util;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ChecksumType;
@@ -57,6 +60,15 @@ public class DataIntegrityMetadata
             chunkSize = reader.readInt();
         }
 
+        @VisibleForTesting
+        protected ChecksumValidator(ChecksumType checksumType, 
RandomAccessReader reader, int chunkSize)
+        {
+            this.checksumType = checksumType;
+            this.reader = reader;
+            this.dataFilename = null;
+            this.chunkSize = chunkSize;
+        }
+
         public void seek(long offset)
         {
             long start = chunkStart(offset);
@@ -77,6 +89,20 @@ public class DataIntegrityMetadata
                 throw new IOException("Corrupted File : " + dataFilename);
         }
 
+        /**
+         * Validates the checksum against the bytes from the specified buffer.
+         *
+         * Upon return, the buffer's position will
+         * be updated to its limit; its limit will not have been changed.
+         */
+        public void validate(ByteBuffer buffer) throws IOException
+        {
+            int current = (int) checksumType.of(buffer);
+            int actual = reader.readInt();
+            if (current != actual)
+                throw new IOException("Corrupted File : " + dataFilename);
+        }
+
         public void close()
         {
             reader.close();
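
For context, the new validate(ByteBuffer) overload follows the same pattern as the existing byte[] overload: compute a checksum over the supplied bytes, read the expected value from the digest file, and fail on a mismatch. A minimal standalone sketch of that pattern, using the JDK's CRC32 in place of Cassandra's ChecksumType (class and method names below are illustrative only):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public class ChecksumCheckSketch
    {
        /**
         * Computes a CRC32 over the buffer and compares it to the expected value.
         * Like the patch's validate(ByteBuffer), this consumes the buffer: its
         * position is advanced to its limit, while the limit itself is unchanged.
         */
        static void validate(ByteBuffer buffer, int expected) throws IOException
        {
            CRC32 crc = new CRC32();
            crc.update(buffer);
            if ((int) crc.getValue() != expected)
                throw new IOException("Checksum mismatch");
        }
    }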

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java 
b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
deleted file mode 100644
index 8db5fcb..0000000
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.net;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.Socket;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.streaming.StreamResultFuture;
-import org.apache.cassandra.streaming.messages.StreamInitMessage;
-import org.apache.cassandra.streaming.messages.StreamMessage;
-
-/**
- * Thread to consume stream init messages.
- */
-public class IncomingStreamingConnection extends Thread implements Closeable
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(IncomingStreamingConnection.class);
-
-    private final int version;
-    public final Socket socket;
-    private final Set<Closeable> group;
-
-    public IncomingStreamingConnection(int version, Socket socket, 
Set<Closeable> group)
-    {
-        super("STREAM-INIT-" + socket.getRemoteSocketAddress());
-        this.version = version;
-        this.socket = socket;
-        this.group = group;
-    }
-
-    @Override
-    @SuppressWarnings("resource") // Not closing constructed DataInputPlus's 
as the stream needs to remain open.
-    public void run()
-    {
-        try
-        {
-            // streaming connections are per-session and have a fixed version.
-            // we can't do anything with a wrong-version stream connection, so 
drop it.
-            if (version != StreamMessage.CURRENT_VERSION)
-                throw new IOException(String.format("Received stream using 
protocol version %d (my version %d). Terminating connection", version, 
StreamMessage.CURRENT_VERSION));
-
-            DataInputPlus input = new 
DataInputStreamPlus(socket.getInputStream());
-            StreamInitMessage init = 
StreamInitMessage.serializer.deserialize(input, version);
-
-            //Set SO_TIMEOUT on follower side
-            if (!init.isForOutgoing)
-                
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
-
-            // The initiator makes two connections, one for incoming and one 
for outgoing.
-            // The receiving side distinguish two connections by looking at 
StreamInitMessage#isForOutgoing.
-            // Note: we cannot use the same socket for incoming and outgoing 
streams because we want to
-            // parallelize said streams and the socket is blocking, so we 
might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, 
init.planId, init.streamOperation, init.from, this, init.isForOutgoing, 
version, init.keepSSTableLevel, init.pendingRepair, init.previewKind);
-        }
-        catch (Throwable t)
-        {
-            logger.error("Error while reading from socket from {}.", 
socket.getRemoteSocketAddress(), t);
-            close();
-        }
-    }
-
-    @Override
-    public void close()
-    {
-        try
-        {
-            if (!socket.isClosed())
-            {
-                socket.close();
-            }
-        }
-        catch (IOException e)
-        {
-            logger.debug("Error closing socket", e);
-        }
-        finally
-        {
-            group.remove(this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java 
b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
index f9fa07a..23e532c 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java
@@ -24,8 +24,20 @@ import org.apache.cassandra.io.util.DataInputPlus;
 
 public class ByteBufDataInputPlus extends ByteBufInputStream implements 
DataInputPlus
 {
+    /**
+     * The parent class does not expose the buffer to derived classes, so we 
need
+     * to stash a reference here so it can be exposed via {@link #buffer()}.
+     */
+    private final ByteBuf buf;
+
     public ByteBufDataInputPlus(ByteBuf buffer)
     {
         super(buffer);
+        this.buf = buffer;
+    }
+
+    public ByteBuf buffer()
+    {
+        return buf;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java 
b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
new file mode 100644
index 0000000..3a544e4
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.streaming.StreamSession;
+
+/**
+ * A {@link DataOutputStreamPlus} that writes to a {@link ByteBuf}. The 
novelty here is that all writes
+ * actually get written in to a {@link ByteBuffer} that shares a backing 
buffer with a {@link ByteBuf}.
+ * The trick to do that is allocate the ByteBuf, get a ByteBuffer from it by 
calling {@link ByteBuf#nioBuffer()},
+ * and passing that to the super class as {@link #buffer}. When the {@link 
#buffer} is full or {@link #doFlush(int)}
+ * is invoked, the {@link #currentBuf} is published to the netty channel.
+ */
+public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
+{
+    private final StreamSession session;
+    private final Channel channel;
+    private final int bufferSize;
+
+    /**
+     * Tracks how many bytes we've written to the netty channel. This more or 
less follows the channel's
+     * high/low water marks and ultimately the 'writability' status of the
channel. Unfortunately there's
+     * no notification mechanism that can poke a producer to let it know when 
the channel becomes writable
+     * (after it was unwritable); hence, the use of a {@link Semaphore}.
+     */
+    private final Semaphore channelRateLimiter;
+
+    /**
+     * This *must* be the owning {@link ByteBuf} for the {@link 
BufferedDataOutputStreamPlus#buffer}
+     */
+    private ByteBuf currentBuf;
+
+    private ByteBufDataOutputStreamPlus(StreamSession session, Channel 
channel, ByteBuf buffer, int bufferSize)
+    {
+        super(buffer.nioBuffer(0, bufferSize));
+        this.session = session;
+        this.channel = channel;
+        this.currentBuf = buffer;
+        this.bufferSize = bufferSize;
+
+        channelRateLimiter = new 
Semaphore(channel.config().getWriteBufferHighWaterMark(), true);
+    }
+
+    @Override
+    protected WritableByteChannel newDefaultChannel()
+    {
+        return new WritableByteChannel()
+        {
+            @Override
+            public int write(ByteBuffer src) throws IOException
+            {
+                assert src == buffer;
+                int size = src.position();
+                doFlush(size);
+                return size;
+            }
+
+            @Override
+            public boolean isOpen()
+            {
+                return channel.isOpen();
+            }
+
+            @Override
+            public void close()
+            {   }
+        };
+    }
+
+    public static ByteBufDataOutputStreamPlus create(StreamSession session, 
Channel channel, int bufferSize)
+    {
+        ByteBuf buf = channel.alloc().directBuffer(bufferSize, bufferSize);
+        return new ByteBufDataOutputStreamPlus(session, channel, buf, 
bufferSize);
+    }
+
+    /**
+     * Writes the incoming buffer directly to the backing {@link #channel}, 
without copying to the intermediate {@link #buffer}.
+     */
+    public ChannelFuture writeToChannel(ByteBuf buf) throws IOException
+    {
+        doFlush(buffer.position());
+
+        int byteCount = buf.readableBytes();
+        if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, 
byteCount, 5, TimeUnit.MINUTES))
+            throw new IOException("outbound channel was not writable");
+
+        // the (possibly naive) assumption that we should always flush after 
each incoming buf
+        ChannelFuture channelFuture = channel.writeAndFlush(buf);
+        channelFuture.addListener(future -> handleBuffer(future, byteCount));
+        return channelFuture;
+    }
+
+    /**
+     * Writes the incoming buffer directly to the backing {@link #channel}, 
without copying to the intermediate {@link #buffer}.
+     * The incoming buffer will be automatically released when the netty 
channel invokes the listeners of success/failure to
+     * send the buffer.
+     */
+    public ChannelFuture writeToChannel(ByteBuffer buffer) throws IOException
+    {
+        ChannelFuture channelFuture = 
writeToChannel(Unpooled.wrappedBuffer(buffer));
+        channelFuture.addListener(future -> FileUtils.clean(buffer));
+        return channelFuture;
+    }
+
+    @Override
+    protected void doFlush(int count) throws IOException
+    {
+        // flush the current backing write buffer only if there's any pending 
data
+        if (buffer.position() > 0 && channel.isOpen())
+        {
+            int byteCount = buffer.position();
+            currentBuf.writerIndex(byteCount);
+
+            if 
(!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, 
TimeUnit.MINUTES))
+                throw new IOException("outbound channel was not writable");
+
+            channel.writeAndFlush(currentBuf).addListener(future -> 
handleBuffer(future, byteCount));
+            currentBuf = channel.alloc().directBuffer(bufferSize, bufferSize);
+            buffer = currentBuf.nioBuffer(0, bufferSize);
+        }
+    }
+
+    /**
+     * Handles the result of publishing a buffer to the channel.
+     *
+     * Note: this will be executed on the event loop.
+     */
+    private void handleBuffer(Future<? super Void> future, int bytesWritten)
+    {
+        channelRateLimiter.release(bytesWritten);
+
+        if (!future.isSuccess() && channel.isOpen())
+            session.onError(future.cause());
+    }
+
+    public ByteBufAllocator getAllocator()
+    {
+        return channel.alloc();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Flush any remaining buffered data (if the channel is open), and release any
buffers. *Not* responsible for closing
+     * the netty channel as we might use it again for transferring more files.
+     *
+     * Note: should be called on the producer thread, not the netty event loop.
+     */
+    @Override
+    public void close() throws IOException
+    {
+        doFlush(0);
+        if (currentBuf.refCnt() > 0)
+            currentBuf.release();
+        currentBuf = null;
+        buffer = null;
+    }
+}
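
The core trick described in the class javadoc, writing through a ByteBuffer view that shares memory with a Netty ByteBuf and then publishing the ByteBuf once the writer knows how many bytes landed in it, can be shown in isolation. A rough sketch, with an arbitrary allocator and buffer size (names are illustrative only):

    import java.nio.ByteBuffer;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;

    public class SharedBufferSketch
    {
        public static void main(String[] args)
        {
            int bufferSize = 64 * 1024;
            // allocate the owning ByteBuf, then take a ByteBuffer view over the same memory
            ByteBuf owner = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize, bufferSize);
            ByteBuffer view = owner.nioBuffer(0, bufferSize);

            // the producer writes through the NIO view, as BufferedDataOutputStreamPlus does
            view.putLong(42L);
            view.putInt(7);

            // before handing the ByteBuf to the channel, sync its writerIndex with the view's position
            owner.writerIndex(view.position());
            assert owner.readableBytes() == Long.BYTES + Integer.BYTES;

            // in ByteBufDataOutputStreamPlus this is channel.writeAndFlush(currentBuf); here we just release
            owner.release();
        }
    }

The channelRateLimiter then simply acquires one permit per byte before each writeAndFlush and releases them in the write listener, which is what doFlush and handleBuffer do above.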

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java 
b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java
index 5ea03dc..7a8303c 100644
--- a/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java
+++ b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java
@@ -14,6 +14,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
@@ -25,6 +26,8 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
 import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage;
 import org.apache.cassandra.net.async.HandshakeProtocol.ThirdHandshakeMessage;
+import org.apache.cassandra.streaming.async.StreamingInboundHandler;
+import org.apache.cassandra.streaming.messages.StreamMessage;
 
 /**
  * 'Server'-side component that negotiates the internode handshake when 
establishing a new connection.
@@ -36,13 +39,13 @@ class InboundHandshakeHandler extends ByteToMessageDecoder
 {
     private static final Logger logger = 
LoggerFactory.getLogger(NettyFactory.class);
 
-    enum State { START, AWAITING_HANDSHAKE_BEGIN, AWAIT_STREAM_START_RESPONSE, 
AWAIT_MESSAGING_START_RESPONSE, MESSAGING_HANDSHAKE_COMPLETE, HANDSHAKE_FAIL }
+    enum State { START, AWAITING_HANDSHAKE_BEGIN, 
AWAIT_MESSAGING_START_RESPONSE, HANDSHAKE_COMPLETE, HANDSHAKE_FAIL }
 
     private State state;
 
     private final IInternodeAuthenticator authenticator;
-    private boolean hasAuthenticated;
 
+    private boolean hasAuthenticated;
     /**
      * The peer's declared messaging version.
      */
@@ -160,9 +163,16 @@ class InboundHandshakeHandler extends ByteToMessageDecoder
 
         if (msg.mode == NettyFactory.Mode.STREAMING)
         {
-            // TODO fill in once streaming is moved to netty
-            ctx.close();
-            return State.AWAIT_STREAM_START_RESPONSE;
+            // streaming connections are per-session and have a fixed version. 
 we can't do anything with a wrong-version stream connection, so drop it.
+            if (version != StreamMessage.CURRENT_VERSION)
+            {
+                logger.warn("Received stream using protocol version {} (my version {}). Terminating connection", version, StreamMessage.CURRENT_VERSION);
+                ctx.close();
+                return State.HANDSHAKE_FAIL;
+            }
+
+            setupStreamingPipeline(ctx, version);
+            return State.HANDSHAKE_COMPLETE;
         }
         else
         {
@@ -195,6 +205,18 @@ class InboundHandshakeHandler extends ByteToMessageDecoder
         }
     }
 
+    private void setupStreamingPipeline(ChannelHandlerContext ctx, int 
protocolVersion)
+    {
+        ChannelPipeline pipeline = ctx.pipeline();
+        InetSocketAddress address = (InetSocketAddress) 
ctx.channel().remoteAddress();
+        pipeline.addLast(NettyFactory.instance.streamingGroup, 
"streamInbound", new StreamingInboundHandler(address, protocolVersion, null));
+        pipeline.remove(this);
+
+        // pass a custom recv ByteBuf allocator to the channel. the default 
recv ByteBuf size is 1k, but in streaming we're
+        // dealing with large bulk blocks of data, so default to larger
sizes
+        ctx.channel().config().setRecvByteBufAllocator(new 
AdaptiveRecvByteBufAllocator(1 << 8, 1 << 13, 1 << 16));
+    }
+
     /**
      * Handles the third (and last) message in the internode messaging 
handshake protocol. Grabs the protocol version and
      * IP addr the peer wants to use.
@@ -227,7 +249,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder
         logger.trace("Set version for {} to {} (will use {})", from, 
maxVersion, MessagingService.instance().getVersion(from));
 
         setupMessagingPipeline(ctx.pipeline(), from, compressed, version);
-        return State.MESSAGING_HANDSHAKE_COMPLETE;
+        return State.HANDSHAKE_COMPLETE;
     }
 
     @VisibleForTesting
@@ -245,7 +267,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder
     {
         // we're not really racing on the handshakeTimeout as we're in the 
event loop,
         // but, hey, defensive programming is a beautiful thing!
-        if (state == State.MESSAGING_HANDSHAKE_COMPLETE || (handshakeTimeout 
!= null && handshakeTimeout.isCancelled()))
+        if (state == State.HANDSHAKE_COMPLETE || (handshakeTimeout != null && 
handshakeTimeout.isCancelled()))
             return;
 
         state = State.HANDSHAKE_FAIL;
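
For reference, Netty's AdaptiveRecvByteBufAllocator constructor takes the minimum, initial and maximum receive-buffer sizes in bytes, so the shifts used above work out as follows (variable names are for illustration only):

    // 1 << 8 = 256 bytes, 1 << 13 = 8 KiB, 1 << 16 = 64 KiB
    int minRecvBufSize     = 1 << 8;
    int initialRecvBufSize = 1 << 13;
    int maxRecvBufSize     = 1 << 16;
    ctx.channel().config().setRecvByteBufAllocator(
        new AdaptiveRecvByteBufAllocator(minRecvBufSize, initialRecvBufSize, maxRecvBufSize));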

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/NettyFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java 
b/src/java/org/apache/cassandra/net/async/NettyFactory.java
index 13d8810..762c39b 100644
--- a/src/java/org/apache/cassandra/net/async/NettyFactory.java
+++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java
@@ -40,6 +40,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import io.netty.util.concurrent.EventExecutor;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 import io.netty.util.internal.logging.Slf4JLoggerFactory;
+
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
 import org.apache.cassandra.auth.IInternodeAuthenticator;
@@ -69,12 +70,18 @@ public final class NettyFactory
 
     private static final int LZ4_HASH_SEED = 0x9747b28c;
 
+    /**
+     * Default seed value for xxhash.
+     */
+    public static final int XXHASH_DEFAULT_SEED = 0x9747b28c;
+
     public enum Mode { MESSAGING, STREAMING }
 
     private static final String SSL_CHANNEL_HANDLER_NAME = "ssl";
-    static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor";
-    static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = 
"outboundCompressor";
-    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+    public static final String INBOUND_COMPRESSOR_HANDLER_NAME = 
"inboundCompressor";
+    public static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = 
"outboundCompressor";
+    public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
+    public static final String INBOUND_STREAM_HANDLER_NAME = 
"inboundStreamHandler";
 
     /** a useful addition for debugging; simply set to true to get more data 
in your logs */
     private static final boolean WIRETRACE = false;
@@ -113,6 +120,7 @@ public final class NettyFactory
 
     private final EventLoopGroup inboundGroup;
     private final EventLoopGroup outboundGroup;
+    public final EventLoopGroup streamingGroup;
 
     /**
      * Constructor that allows modifying the {@link NettyFactory#useEpoll} for 
testing purposes. Otherwise, use the
@@ -126,6 +134,7 @@ public final class NettyFactory
                                         
"MessagingService-NettyAcceptor-Threads", false);
         inboundGroup = getEventLoopGroup(useEpoll, 
FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Threads", 
false);
         outboundGroup = getEventLoopGroup(useEpoll, 
FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Threads", 
true);
+        streamingGroup = getEventLoopGroup(useEpoll, 
FBUtilities.getAvailableProcessors(), "Streaming-Netty-Threads", false);
     }
 
     /**
@@ -257,7 +266,8 @@ public final class NettyFactory
                 SslContext sslContext = 
SSLFactory.getSslContext(encryptionOptions, true, true);
                 SslHandler sslHandler = sslContext.newHandler(channel.alloc());
                 logger.trace("creating inbound netty SslContext: context={}, 
engine={}", sslContext.getClass().getName(), 
sslHandler.engine().getClass().getName());
-                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);       
     }
+                pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler);
+            }
 
             if (WIRETRACE)
                 pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));
@@ -279,13 +289,14 @@ public final class NettyFactory
      * Create the {@link Bootstrap} for connecting to a remote peer. This 
method does <b>not</b> attempt to connect to the peer,
      * and thus does not block.
      */
+    @VisibleForTesting
     public Bootstrap createOutboundBootstrap(OutboundConnectionParams params)
     {
         logger.debug("creating outbound bootstrap to peer {}, compression: {}, 
encryption: {}, coalesce: {}", params.connectionId.connectionAddress(),
                      params.compress, 
encryptionLogStatement(params.encryptionOptions),
                      params.coalescingStrategy.isPresent() ? 
params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED);
-        Class<? extends Channel>  transport = useEpoll ? 
EpollSocketChannel.class : NioSocketChannel.class;
-        Bootstrap bootstrap = new Bootstrap().group(outboundGroup)
+        Class<? extends Channel> transport = useEpoll ? 
EpollSocketChannel.class : NioSocketChannel.class;
+        Bootstrap bootstrap = new Bootstrap().group(params.mode == 
Mode.MESSAGING ? outboundGroup : streamingGroup)
                               .channel(transport)
                               .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
2000)
                               .option(ChannelOption.SO_KEEPALIVE, true)
@@ -349,6 +360,7 @@ public final class NettyFactory
         acceptGroup.shutdownGracefully();
         outboundGroup.shutdownGracefully();
         inboundGroup.shutdownGracefully();
+        streamingGroup.shutdownGracefully();
     }
 
     static Lz4FrameEncoder createLz4Encoder(int protocolVersion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java 
b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
index 24dc5ff..c834bd4 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java
@@ -32,7 +32,7 @@ public class OutboundConnectionIdentifier
 {
     enum ConnectionType
     {
-        GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE
+        GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE, STREAM
     }
 
     /**
@@ -99,6 +99,15 @@ public class OutboundConnectionIdentifier
     }
 
     /**
+     * Creates an identifier for a streaming connection, using the remote
"identifying" address as its connection
+     * address.
+     */
+    public static OutboundConnectionIdentifier stream(InetSocketAddress 
localAddr, InetSocketAddress remoteAddr)
+    {
+        return new OutboundConnectionIdentifier(localAddr, remoteAddr, 
ConnectionType.STREAM);
+    }
+
+    /**
     * Returns a newly created connection identifier to the same remote as
this identifier, but using the provided
      * address as connection address.
      *
@@ -106,7 +115,7 @@ public class OutboundConnectionIdentifier
      * @return a newly created connection identifier that differs from this 
one only by using {@code remoteConnectionAddr}
      * as connection address to the remote.
      */
-    OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress 
remoteConnectionAddr)
+    public OutboundConnectionIdentifier 
withNewConnectionAddress(InetSocketAddress remoteConnectionAddr)
     {
         return new OutboundConnectionIdentifier(localAddr, remoteAddr, 
remoteConnectionAddr, connectionType);
     }
@@ -114,7 +123,7 @@ public class OutboundConnectionIdentifier
     /**
      * The local node address.
      */
-    InetAddress local()
+    public InetAddress local()
     {
         return localAddr.getAddress();
     }
@@ -122,7 +131,7 @@ public class OutboundConnectionIdentifier
     /**
      * The remote node identifying address (the one to use for anything else 
than connecting to the node).
      */
-    InetSocketAddress remoteAddress()
+    public InetSocketAddress remoteAddress()
     {
         return remoteAddr;
     }
@@ -130,7 +139,7 @@ public class OutboundConnectionIdentifier
     /**
      * The remote node identifying address (the one to use for anything else 
than connecting to the node).
      */
-    InetAddress remote()
+    public InetAddress remote()
     {
         return remoteAddr.getAddress();
     }
@@ -138,7 +147,7 @@ public class OutboundConnectionIdentifier
     /**
      * The remote node connection address (the one to use to actually connect 
to the remote, and only that).
      */
-    InetSocketAddress connectionAddress()
+    public InetSocketAddress connectionAddress()
     {
         return remoteConnectionAddr;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java 
b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
index 703549a..c555bed 100644
--- a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
+++ b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java
@@ -36,6 +36,7 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
@@ -95,7 +96,9 @@ public class OutboundHandshakeHandler extends 
ByteToMessageDecoder
     /**
      * {@inheritDoc}
      *
-     * Invoked when the channel is made active, and sends out the {@link 
FirstHandshakeMessage}
+     * Invoked when the channel is made active, and sends out the {@link 
FirstHandshakeMessage}.
+     * In the case of streaming, we do not require a full bi-directional 
handshake; the initial message,
+     * containing the streaming protocol version, is all that is required.
      */
     @Override
     public void channelActive(final ChannelHandlerContext ctx) throws Exception
@@ -103,6 +106,10 @@ public class OutboundHandshakeHandler extends 
ByteToMessageDecoder
         FirstHandshakeMessage msg = new 
FirstHandshakeMessage(messagingVersion, mode, params.compress);
         logger.trace("starting handshake with peer {}, msg = {}", 
connectionId.connectionAddress(), msg);
         ctx.writeAndFlush(msg.encode(ctx.alloc())).addListener(future -> 
firstHandshakeMessageListener(future, ctx));
+
+        if (mode == NettyFactory.Mode.STREAMING)
+            ctx.pipeline().remove(this);
+
         ctx.fireChannelActive();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java 
b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
new file mode 100644
index 0000000..580bc03
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelConfig;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.io.util.RebufferingInputStream;
+
+public class RebufferingByteBufDataInputPlus extends RebufferingInputStream 
implements ReadableByteChannel
+{
+    /**
+     * The parent, or owning, buffer of the current buffer being read from 
({@link super#buffer}).
+     */
+    private ByteBuf currentBuf;
+
+    private final BlockingQueue<ByteBuf> queue;
+
+    /**
+     * The count of live bytes in all {@link ByteBuf}s held by this instance.
+     */
+    private final AtomicInteger queuedByteCount;
+
+    private final int lowWaterMark;
+    private final int highWaterMark;
+    private final ChannelConfig channelConfig;
+
+    private volatile boolean closed;
+
+    public RebufferingByteBufDataInputPlus(int lowWaterMark, int 
highWaterMark, ChannelConfig channelConfig)
+    {
+        super(Unpooled.EMPTY_BUFFER.nioBuffer());
+
+        if (lowWaterMark > highWaterMark)
+            throw new IllegalArgumentException(String.format("low water mark 
is greater than high water mark: %d vs %d", lowWaterMark, highWaterMark));
+
+        currentBuf = Unpooled.EMPTY_BUFFER;
+        this.lowWaterMark = lowWaterMark;
+        this.highWaterMark = highWaterMark;
+        this.channelConfig = channelConfig;
+        queue = new LinkedBlockingQueue<>();
+        queuedByteCount = new AtomicInteger();
+    }
+
+    /**
+     * Append a {@link ByteBuf} to the end of the internal queue.
+     *
+     * Note: it's expected this method is invoked on the netty event loop.
+     */
+    public void append(ByteBuf buf) throws IllegalStateException
+    {
+        assert buf != null : "buffer cannot be null";
+
+        if (closed)
+        {
+            ReferenceCountUtil.release(buf);
+            throw new IllegalStateException("stream is already closed, so 
cannot add another buffer");
+        }
+
+        // this slightly undercounts the live count as it doesn't include the 
currentBuf's size.
+        // that's ok as the worst we'll do is allow another buffer in and add 
it to the queue,
+        // and at that point we'll disable auto-read. this is a tradeoff versus
making some other member field
+        // atomic or volatile.
+        int queuedCount = queuedByteCount.addAndGet(buf.readableBytes());
+        if (channelConfig.isAutoRead() && queuedCount > highWaterMark)
+            channelConfig.setAutoRead(false);
+
+        queue.add(buf);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Release open buffers and poll the {@link #queue} for more data.
+     * <p>
+     * This is best, and more or less expected, to be invoked on a consuming 
thread (not the event loop)
+     * because if we block on the queue we can't fill it on the event loop (as
that's where the buffers are coming from).
+     */
+    @Override
+    protected void reBuffer() throws IOException
+    {
+        currentBuf.release();
+        buffer = null;
+        currentBuf = null;
+
+        // possibly re-enable auto-read, *before* blocking on the queue, 
because if we block on the queue
+        // without enabling auto-read we'll block forever :(
+        if (!channelConfig.isAutoRead() && queuedByteCount.get() < 
lowWaterMark)
+            channelConfig.setAutoRead(true);
+
+        try
+        {
+            currentBuf = queue.take();
+            int bytes;
+            // if we get an explicitly empty buffer, we treat that as an 
indicator that the input is closed
+            if (currentBuf == null || (bytes = currentBuf.readableBytes()) == 
0)
+            {
+                releaseResources();
+                throw new EOFException();
+            }
+
+            buffer = currentBuf.nioBuffer(currentBuf.readerIndex(), bytes);
+            assert buffer.remaining() == bytes;
+            queuedByteCount.addAndGet(-bytes);
+            return;
+        }
+        catch (InterruptedException ie)
+        {
+            // nop - ignore
+        }
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException
+    {
+        int readLength = dst.remaining();
+        int remaining = readLength;
+
+        while (remaining > 0)
+        {
+            if (closed)
+                throw new EOFException();
+
+            if (!buffer.hasRemaining())
+                reBuffer();
+            int copyLength = Math.min(remaining, buffer.remaining());
+
+            int originalLimit = buffer.limit();
+            buffer.limit(buffer.position() + copyLength);
+            dst.put(buffer);
+            buffer.limit(originalLimit);
+            remaining -= copyLength;
+        }
+
+        return readLength;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * As long as this method is invoked on the consuming thread, the returned
value will be accurate.
+     */
+    @Override
+    public int available() throws EOFException
+    {
+        if (closed)
+            throw new EOFException();
+
+        final int availableBytes = queuedByteCount.get() + (buffer != null ?
buffer.remaining() : 0);
+
+        if (!channelConfig.isAutoRead() && availableBytes < lowWaterMark)
+            channelConfig.setAutoRead(true);
+
+        return availableBytes;
+    }
+
+    @Override
+    public boolean isOpen()
+    {
+        return !closed;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Note: This should be invoked on the consuming thread.
+     */
+    @Override
+    public void close()
+    {
+        closed = true;
+        releaseResources();
+    }
+
+    private void releaseResources()
+    {
+        if (currentBuf != null)
+        {
+            if (currentBuf.refCnt() > 0)
+                currentBuf.release(currentBuf.refCnt());
+            currentBuf = null;
+            buffer = null;
+        }
+
+        ByteBuf buf;
+        while ((buf = queue.poll()) != null && buf.refCnt() > 0)
+            buf.release(buf.refCnt());
+    }
+
+    /**
+     * Mark this stream as closed, but do not release any of the resources.
+     *
+     * Note: this is best called from the producer thread.
+     */
+    public void markClose()
+    {
+        if (!closed)
+        {
+            closed = true;
+            queue.add(Unpooled.EMPTY_BUFFER);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * Note: this is best called from the consumer thread.
+     */
+    @Override
+    public String toString()
+    {
+        return new StringBuilder(128).append("RebufferingByteBufDataInputPlus: 
currentBuf = ").append(currentBuf)
+                                  .append(" (super.buffer = 
").append(buffer).append(')')
+                                  .append(", queuedByteCount = 
").append(queuedByteCount)
+                                  .append(", queue buffers = ").append(queue)
+                                  .append(", closed = ").append(closed)
+                                  .toString();
+    }
+
+    public ByteBufAllocator getAllocator()
+    {
+        return channelConfig.getAllocator();
+    }
+}
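
To make the producer/consumer contract above concrete, here is a rough usage sketch. ByteBufs are appended from the event loop side (an EmbeddedChannel stands in for a real channel, and the water marks are arbitrary), while a separate thread blocks in read() until data, or the empty "close" marker, arrives:

    import java.nio.ByteBuffer;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.embedded.EmbeddedChannel;
    import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;

    public class RebufferingInputSketch
    {
        public static void main(String[] args) throws Exception
        {
            EmbeddedChannel channel = new EmbeddedChannel();
            RebufferingByteBufDataInputPlus input =
                new RebufferingByteBufDataInputPlus(1 << 10, 1 << 15, channel.config());

            // consumer: blocks inside read()/reBuffer() until the producer appends a buffer
            Thread consumer = new Thread(() -> {
                try
                {
                    ByteBuffer dst = ByteBuffer.allocate(Long.BYTES);
                    input.read(dst);
                    dst.flip();
                    System.out.println(dst.getLong()); // 42
                }
                catch (Exception e)
                {
                    // an EOFException here would mean markClose() drained the queue first
                }
            });
            consumer.start();

            // producer: in the real pipeline this happens in channelRead() on the event loop
            ByteBuf buf = Unpooled.buffer(Long.BYTES).writeLong(42L);
            input.append(buf);

            consumer.join();
            input.close();
        }
    }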

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/security/SSLFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java 
b/src/java/org/apache/cassandra/security/SSLFactory.java
index 3c1293f..a931f5f 100644
--- a/src/java/org/apache/cassandra/security/SSLFactory.java
+++ b/src/java/org/apache/cassandra/security/SSLFactory.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.security;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.InetAddress;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.KeyStore;
@@ -32,7 +31,6 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLParameters;
 import javax.net.ssl.SSLSocket;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
@@ -79,53 +77,6 @@ public final class SSLFactory
      */
     private static final AtomicReference<SslContext> serverSslContext = new 
AtomicReference<>();
 
-    /** Create a socket and connect */
-    public static SSLSocket getSocket(EncryptionOptions options, InetAddress 
address, int port, InetAddress localAddress, int localPort) throws IOException
-    {
-        SSLContext ctx = createSSLContext(options, true);
-        SSLSocket socket = (SSLSocket) 
ctx.getSocketFactory().createSocket(address, port, localAddress, localPort);
-        try
-        {
-            prepareSocket(socket, options);
-            return socket;
-        }
-        catch (IllegalArgumentException e)
-        {
-            socket.close();
-            throw e;
-        }
-    }
-
-    /** Create a socket and connect, using any local address */
-    public static SSLSocket getSocket(EncryptionOptions options, InetAddress 
address, int port) throws IOException
-    {
-        SSLContext ctx = createSSLContext(options, true);
-        SSLSocket socket = (SSLSocket) 
ctx.getSocketFactory().createSocket(address, port);
-        try
-        {
-            prepareSocket(socket, options);
-            return socket;
-        }
-        catch (IllegalArgumentException e)
-        {
-            socket.close();
-            throw e;
-        }
-    }
-
-    /** Sets relevant socket options specified in encryption settings */
-    private static void prepareSocket(SSLSocket socket, EncryptionOptions 
options)
-    {
-        String[] suites = 
filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites);
-        if(options.require_endpoint_verification)
-        {
-            SSLParameters sslParameters = socket.getSSLParameters();
-            sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
-            socket.setSSLParameters(sslParameters);
-        }
-        socket.setEnabledCipherSuites(suites);
-    }
-
     /**
      * Create a JSSE {@link SSLContext}.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index af59733..bab161a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1313,17 +1313,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return DatabaseDescriptor.getTruncateRpcTimeout();
     }
 
-    public void setStreamingSocketTimeout(int value)
-    {
-        DatabaseDescriptor.setStreamingSocketTimeout(value);
-        logger.info("set streaming socket timeout to {} ms", value);
-    }
-
-    public int getStreamingSocketTimeout()
-    {
-        return DatabaseDescriptor.getStreamingSocketTimeout();
-    }
-
     public void setStreamThroughputMbPerSec(int value)
     {
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 36c43fd..46b7253 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -502,9 +502,6 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     public void setTruncateRpcTimeout(long value);
     public long getTruncateRpcTimeout();
 
-    public void setStreamingSocketTimeout(int value);
-    public int getStreamingSocketTimeout();
-
     public void setStreamThroughputMbPerSec(int value);
     public int getStreamThroughputMbPerSec();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
deleted file mode 100644
index 5f734c9..0000000
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.util.concurrent.FastThreadLocalThread;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
-import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
-import org.apache.cassandra.net.IncomingStreamingConnection;
-import org.apache.cassandra.streaming.messages.StreamInitMessage;
-import org.apache.cassandra.streaming.messages.StreamMessage;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-
-/**
- * ConnectionHandler manages incoming/outgoing message exchange for the {@link StreamSession}.
- *
- * <p>
- * Internally, ConnectionHandler manages thread to receive incoming {@link StreamMessage} and thread to
- * send outgoing message. Messages are encoded/decoded on those thread and handed to
- * {@link StreamSession#messageReceived(org.apache.cassandra.streaming.messages.StreamMessage)}.
- */
-public class ConnectionHandler
-{
-    private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class);
-
-    private final StreamSession session;
-
-    private IncomingMessageHandler incoming;
-    private OutgoingMessageHandler outgoing;
-    private final boolean isPreview;
-
-    ConnectionHandler(StreamSession session, int incomingSocketTimeout, boolean isPreview)
-    {
-        this.session = session;
-        this.isPreview = isPreview;
-        this.incoming = new IncomingMessageHandler(session, incomingSocketTimeout);
-        this.outgoing = new OutgoingMessageHandler(session);
-    }
-
-    /**
-     * Set up incoming message handler and initiate streaming.
-     *
-     * This method is called once on initiator.
-     *
-     * @throws IOException
-     */
-    @SuppressWarnings("resource")
-    public void initiate() throws IOException
-    {
-        logger.debug("[Stream #{}] Sending stream init for incoming stream", 
session.planId());
-        Socket incomingSocket = session.createConnection();
-        incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true);
-
-        logger.debug("[Stream #{}] Sending stream init for outgoing stream", 
session.planId());
-        Socket outgoingSocket = session.createConnection();
-        outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true);
-    }
-
-    /**
-     * Set up outgoing message handler on receiving side.
-     *
-     * @param connection Incoming connection to use for {@link OutgoingMessageHandler}.
-     * @param version Streaming message version
-     * @throws IOException
-     */
-    public void initiateOnReceivingSide(IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException
-    {
-        if (isForOutgoing)
-            outgoing.start(connection, version);
-        else
-            incoming.start(connection, version);
-    }
-
-    public ListenableFuture<?> close()
-    {
-        logger.debug("[Stream #{}] Closing stream connection handler on {}", 
session.planId(), session.peer);
-
-        ListenableFuture<?> inClosed = closeIncoming();
-        ListenableFuture<?> outClosed = closeOutgoing();
-
-        return Futures.allAsList(inClosed, outClosed);
-    }
-
-    public ListenableFuture<?> closeOutgoing()
-    {
-        return outgoing == null ? Futures.immediateFuture(null) : outgoing.close();
-    }
-
-    public ListenableFuture<?> closeIncoming()
-    {
-        return incoming == null ? Futures.immediateFuture(null) : incoming.close();
-    }
-
-    /**
-     * Enqueue messages to be sent.
-     *
-     * @param messages messages to send
-     */
-    public void sendMessages(Collection<? extends StreamMessage> messages)
-    {
-        for (StreamMessage message : messages)
-            sendMessage(message);
-    }
-
-    public void sendMessage(StreamMessage message)
-    {
-        if (outgoing.isClosed())
-            throw new RuntimeException("Outgoing stream handler has been 
closed");
-
-        if (message.type == StreamMessage.Type.FILE && isPreview)
-            throw new RuntimeException("Cannot send file messages for preview 
streaming sessions");
-
-        outgoing.enqueue(message);
-    }
-
-    /**
-     * @return true if outgoing connection is opened and ready to send messages
-     */
-    public boolean isOutgoingConnected()
-    {
-        return outgoing != null && !outgoing.isClosed();
-    }
-
-    abstract static class MessageHandler implements Runnable
-    {
-        protected final StreamSession session;
-
-        protected int protocolVersion;
-        private final boolean isOutgoingHandler;
-        protected Socket socket;
-
-        private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>();
-        private IncomingStreamingConnection incomingConnection;
-
-        protected MessageHandler(StreamSession session, boolean isOutgoingHandler)
-        {
-            this.session = session;
-            this.isOutgoingHandler = isOutgoingHandler;
-        }
-
-        protected abstract String name();
-
-        @SuppressWarnings("resource")
-        protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException
-        {
-            WritableByteChannel out = socket.getChannel();
-            // socket channel is null when encrypted(SSL)
-            if (out == null)
-                return new WrappedDataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream()));
-            return new BufferedDataOutputStreamPlus(out);
-        }
-
-        protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException
-        {
-            //we do this instead of socket.getChannel() so socketSoTimeout is respected
-            return Channels.newChannel(socket.getInputStream());
-        }
-
-        @SuppressWarnings("resource")
-        private void sendInitMessage() throws IOException
-        {
-            StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(),
-                                                              session.sessionIndex(),
-                                                              session.planId(),
-                                                              session.streamOperation(),
-                                                              !isOutgoingHandler,
-                                                              session.keepSSTableLevel(),
-                                                              session.getPendingRepair(),
-                                                              session.getPreviewKind());
-            ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
-            DataOutputStreamPlus out = getWriteChannel(socket);
-            out.write(messageBuf);
-            out.flush();
-        }
-
-        public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException
-        {
-            this.incomingConnection = connection;
-            start(connection.socket, protocolVersion, false);
-        }
-
-        public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException
-        {
-            this.socket = socket;
-            this.protocolVersion = protocolVersion;
-            if (initiator)
-                sendInitMessage();
-
-            new FastThreadLocalThread(this, name() + "-" + socket.getRemoteSocketAddress()).start();
-        }
-
-        public ListenableFuture<?> close()
-        {
-            // Assume it wasn't closed. Not a huge deal if we create a future on a race
-            SettableFuture<?> future = SettableFuture.create();
-            return closeFuture.compareAndSet(null, future)
-                 ? future
-                 : closeFuture.get();
-        }
-
-        public boolean isClosed()
-        {
-            return closeFuture.get() != null;
-        }
-
-        protected void signalCloseDone()
-        {
-            if (!isClosed())
-                close();
-
-            closeFuture.get().set(null);
-
-            // We can now close the socket
-            if (incomingConnection != null)
-            {
-                //this will close the underlying socket and remove it
-                //from active MessagingService connections (CASSANDRA-11854)
-                incomingConnection.close();
-            }
-            else
-            {
-                //this is an outgoing connection not registered in the MessagingService
-                //so we can close the socket directly
-                try
-                {
-                    socket.close();
-                }
-                catch (IOException e)
-                {
-                    // Erroring out while closing shouldn't happen but is not really a big deal, so just log
-                    // it at DEBUG and ignore otherwise.
-                    logger.debug("Unexpected error while closing streaming connection", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Incoming streaming message handler
-     */
-    static class IncomingMessageHandler extends MessageHandler
-    {
-        private final int socketTimeout;
-
-        IncomingMessageHandler(StreamSession session, int socketTimeout)
-        {
-            super(session, false);
-            this.socketTimeout = socketTimeout;
-        }
-
-        @Override
-        public void start(Socket socket, int version, boolean initiator) throws IOException
-        {
-            try
-            {
-                socket.setSoTimeout(socketTimeout);
-            }
-            catch (SocketException e)
-            {
-                logger.warn("Could not set incoming socket timeout to {}", 
socketTimeout, e);
-            }
-            super.start(socket, version, initiator);
-        }
-
-        protected String name()
-        {
-            return "STREAM-IN";
-        }
-
-        @SuppressWarnings("resource")
-        public void run()
-        {
-            try
-            {
-                ReadableByteChannel in = getReadChannel(socket);
-                while (!isClosed())
-                {
-                    // receive message
-                    StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session);
-                    logger.debug("[Stream #{}] Received {}", session.planId(), message);
-                    // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok
-                    // to ignore here since we'll have asked for a retry.
-                    if (message != null)
-                    {
-                        session.messageReceived(message);
-                    }
-                }
-            }
-            catch (Throwable t)
-            {
-                JVMStabilityInspector.inspectThrowable(t);
-                session.onError(t);
-            }
-            finally
-            {
-                signalCloseDone();
-            }
-        }
-    }
-
-    /**
-     * Outgoing file transfer thread
-     */
-    static class OutgoingMessageHandler extends MessageHandler
-    {
-        /*
-         * All out going messages are queued up into messageQueue.
-         * The size will grow when received streaming request.
-         *
-         * Queue is also PriorityQueue so that prior messages can go out fast.
-         */
-        private final PriorityBlockingQueue<StreamMessage> messageQueue = new PriorityBlockingQueue<>(64, new Comparator<StreamMessage>()
-        {
-            public int compare(StreamMessage o1, StreamMessage o2)
-            {
-                return o2.getPriority() - o1.getPriority();
-            }
-        });
-
-        OutgoingMessageHandler(StreamSession session)
-        {
-            super(session, true);
-        }
-
-        protected String name()
-        {
-            return "STREAM-OUT";
-        }
-
-        public void enqueue(StreamMessage message)
-        {
-            messageQueue.put(message);
-        }
-
-        @SuppressWarnings("resource")
-        public void run()
-        {
-            try
-            {
-                DataOutputStreamPlus out = getWriteChannel(socket);
-
-                StreamMessage next;
-                while (!isClosed())
-                {
-                    if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null)
-                    {
-                        logger.debug("[Stream #{}] Sending {}", session.planId(), next);
-                        sendMessage(out, next);
-                        if (next.type == StreamMessage.Type.SESSION_FAILED)
-                            close();
-                    }
-                }
-
-                // Sends the last messages on the queue
-                while ((next = messageQueue.poll()) != null)
-                    sendMessage(out, next);
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-            catch (Throwable e)
-            {
-                session.onError(e);
-            }
-            finally
-            {
-                signalCloseDone();
-            }
-        }
-
-        private void sendMessage(DataOutputStreamPlus out, StreamMessage message)
-        {
-            try
-            {
-                StreamMessage.serialize(message, out, protocolVersion, session);
-                out.flush();
-                message.sent();
-            }
-            catch (SocketException e)
-            {
-                session.onError(e);
-                close();
-            }
-            catch (IOException e)
-            {
-                session.onError(e);
-            }
-        }
-    }
-}
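
The blocking ConnectionHandler removed above ran a STREAM-IN and a STREAM-OUT thread per session over raw sockets. With this patch, inbound traffic is instead delivered to a Netty channel handler (StreamingInboundHandler in the new streaming/async package). The snippet below is only a minimal, hypothetical sketch of that handler pattern for orientation; it is not the class added by this commit, and the message decoding and session dispatch it alludes to are assumptions.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

// Hypothetical sketch: receive inbound bytes on the event loop and hand decoded
// messages to the owning stream session. The real StreamingInboundHandler does far
// more (rebuffering, keep-alive handling, deserializing off the event loop).
public class SketchStreamingInboundHandler extends ChannelInboundHandlerAdapter
{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
    {
        ByteBuf buf = (ByteBuf) msg;
        try
        {
            // decode a StreamMessage from buf and dispatch it to its StreamSession here
        }
        finally
        {
            buf.release(); // inbound ByteBufs are reference-counted and must be released
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    {
        ctx.close(); // on error, close the channel; session failure is handled elsewhere
    }
}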

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
index d88d63c..d9ed8be 100644
--- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java
@@ -15,83 +15,93 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.streaming;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.SocketChannel;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.Config;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.WriteBufferWaterMark;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.security.SSLFactory;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.apache.cassandra.net.async.NettyFactory;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.net.async.OutboundConnectionParams;
 
 public class DefaultConnectionFactory implements StreamConnectionFactory
 {
     private static final Logger logger = LoggerFactory.getLogger(DefaultConnectionFactory.class);
 
+    private static final int DEFAULT_CHANNEL_BUFFER_SIZE = 1 << 22;
+
+    private static final long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);
     private static final int MAX_CONNECT_ATTEMPTS = 3;
 
-    /**
-     * Connect to peer and start exchanging message.
-     * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times.
-     *
-     * @param peer the peer to connect to.
-     * @return the created socket.
-     *
-     * @throws IOException when connection failed.
-     */
-    public Socket createConnection(InetAddress peer) throws IOException
+    @Override
+    public Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException
     {
-        int attempts = 0;
+        ServerEncryptionOptions encryptionOptions = DatabaseDescriptor.getServerEncryptionOptions();
+
+        if (encryptionOptions.internode_encryption == ServerEncryptionOptions.InternodeEncryption.none)
+            encryptionOptions = null;
+
+        return createConnection(connectionId, protocolVersion, encryptionOptions);
+    }
+
+    protected Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion, @Nullable ServerEncryptionOptions encryptionOptions) throws IOException
+    {
+        // this is the amount of data to allow in memory before netty sets the channel writability flag to false
+        int channelBufferSize = DEFAULT_CHANNEL_BUFFER_SIZE;
+        WriteBufferWaterMark waterMark = new WriteBufferWaterMark(channelBufferSize >> 2, channelBufferSize);
+
+        int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0
+                             ? DatabaseDescriptor.getInternodeSendBufferSize()
+                             : OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE;
+
+        OutboundConnectionParams params = OutboundConnectionParams.builder()
+                                                                  .connectionId(connectionId)
+                                                                  .encryptionOptions(encryptionOptions)
+                                                                  .mode(NettyFactory.Mode.STREAMING)
+                                                                  .protocolVersion(protocolVersion)
+                                                                  .sendBufferSize(sendBufferSize)
+                                                                  .waterMark(waterMark)
+                                                                  .build();
+
+        Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params);
+
+        int connectionAttemptCount = 0;
+        long now = System.nanoTime();
+        final long end = now + MAX_WAIT_TIME_NANOS;
+        final Channel channel;
         while (true)
         {
-            try
-            {
-                Socket socket = newSocket(peer);
-                
socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
-                socket.setKeepAlive(true);
-                return socket;
-            }
-            catch (IOException e)
+            ChannelFuture channelFuture = bootstrap.connect();
+            channelFuture.awaitUninterruptibly(end - now, TimeUnit.MILLISECONDS);
+            if (channelFuture.isSuccess())
             {
-                if (++attempts >= MAX_CONNECT_ATTEMPTS)
-                    throw e;
-
-                long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
-                logger.warn("Failed attempt {} to connect to {}. Retrying in {} ms. ({})", attempts, peer, waitms, e.getMessage());
-                try
-                {
-                    Thread.sleep(waitms);
-                }
-                catch (InterruptedException wtf)
-                {
-                    throw new IOException("interrupted", wtf);
-                }
+                channel = channelFuture.channel();
+                break;
             }
-        }
-    }
 
-    // TODO this is deliberately copied from (the now former) OutboundTcpConnectionPool, for CASSANDRA-8457.
-    // to be replaced in CASSANDRA-12229 (make streaming use 8457)
-    public static Socket newSocket(InetAddress endpoint) throws IOException
-    {
-        // zero means 'bind on any available port.'
-        if (MessagingService.isEncryptedConnection(endpoint))
-        {
-            return SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endpoint, DatabaseDescriptor.getSSLStoragePort());
-        }
-        else
-        {
-            SocketChannel channel = SocketChannel.open();
-            channel.connect(new InetSocketAddress(endpoint, DatabaseDescriptor.getStoragePort()));
-            return channel.socket();
+            connectionAttemptCount++;
+            now = System.nanoTime();
+            if (connectionAttemptCount == MAX_CONNECT_ATTEMPTS || end - now <= 0)
+                throw new IOException("failed to connect to " + connectionId + " for streaming data", channelFuture.cause());
+
+            long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, connectionAttemptCount);
+            logger.warn("Failed attempt {} to connect to {}. Retrying in {} ms.", connectionAttemptCount, connectionId, waitms);
+            Uninterruptibles.sleepUninterruptibly(waitms, TimeUnit.MILLISECONDS);
         }
+
+        return channel;
     }
 }
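
The rewritten factory connects via a Netty Bootstrap and retries with exponential backoff: it sleeps getRpcTimeout() * 2^attempt milliseconds between attempts, giving up after MAX_CONNECT_ATTEMPTS tries or once the 30 second deadline (MAX_WAIT_TIME_NANOS) has passed. Below is a small standalone sketch of that backoff schedule, for illustration only; rpcTimeoutMillis is a stand-in for the configured RPC timeout and is not a value taken from this patch.

import java.util.concurrent.TimeUnit;

// Illustrative sketch of the retry/backoff arithmetic used above, outside Cassandra/Netty.
public final class ConnectBackoffSketch
{
    static final int MAX_CONNECT_ATTEMPTS = 3;
    static final long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30);

    // per-attempt sleep in milliseconds, mirroring rpcTimeout * 2^attempt
    static long waitMillis(long rpcTimeoutMillis, int attempt)
    {
        return rpcTimeoutMillis * (long) Math.pow(2, attempt);
    }

    public static void main(String[] args)
    {
        long rpcTimeoutMillis = 2000; // assumed example value, not a cluster default
        for (int attempt = 1; attempt < MAX_CONNECT_ATTEMPTS; attempt++)
            System.out.printf("attempt %d failed -> sleep %d ms%n", attempt, waitMillis(rpcTimeoutMillis, attempt));
    }
}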

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
index dd99611..4cfe41e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
+++ b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java
@@ -15,16 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.streaming;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
 
-/**
- * Interface that creates connection used by streaming.
- */
+import io.netty.channel.Channel;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+
 public interface StreamConnectionFactory
 {
-    Socket createConnection(InetAddress peer) throws IOException;
+    Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException;
 }
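
With the interface narrowed to a single Channel-returning method, alternative factories only need to produce a Netty Channel. A hypothetical example (not part of this patch) is a test factory that hands back an in-memory EmbeddedChannel so streaming senders can be exercised without opening sockets:

import java.io.IOException;

import io.netty.channel.Channel;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
import org.apache.cassandra.streaming.StreamConnectionFactory;

// Hypothetical sketch for tests: returns an in-memory channel instead of connecting to a peer.
public class EmbeddedChannelConnectionFactory implements StreamConnectionFactory
{
    @Override
    public Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException
    {
        return new EmbeddedChannel(); // callers add whatever handlers they need
    }
}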

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 9059f45..bb8c702 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -37,8 +37,10 @@ public class StreamCoordinator
 {
     private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class);
 
-    // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the
-    // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads.
+    /**
+     * Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the
+     * streaming is handled directly by the {@link StreamingMessageSender}'s incoming and outgoing threads.
+     */
     private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
                                                                                                                              FBUtilities.getAvailableProcessors());
     private final boolean connectSequentially;
@@ -55,8 +57,8 @@ public class StreamCoordinator
                              boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
     {
         this.connectionsPerHost = connectionsPerHost;
-        this.factory = factory;
         this.keepSSTableLevel = keepSSTableLevel;
+        this.factory = factory;
         this.connectSequentially = connectSequentially;
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
@@ -163,6 +165,11 @@ public class StreamCoordinator
         return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting);
     }
 
+    public StreamSession getSessionById(InetAddress peer, int id)
+    {
+        return getHostData(peer).getSessionById(id);
+    }
+
     public synchronized void updateProgress(ProgressInfo info)
     {
         getHostData(info.peer).updateProgress(info);
@@ -274,8 +281,8 @@ public class StreamCoordinator
 
     private class HostStreamingData
     {
-        private Map<Integer, StreamSession> streamSessions = new HashMap<>();
-        private Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
+        private final Map<Integer, StreamSession> streamSessions = new HashMap<>();
+        private final Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
 
         private int lastReturned = -1;
 
@@ -333,6 +340,11 @@ public class StreamCoordinator
             return session;
         }
 
+        public StreamSession getSessionById(int id)
+        {
+            return streamSessions.get(id);
+        }
+
         public void updateProgress(ProgressInfo info)
         {
             sessionInfos.get(info.sessionIndex).updateProgress(info);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 52652c0..a44f02e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -21,7 +21,6 @@ import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-
 import javax.management.ListenerNotFoundException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.NotificationFilter;
@@ -136,7 +135,7 @@ public class StreamManager implements StreamManagerMBean
         initiatedStreams.put(result.planId, result);
     }
 
-    public void registerReceiving(final StreamResultFuture result)
+    public StreamResultFuture registerReceiving(final StreamResultFuture result)
     {
         result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or not)
@@ -148,7 +147,8 @@ public class StreamManager implements StreamManagerMBean
             }
         }, MoreExecutors.directExecutor());
 
-        receivingStreams.put(result.planId, result);
+        StreamResultFuture previous = receivingStreams.putIfAbsent(result.planId, result);
+        return previous ==  null ? result : previous;
     }
 
     public StreamResultFuture getReceivingStream(UUID planId)
@@ -175,4 +175,22 @@ public class StreamManager implements StreamManagerMBean
     {
         return notifier.getNotificationInfo();
     }
+
+    public StreamSession findSession(InetAddress peer, UUID planId, int sessionIndex)
+    {
+        StreamSession session = findSession(initiatedStreams, peer, planId, sessionIndex);
+        if (session !=  null)
+            return session;
+
+        return findSession(receivingStreams, peer, planId, sessionIndex);
+    }
+
+    private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddress peer, UUID planId, int sessionIndex)
+    {
+        StreamResultFuture streamResultFuture = streams.get(planId);
+        if (streamResultFuture == null)
+            return null;
+
+        return streamResultFuture.getSession(peer, sessionIndex);
+    }
 }
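
The new findSession lookup checks initiated streams first and then receiving streams, so the inbound path can route a deserialized message to its owning session by (peer, planId, sessionIndex). A hedged call-site sketch, assuming the existing StreamManager.instance singleton and header fields (peer, planId, sessionIndex, message) already deserialized by the caller:

// Hypothetical receive-path lookup; the surrounding variables are assumptions, not code from this patch.
StreamSession session = StreamManager.instance.findSession(peer, planId, sessionIndex);
if (session == null)
    throw new IllegalStateException(String.format("unknown stream session: %s - %d", planId, sessionIndex));
session.messageReceived(message);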

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 05a8d30..213f74b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -33,7 +33,7 @@ import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR
  */
 public class StreamPlan
 {
-    public static final String[] EMPTY_COLUMN_FAMILIES = new String[0];
+    private static final String[] EMPTY_COLUMN_FAMILIES = new String[0];
     private final UUID planId = UUIDGen.getTimeUUID();
     private final StreamOperation streamOperation;
     private final List<StreamEventHandler> handlers = new ArrayList<>();

