Repository: cassandra
Updated Branches:
  refs/heads/trunk 3d4a7e7b6 -> fc92db2b9


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index ceaa4d1..68c6034 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -19,103 +19,64 @@ package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.nio.ByteBuffer;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputBufferFixed;
-import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
  * StreamInitMessage is first sent from the node where {@link 
org.apache.cassandra.streaming.StreamSession} is started,
  * to initiate corresponding {@link 
org.apache.cassandra.streaming.StreamSession} on the other side.
  */
-public class StreamInitMessage
+public class StreamInitMessage extends StreamMessage
 {
-    public static IVersionedSerializer<StreamInitMessage> serializer = new 
StreamInitMessageSerializer();
+    public static Serializer<StreamInitMessage> serializer = new 
StreamInitMessageSerializer();
 
     public final InetAddress from;
     public final int sessionIndex;
     public final UUID planId;
     public final StreamOperation streamOperation;
 
-    // true if this init message is to connect for outgoing message on 
receiving side
-    public final boolean isForOutgoing;
     public final boolean keepSSTableLevel;
     public final UUID pendingRepair;
     public final PreviewKind previewKind;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, 
StreamOperation streamOperation, boolean isForOutgoing, boolean 
keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, 
StreamOperation streamOperation, boolean keepSSTableLevel, UUID pendingRepair, 
PreviewKind previewKind)
     {
+        super(Type.STREAM_INIT);
         this.from = from;
         this.sessionIndex = sessionIndex;
         this.planId = planId;
         this.streamOperation = streamOperation;
-        this.isForOutgoing = isForOutgoing;
         this.keepSSTableLevel = keepSSTableLevel;
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
     }
 
-    /**
-     * Create serialized message.
-     *
-     * @param compress true if message is compressed
-     * @param version Streaming protocol version
-     * @return serialized message in ByteBuffer format
-     */
-    public ByteBuffer createMessage(boolean compress, int version)
+    @Override
+    public String toString()
     {
-        int header = 0;
-        // set compression bit.
-        if (compress)
-            header |= 4;
-        // set streaming bit
-        header |= 8;
-        // Setting up the version bit
-        header |= (version << 8);
-
-        byte[] bytes;
-        try
-        {
-            int size = (int)StreamInitMessage.serializer.serializedSize(this, 
version);
-            try (DataOutputBuffer buffer = new DataOutputBufferFixed(size))
-            {
-                StreamInitMessage.serializer.serialize(this, buffer, version);
-                bytes = buffer.getData();
-            }
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        assert bytes.length > 0;
-
-        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + bytes.length);
-        buffer.putInt(MessagingService.PROTOCOL_MAGIC);
-        buffer.putInt(header);
-        buffer.put(bytes);
-        buffer.flip();
-        return buffer;
+        StringBuilder sb = new StringBuilder(128);
+        sb.append("StreamInitMessage: from = ").append(from);
+        sb.append(", planId = ").append(planId).append(", session index = 
").append(sessionIndex);
+        return sb.toString();
     }
 
-    private static class StreamInitMessageSerializer implements 
IVersionedSerializer<StreamInitMessage>
+    private static class StreamInitMessageSerializer implements 
Serializer<StreamInitMessage>
     {
-        public void serialize(StreamInitMessage message, DataOutputPlus out, 
int version) throws IOException
+        public void serialize(StreamInitMessage message, DataOutputStreamPlus 
out, int version, StreamSession session) throws IOException
         {
             CompactEndpointSerializationHelper.serialize(message.from, out);
             out.writeInt(message.sessionIndex);
             UUIDSerializer.serializer.serialize(message.planId, out, 
MessagingService.current_version);
             out.writeUTF(message.streamOperation.getDescription());
-            out.writeBoolean(message.isForOutgoing);
             out.writeBoolean(message.keepSSTableLevel);
 
             out.writeBoolean(message.pendingRepair != null);
@@ -126,18 +87,17 @@ public class StreamInitMessage
             out.writeInt(message.previewKind.getSerializationVal());
         }
 
-        public StreamInitMessage deserialize(DataInputPlus in, int version) 
throws IOException
+        public StreamInitMessage deserialize(DataInputPlus in, int version, 
StreamSession session) throws IOException
         {
             InetAddress from = 
CompactEndpointSerializationHelper.deserialize(in);
             int sessionIndex = in.readInt();
             UUID planId = UUIDSerializer.serializer.deserialize(in, 
MessagingService.current_version);
             String description = in.readUTF();
-            boolean sentByInitiator = in.readBoolean();
             boolean keepSSTableLevel = in.readBoolean();
 
             UUID pendingRepair = in.readBoolean() ? 
UUIDSerializer.serializer.deserialize(in, version) : null;
             PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
-            return new StreamInitMessage(from, sessionIndex, planId, 
StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, 
pendingRepair, previewKind);
+            return new StreamInitMessage(from, sessionIndex, planId, 
StreamOperation.fromString(description), keepSSTableLevel, pendingRepair, 
previewKind);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -146,7 +106,6 @@ public class StreamInitMessage
             size += TypeSizes.sizeof(message.sessionIndex);
             size += UUIDSerializer.serializer.serializedSize(message.planId, 
MessagingService.current_version);
             size += TypeSizes.sizeof(message.streamOperation.getDescription());
-            size += TypeSizes.sizeof(message.isForOutgoing);
             size += TypeSizes.sizeof(message.keepSSTableLevel);
             size += TypeSizes.sizeof(message.pendingRepair != null);
             if (message.pendingRepair != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
index 48def64..feeab05 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamMessage.java
@@ -18,10 +18,8 @@
 package org.apache.cassandra.streaming.messages;
 
 import java.io.IOException;
-import java.net.SocketException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
 
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.streaming.StreamSession;
 
@@ -36,67 +34,47 @@ public abstract class StreamMessage
     public static final int VERSION_40 = 5;
     public static final int CURRENT_VERSION = VERSION_40;
 
-    private transient volatile boolean sent = false;
-
     public static void serialize(StreamMessage message, DataOutputStreamPlus 
out, int version, StreamSession session) throws IOException
     {
-        ByteBuffer buff = ByteBuffer.allocate(1);
         // message type
-        buff.put(message.type.type);
-        buff.flip();
-        out.write(buff);
+        out.writeByte(message.type.type);
         message.type.outSerializer.serialize(message, out, version, session);
     }
 
-    public static StreamMessage deserialize(ReadableByteChannel in, int 
version, StreamSession session) throws IOException
-    {
-        ByteBuffer buff = ByteBuffer.allocate(1);
-        int readBytes = in.read(buff);
-        if (readBytes > 0)
-        {
-            buff.flip();
-            Type type = Type.get(buff.get());
-            return type.inSerializer.deserialize(in, version, session);
-        }
-        else if (readBytes == 0)
-        {
-            // input socket buffer was not filled yet
-            return null;
-        }
-        else
-        {
-            // possibly socket gets closed
-            throw new SocketException("End-of-stream reached");
-        }
-    }
-
-    public void sent()
+    public static long serializedSize(StreamMessage message, int version) 
throws IOException
     {
-        sent = true;
+        return 1 + message.type.outSerializer.serializedSize(message, version);
     }
 
-    public boolean wasSent()
+    public static StreamMessage deserialize(DataInputPlus in, int version, 
StreamSession session) throws IOException
     {
-        return sent;
+        byte b = in.readByte();
+        if (b == 0)
+            b = -1;
+        Type type = Type.get(b);
+        return type.inSerializer.deserialize(in, version, session);
     }
 
     /** StreamMessage serializer */
     public static interface Serializer<V extends StreamMessage>
     {
-        V deserialize(ReadableByteChannel in, int version, StreamSession 
session) throws IOException;
+        V deserialize(DataInputPlus in, int version, StreamSession session) 
throws IOException;
         void serialize(V message, DataOutputStreamPlus out, int version, 
StreamSession session) throws IOException;
+        long serializedSize(V message, int version) throws IOException;
     }
 
     /** StreamMessage types */
-    public static enum Type
+    public enum Type
     {
-        PREPARE(1, 5, PrepareMessage.serializer),
+        PREPARE_SYN(1, 5, PrepareSynMessage.serializer),
         FILE(2, 0, IncomingFileMessage.serializer, 
OutgoingFileMessage.serializer),
         RECEIVED(3, 4, ReceivedMessage.serializer),
-        RETRY(4, 4, RetryMessage.serializer),
         COMPLETE(5, 1, CompleteMessage.serializer),
         SESSION_FAILED(6, 5, SessionFailedMessage.serializer),
-        KEEP_ALIVE(7, 5, KeepAliveMessage.serializer);
+        KEEP_ALIVE(7, 5, KeepAliveMessage.serializer),
+        PREPARE_SYNACK(8, 5, PrepareSynAckMessage.serializer),
+        PREPARE_ACK(9, 5, PrepareAckMessage.serializer),
+        STREAM_INIT(10, 5, StreamInitMessage.serializer);
 
         public static Type get(byte type)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java 
b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
index aa23f45..d119081 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoadConnectionFactory.java
@@ -15,20 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.cassandra.tools;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.SocketChannel;
 
+import io.netty.channel.Channel;
 import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
 import org.apache.cassandra.streaming.StreamConnectionFactory;
-import org.apache.cassandra.utils.FBUtilities;
 
-public class BulkLoadConnectionFactory implements StreamConnectionFactory
+public class BulkLoadConnectionFactory extends DefaultConnectionFactory 
implements StreamConnectionFactory
 {
     private final boolean outboundBindAny;
     private final int storagePort;
@@ -43,24 +42,15 @@ public class BulkLoadConnectionFactory implements 
StreamConnectionFactory
         this.outboundBindAny = outboundBindAny;
     }
 
-    public Socket createConnection(InetAddress peer) throws IOException
+    public Channel createConnection(OutboundConnectionIdentifier connectionId, 
int protocolVersion) throws IOException
     {
         // Connect to secure port for all peers if ServerEncryptionOptions is 
configured other than 'none'
         // When 'all', 'dc' and 'rack', server nodes always have SSL port 
open, and since thin client like sstableloader
         // does not know which node is in which dc/rack, connecting to SSL 
port is always the option.
-        if (encryptionOptions != null && 
encryptionOptions.internode_encryption != 
EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none)
-        {
-            if (outboundBindAny)
-                return SSLFactory.getSocket(encryptionOptions, peer, 
secureStoragePort);
-            else
-                return SSLFactory.getSocket(encryptionOptions, peer, 
secureStoragePort, FBUtilities.getLocalAddress(), 0);
-        }
-        else
-        {
-            Socket socket = SocketChannel.open(new InetSocketAddress(peer, 
storagePort)).socket();
-            if (outboundBindAny && !socket.isBound())
-                socket.bind(new 
InetSocketAddress(FBUtilities.getLocalAddress(), 0));
-            return socket;
-        }
+        int port = encryptionOptions != null && 
encryptionOptions.internode_encryption != 
EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none ?
+                   secureStoragePort : storagePort;
+
+        connectionId = connectionId.withNewConnectionAddress(new 
InetSocketAddress(connectionId.remote(), port));
+        return createConnection(connectionId, protocolVersion, 
encryptionOptions);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 109a51e..267a6d5 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1076,8 +1076,6 @@ public class NodeProbe implements AutoCloseable
                 return ssProxy.getCasContentionTimeout();
             case "truncate":
                 return ssProxy.getTruncateRpcTimeout();
-            case "streamingsocket":
-                return ssProxy.getStreamingSocketTimeout();
             default:
                 throw new RuntimeException("Timeout type requires one of (" + 
GetTimeout.TIMEOUT_TYPES + ")");
         }
@@ -1156,11 +1154,6 @@ public class NodeProbe implements AutoCloseable
             case "truncate":
                 ssProxy.setTruncateRpcTimeout(value);
                 break;
-            case "streamingsocket":
-                if (value > Integer.MAX_VALUE)
-                    throw new RuntimeException("streamingsocket timeout must 
be less than " + Integer.MAX_VALUE);
-                ssProxy.setStreamingSocketTimeout((int) value);
-                break;
             default:
                 throw new RuntimeException("Timeout type requires one of (" + 
GetTimeout.TIMEOUT_TYPES + ")");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java 
b/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java
index 6c9b541..deac8a3 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetTimeout.java
@@ -31,7 +31,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 @Command(name = "gettimeout", description = "Print the timeout of the given 
type in ms")
 public class GetTimeout extends NodeToolCmd
 {
-    public static final String TIMEOUT_TYPES = "read, range, write, 
counterwrite, cascontention, truncate, streamingsocket, misc (general 
rpc_timeout_in_ms)";
+    public static final String TIMEOUT_TYPES = "read, range, write, 
counterwrite, cascontention, truncate, misc (general rpc_timeout_in_ms)";
 
     @Arguments(usage = "<timeout_type>", description = "The timeout type, one 
of (" + TIMEOUT_TYPES + ")")
     private List<String> args = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java 
b/src/java/org/apache/cassandra/utils/UUIDGen.java
index 8fac816..37f08bf 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -40,6 +40,8 @@ public class UUIDGen
     private static final long START_EPOCH = -12219292800000L;
     private static final long clockSeqAndNode = makeClockSeqAndNode();
 
+    public static final int UUID_LEN = 16;
+
     /*
      * The min and max possible lsb for a UUID.
      * Note that his is not 0 and all 1's because Cassandra TimeUUIDType
@@ -106,10 +108,10 @@ public class UUIDGen
     }
 
     /**
-     * Similar to {@link getTimeUUIDFromMicros}, but randomize (using 
SecureRandom) the clock and sequence.
+     * Similar to {@link #getTimeUUIDFromMicros}, but randomize (using 
SecureRandom) the clock and sequence.
      * <p>
      * If you can guarantee that the {@code whenInMicros} argument is unique 
(for this JVM instance) for
-     * every call, then you should prefer {@link getTimeUUIDFromMicros} which 
is faster. If you can't
+     * every call, then you should prefer {@link #getTimeUUIDFromMicros} which 
is faster. If you can't
      * guarantee this however, this method will ensure the returned UUID are 
still unique (accross calls)
      * through randomization.
      *
@@ -143,7 +145,7 @@ public class UUIDGen
 
     public static ByteBuffer toByteBuffer(UUID uuid)
     {
-        ByteBuffer buffer = ByteBuffer.allocate(16);
+        ByteBuffer buffer = ByteBuffer.allocate(UUID_LEN);
         buffer.putLong(uuid.getMostSignificantBits());
         buffer.putLong(uuid.getLeastSignificantBits());
         buffer.flip();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java 
b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index a30d6c9..799ac77 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -24,10 +24,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.io.Files;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.Schema;
@@ -62,26 +64,42 @@ public class LongStreamingTest
     }
 
     @Test
-    public void testCompressedStream() throws InvalidRequestException, 
IOException, ExecutionException, InterruptedException
+    public void testSstableCompressionStreaming() throws InterruptedException, 
ExecutionException, IOException
     {
-        String KS = "cql_keyspace";
+        testStream(true);
+    }
+
+    @Test
+    public void testStreamCompressionStreaming() throws InterruptedException, 
ExecutionException, IOException
+    {
+        testStream(false);
+    }
+
+    private void testStream(boolean useSstableCompression) throws 
InvalidRequestException, IOException, ExecutionException, InterruptedException
+    {
+        String KS = useSstableCompression ? "sstable_compression_ks" : 
"stream_compression_ks";
         String TABLE = "table1";
 
         File tempdir = Files.createTempDir();
         File dataDir = new File(tempdir.getAbsolutePath() + File.separator + 
KS + File.separator + TABLE);
         assert dataDir.mkdirs();
 
-        String schema = "CREATE TABLE cql_keyspace.table1 ("
+        String schema = "CREATE TABLE " + KS + '.'  + TABLE + "  ("
                         + "  k int PRIMARY KEY,"
                         + "  v1 text,"
                         + "  v2 int"
-                        + ");";// with compression = {};";
-        String insert = "INSERT INTO cql_keyspace.table1 (k, v1, v2) VALUES 
(?, ?, ?)";
+                        + ") with compression = " + (useSstableCompression ? 
"{'class': 'LZ4Compressor'};" : "{};");
+        String insert = "INSERT INTO " + KS + '.'  + TABLE + " (k, v1, v2) 
VALUES (?, ?, ?)";
         CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                   .sorted()
                                                   .inDirectory(dataDir)
                                                   .forTable(schema)
                                                   .using(insert).build();
+
+        CompressionParams compressionParams = 
Keyspace.open(KS).getColumnFamilyStore(TABLE).metadata().params.compression;
+        Assert.assertEquals(useSstableCompression, 
compressionParams.isEnabled());
+
+
         long start = System.nanoTime();
 
         for (int i = 0; i < 10_000_000; i++)
@@ -103,7 +121,7 @@ public class LongStreamingTest
             private String ks;
             public void init(String keyspace)
             {
-                for (Range<Token> range : 
StorageService.instance.getLocalRanges("cql_keyspace"))
+                for (Range<Token> range : 
StorageService.instance.getLocalRanges(KS))
                     addRangeForEndpoint(range, 
FBUtilities.getBroadcastAddress());
 
                 this.ks = keyspace;
@@ -130,7 +148,7 @@ public class LongStreamingTest
             private String ks;
             public void init(String keyspace)
             {
-                for (Range<Token> range : 
StorageService.instance.getLocalRanges("cql_keyspace"))
+                for (Range<Token> range : 
StorageService.instance.getLocalRanges(KS))
                     addRangeForEndpoint(range, 
FBUtilities.getBroadcastAddress());
 
                 this.ks = keyspace;
@@ -160,7 +178,7 @@ public class LongStreamingTest
                                          millis / 1000d,
                                          (dataSize * 2 / (1 << 20) / (millis / 
1000d)) * 8));
 
-        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM 
cql_keyspace.table1 limit 100;");
+        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM " 
+ KS + '.'  + TABLE + " limit 100;");
         assertEquals(100, rs.size());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java 
b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index cd7e381..385ebb7 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -54,7 +54,7 @@ public class PreparedStatementsTest extends SchemaLoader
 
         // Currently the native server start method return before the server 
is fully binded to the socket, so we need
         // to wait slightly before trying to connect to it. We should fix this 
but in the meantime using a sleep.
-        Thread.sleep(500);
+        Thread.sleep(1500);
 
         cluster = Cluster.builder().addContactPoint("127.0.0.1")
                                    
.withPort(DatabaseDescriptor.getNativeTransportPort())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java 
b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
index 175ab53..1803c51 100644
--- 
a/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
+++ 
b/test/unit/org/apache/cassandra/io/util/RewindableDataInputStreamPlusTest.java
@@ -378,7 +378,7 @@ public class RewindableDataInputStreamPlusTest
             //finish reading again previous sequence
 
             reader.mark();
-            //read 3 bytes - OK
+            //read 3 bytes - START
             assertEquals('a', reader.readChar());
             //read 1 more bytes - CAPACITY will exhaust when trying to reset :(
             assertEquals(1, reader.readShort());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java 
b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
index fa6e2b5..100e1e0 100644
--- a/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
+++ b/test/unit/org/apache/cassandra/net/async/HandshakeHandlersTest.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 
-import static 
org.apache.cassandra.net.async.InboundHandshakeHandler.State.MESSAGING_HANDSHAKE_COMPLETE;
+import static 
org.apache.cassandra.net.async.InboundHandshakeHandler.State.HANDSHAKE_COMPLETE;
 import static 
org.apache.cassandra.net.async.OutboundMessagingConnection.State.READY;
 
 public class HandshakeHandlersTest
@@ -100,7 +100,7 @@ public class HandshakeHandlersTest
             inboundChannel.writeInbound(o);
 
         Assert.assertEquals(READY, imc.getState());
-        Assert.assertEquals(MESSAGING_HANDSHAKE_COMPLETE, 
inboundHandshakeHandler.getState());
+        Assert.assertEquals(HANDSHAKE_COMPLETE, 
inboundHandshakeHandler.getState());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java 
b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
index 44dc469..4d9829f 100644
--- a/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
+++ b/test/unit/org/apache/cassandra/net/async/InboundHandshakeHandlerTest.java
@@ -181,7 +181,7 @@ public class InboundHandshakeHandlerTest
         buf = new ThirdHandshakeMessage(MESSAGING_VERSION, 
addr.getAddress()).encode(PooledByteBufAllocator.DEFAULT);
         state = 
handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf);
 
-        Assert.assertEquals(State.MESSAGING_HANDSHAKE_COMPLETE, state);
+        Assert.assertEquals(State.HANDSHAKE_COMPLETE, state);
         Assert.assertTrue(channel.isOpen());
         Assert.assertTrue(channel.isActive());
         Assert.assertFalse(channel.outboundMessages().isEmpty());
@@ -217,7 +217,7 @@ public class InboundHandshakeHandlerTest
         buf.writeInt(MESSAGING_VERSION);
         CompactEndpointSerializationHelper.serialize(addr.getAddress(), new 
ByteBufOutputStream(buf));
         State state = 
handler.handleMessagingStartResponse(channel.pipeline().firstContext(), buf);
-        Assert.assertEquals(State.MESSAGING_HANDSHAKE_COMPLETE, state);
+        Assert.assertEquals(State.HANDSHAKE_COMPLETE, state);
         Assert.assertTrue(channel.isOpen());
         Assert.assertTrue(channel.isActive());
     }
@@ -268,9 +268,9 @@ public class InboundHandshakeHandlerTest
         handler.setHandshakeTimeout(future);
         Assert.assertFalse(future.isCancelled());
         Assert.assertTrue(channel.isOpen());
-        handler.setState(State.MESSAGING_HANDSHAKE_COMPLETE);
+        handler.setState(State.HANDSHAKE_COMPLETE);
         handler.failHandshake(channel.pipeline().firstContext());
-        Assert.assertSame(State.MESSAGING_HANDSHAKE_COMPLETE, 
handler.getState());
+        Assert.assertSame(State.HANDSHAKE_COMPLETE, handler.getState());
         Assert.assertTrue(channel.isOpen());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java 
b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
index 772e47d..d6dd633 100644
--- 
a/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
+++ 
b/test/unit/org/apache/cassandra/net/async/OutboundMessagingConnectionTest.java
@@ -24,11 +24,7 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.SSLHandshakeException;
 
@@ -475,45 +471,4 @@ public class OutboundMessagingConnectionTest
         Assert.assertNotSame(omc.getConnectionId(), originalId);
         Assert.assertSame(NOT_READY, omc.getState());
     }
-
-    private static class TestScheduledFuture implements ScheduledFuture<Object>
-    {
-        private boolean cancelled = false;
-
-        public long getDelay(TimeUnit unit)
-        {
-            return 0;
-        }
-
-        public int compareTo(Delayed o)
-        {
-            return 0;
-        }
-
-        public boolean cancel(boolean mayInterruptIfRunning)
-        {
-            cancelled = true;
-            return false;
-        }
-
-        public boolean isCancelled()
-        {
-            return cancelled;
-        }
-
-        public boolean isDone()
-        {
-            return false;
-        }
-
-        public Object get() throws InterruptedException, ExecutionException
-        {
-            return null;
-        }
-
-        public Object get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException
-        {
-            return null;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
 
b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
new file mode 100644
index 0000000..4968196
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlusTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+
+public class RebufferingByteBufDataInputPlusTest
+{
+    private EmbeddedChannel channel;
+    private RebufferingByteBufDataInputPlus inputPlus;
+    private ByteBuf buf;
+
+    @Before
+    public void setUp()
+    {
+        channel = new EmbeddedChannel();
+        inputPlus = new RebufferingByteBufDataInputPlus(1 << 10, 1 << 11, 
channel.config());
+    }
+
+    @After
+    public void tearDown()
+    {
+        inputPlus.close();
+        channel.close();
+
+        if (buf != null && buf.refCnt() > 0)
+            buf.release(buf.refCnt());
+    }
+
+    @Test (expected = IllegalArgumentException.class)
+    public void ctor_badWaterMarks()
+    {
+        inputPlus = new RebufferingByteBufDataInputPlus(2, 1, null);
+    }
+
+    @Test
+    public void isOpen()
+    {
+        Assert.assertTrue(inputPlus.isOpen());
+        inputPlus.markClose();
+        Assert.assertFalse(inputPlus.isOpen());
+    }
+
+    @Test (expected = IllegalStateException.class)
+    public void append_closed()
+    {
+        inputPlus.markClose();
+        buf = channel.alloc().buffer(4);
+        inputPlus.append(buf);
+    }
+
+    @Test
+    public void append_normal() throws EOFException
+    {
+        int size = 4;
+        buf = channel.alloc().buffer(size);
+        buf.writerIndex(size);
+        inputPlus.append(buf);
+        Assert.assertEquals(buf.readableBytes(), inputPlus.available());
+    }
+
+    @Test
+    public void read() throws IOException
+    {
+        // put two buffers of 8 bytes each into the queue.
+        // then read an int, then a long. the latter tests offset into the 
inputPlus, as well as spanning across queued buffers.
+        // the values of those int/long will both be '42', but spread across 
both queue buffers.
+        ByteBuf buf = channel.alloc().buffer(8);
+        buf.writeInt(42);
+        buf.writerIndex(8);
+        inputPlus.append(buf);
+        buf = channel.alloc().buffer(8);
+        buf.writeInt(42);
+        buf.writerIndex(8);
+        inputPlus.append(buf);
+        Assert.assertEquals(16, inputPlus.available());
+
+        ByteBuffer out = ByteBuffer.allocate(4);
+        int readCount = inputPlus.read(out);
+        Assert.assertEquals(4, readCount);
+        out.flip();
+        Assert.assertEquals(42, out.getInt());
+        Assert.assertEquals(12, inputPlus.available());
+
+        out = ByteBuffer.allocate(8);
+        readCount = inputPlus.read(out);
+        Assert.assertEquals(8, readCount);
+        out.flip();
+        Assert.assertEquals(42, out.getLong());
+        Assert.assertEquals(4, inputPlus.available());
+    }
+
+    @Test (expected = EOFException.class)
+    public void read_closed() throws IOException
+    {
+        inputPlus.markClose();
+        ByteBuffer buf = ByteBuffer.allocate(1);
+        inputPlus.read(buf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/net/async/TestScheduledFuture.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/async/TestScheduledFuture.java 
b/test/unit/org/apache/cassandra/net/async/TestScheduledFuture.java
new file mode 100644
index 0000000..f5475ce
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/async/TestScheduledFuture.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class TestScheduledFuture implements ScheduledFuture<Object>
+{
+    private boolean cancelled = false;
+
+    public long getDelay(TimeUnit unit)
+    {
+        return 0;
+    }
+
+    public int compareTo(Delayed o)
+    {
+        return 0;
+    }
+
+    public boolean cancel(boolean mayInterruptIfRunning)
+    {
+        cancelled = true;
+        return false;
+    }
+
+    public boolean isCancelled()
+    {
+        return cancelled;
+    }
+
+    public boolean isDone()
+    {
+        return false;
+    }
+
+    public Object get() throws InterruptedException, ExecutionException
+    {
+        return null;
+    }
+
+    public Object get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java 
b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 4efdb21..afc5b25 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -70,6 +70,7 @@ public class RemoveTest
     public static void setupClass() throws ConfigurationException
     {
         oldPartitioner = 
StorageService.instance.setPartitionerUnsafe(partitioner);
+        MessagingService.instance().listen();
     }
 
     @AfterClass
@@ -86,7 +87,6 @@ public class RemoveTest
         // create a ring of 5 nodes
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, hostIds, 6);
 
-        MessagingService.instance().listen();
         removalhost = hosts.get(5);
         hosts.remove(removalhost);
         removalId = hostIds.get(5);
@@ -98,7 +98,6 @@ public class RemoveTest
     {
         MessagingService.instance().clearMessageSinks();
         MessagingService.instance().clearCallbacksUnsafe();
-        MessagingService.instance().shutdown();
     }
 
     @Test(expected = UnsupportedOperationException.class)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 21c8375..5c29698 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -32,6 +32,7 @@ import org.junit.BeforeClass;
 import org.junit.After;
 import org.junit.Test;
 
+import io.netty.channel.embedded.EmbeddedChannel;
 import junit.framework.Assert;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -74,7 +75,7 @@ public class StreamTransferTaskTest
     public void testScheduleTimeout() throws Exception
     {
         InetAddress peer = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
null, PreviewKind.NONE);
+        StreamSession session = new StreamSession(peer, peer, (connectionId, 
protocolVersion) -> new EmbeddedChannel(), 0, true, UUID.randomUUID(), 
PreviewKind.ALL);
         ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
         // create two sstables
@@ -120,7 +121,7 @@ public class StreamTransferTaskTest
     public void testFailSessionDuringTransferShouldNotReleaseReferences() 
throws Exception
     {
         InetAddress peer = FBUtilities.getBroadcastAddress();
-        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, 
null, false, null, PreviewKind.NONE);
+        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, 
new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), 
StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), 
streamCoordinator);
         StreamSession session = new StreamSession(peer, peer, null, 0, true, 
null, PreviewKind.NONE);
         session.init(future);
@@ -159,7 +160,7 @@ public class StreamTransferTaskTest
         }
 
         //fail stream session mid-transfer
-        session.onError(new Exception("Fake exception"));
+        session.onError(new Exception("Fake exception")).get(5, 
TimeUnit.SECONDS);
 
         //make sure reference was not released
         for (Ref<SSTableReader> ref : refs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 6a5002e..7a51d0c 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -26,7 +26,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -47,7 +46,6 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
@@ -62,9 +60,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
-
-// TODO:JEB intentionally breaking this with CASSANDRA-8457 until 
CASSANDRA-12229
-@Ignore
 public class StreamingTransferTest
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamingTransferTest.class);
@@ -244,7 +239,6 @@ public class StreamingTransferTest
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), 
p.getToken(ByteBufferUtil.bytes("key0"))));
         StreamPlan streamPlan = new 
StreamPlan(StreamOperation.OTHER).transferRanges(LOCAL, cfs.keyspace.getName(), 
ranges, cfs.getTableName());
         streamPlan.execute().get();
-        verifyConnectionsAreClosed();
 
         //cannot add ranges after stream session is finished
         try
@@ -262,7 +256,6 @@ public class StreamingTransferTest
     {
         StreamPlan streamPlan = new 
StreamPlan(StreamOperation.OTHER).transferFiles(LOCAL, 
makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable))));
         streamPlan.execute().get();
-        verifyConnectionsAreClosed();
 
         //cannot add files after stream session is finished
         try
@@ -276,27 +269,6 @@ public class StreamingTransferTest
         }
     }
 
-    /**
-     * Test that finished incoming connections are removed from 
MessagingService (CASSANDRA-11854)
-     */
-    private void verifyConnectionsAreClosed() throws InterruptedException
-    {
-        // TODO:JEB intentionally breaking this with CASSANDRA-8457 until 
CASSANDRA-12229
-        //after stream session is finished, message handlers may take several 
milliseconds to be closed
-//        outer:
-//        for (int i = 0; i <= 100; i++)
-//        {
-//            for (MessagingService.SocketThread socketThread : 
MessagingService.instance().getSocketThreads())
-//                if (!socketThread.connections.isEmpty())
-//                {
-//                    Thread.sleep(100);
-//                    continue outer;
-//                }
-//            return;
-//        }
-//        fail("Streaming connections remain registered in MessagingService");
-    }
-
     private Collection<StreamSession.SSTableStreamingSections> 
makeStreamingDetails(List<Range<Token>> ranges, Refs<SSTableReader> sstables)
     {
         ArrayList<StreamSession.SSTableStreamingSections> details = new 
ArrayList<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
 
b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
new file mode 100644
index 0000000..a9849a3
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.async.TestScheduledFuture;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.CompleteMessage;
+
+public class NettyStreamingMessageSenderTest
+{
+    private static final InetSocketAddress REMOTE_ADDR = new 
InetSocketAddress("127.0.0.2", 0);
+
+    private EmbeddedChannel channel;
+    private StreamSession session;
+    private NettyStreamingMessageSender sender;
+    private NettyStreamingMessageSender.FileStreamTask fileStreamTask;
+
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    @Before
+    public void setUp()
+    {
+        channel = new EmbeddedChannel();
+        
channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
+        UUID pendingRepair = UUID.randomUUID();
+        session = new StreamSession(REMOTE_ADDR.getAddress(), 
REMOTE_ADDR.getAddress(), (connectionId, protocolVersion) -> null, 0, true, 
pendingRepair, PreviewKind.ALL);
+        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, 
UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR.getAddress(), channel, 
true, pendingRepair, session.getPreviewKind());
+        session.init(future);
+        sender = session.getMessageSender();
+        sender.setControlMessageChannel(channel);
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (fileStreamTask != null)
+            fileStreamTask.unsetChannel();
+    }
+
+    @Test
+    public void KeepAliveTask_normalSend()
+    {
+        Assert.assertTrue(channel.isOpen());
+        NettyStreamingMessageSender.KeepAliveTask task = sender.new 
KeepAliveTask(channel, session);
+        task.run();
+        Assert.assertTrue(channel.releaseOutbound());
+    }
+
+    @Test
+    public void KeepAliveTask_channelClosed()
+    {
+        channel.close();
+        Assert.assertFalse(channel.isOpen());
+        channel.releaseOutbound();
+        NettyStreamingMessageSender.KeepAliveTask task = sender.new 
KeepAliveTask(channel, session);
+        task.future = new TestScheduledFuture();
+        Assert.assertFalse(task.future.isCancelled());
+        task.run();
+        Assert.assertTrue(task.future.isCancelled());
+        Assert.assertFalse(channel.releaseOutbound());
+    }
+
+    @Test
+    public void KeepAliveTask_closed()
+    {
+        Assert.assertTrue(channel.isOpen());
+        NettyStreamingMessageSender.KeepAliveTask task = sender.new 
KeepAliveTask(channel, session);
+        task.future = new TestScheduledFuture();
+        Assert.assertFalse(task.future.isCancelled());
+
+        sender.setClosed();
+        Assert.assertFalse(sender.connected());
+        task.run();
+        Assert.assertTrue(task.future.isCancelled());
+        Assert.assertFalse(channel.releaseOutbound());
+    }
+
+    @Test
+    public void KeepAliveTask_CurrentlyStreaming()
+    {
+        Assert.assertTrue(channel.isOpen());
+        
channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.TRUE);
+        NettyStreamingMessageSender.KeepAliveTask task = sender.new 
KeepAliveTask(channel, session);
+        task.future = new TestScheduledFuture();
+        Assert.assertFalse(task.future.isCancelled());
+
+        Assert.assertTrue(sender.connected());
+        task.run();
+        Assert.assertFalse(task.future.isCancelled());
+        Assert.assertFalse(channel.releaseOutbound());
+    }
+
+    @Test
+    public void FileStreamTask_acquirePermit_closed()
+    {
+        fileStreamTask = sender.new FileStreamTask(null);
+        sender.setClosed();
+        Assert.assertFalse(fileStreamTask.acquirePermit(1));
+    }
+
+    @Test
+    public void FileStreamTask_acquirePermit_HapppyPath()
+    {
+        int permits = sender.semaphoreAvailablePermits();
+        fileStreamTask = sender.new FileStreamTask(null);
+        Assert.assertTrue(fileStreamTask.acquirePermit(1));
+        Assert.assertEquals(permits - 1, sender.semaphoreAvailablePermits());
+    }
+
+    @Test
+    public void FileStreamTask_BadChannelAttr()
+    {
+        int permits = sender.semaphoreAvailablePermits();
+        
channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.TRUE);
+        fileStreamTask = sender.new FileStreamTask(null);
+        fileStreamTask.injectChannel(channel);
+        fileStreamTask.run();
+        Assert.assertEquals(StreamSession.State.FAILED, session.state());
+        Assert.assertTrue(channel.releaseOutbound()); // when the session 
fails, it will send a SessionFailed msg
+        Assert.assertEquals(permits, sender.semaphoreAvailablePermits());
+    }
+
+    @Test
+    public void FileStreamTask_HappyPath()
+    {
+        int permits = sender.semaphoreAvailablePermits();
+        fileStreamTask = sender.new FileStreamTask(new CompleteMessage());
+        fileStreamTask.injectChannel(channel);
+        fileStreamTask.run();
+        Assert.assertNotEquals(StreamSession.State.FAILED, session.state());
+        Assert.assertTrue(channel.releaseOutbound());
+        Assert.assertEquals(permits, sender.semaphoreAvailablePermits());
+    }
+
+    @Test
+    public void onControlMessageComplete_HappyPath()
+    {
+        Assert.assertTrue(channel.isOpen());
+        Assert.assertTrue(sender.connected());
+        ChannelPromise promise = channel.newPromise();
+        promise.setSuccess();
+        Assert.assertNull(sender.onControlMessageComplete(promise, new 
CompleteMessage()));
+        Assert.assertTrue(channel.isOpen());
+        Assert.assertTrue(sender.connected());
+        Assert.assertNotEquals(StreamSession.State.FAILED, session.state());
+    }
+
+    @Test
+    public void onControlMessageComplete_Exception() throws 
InterruptedException, ExecutionException, TimeoutException
+    {
+        Assert.assertTrue(channel.isOpen());
+        Assert.assertTrue(sender.connected());
+        ChannelPromise promise = channel.newPromise();
+        promise.setFailure(new RuntimeException("this is just a testing 
exception"));
+        Future f = sender.onControlMessageComplete(promise, new 
CompleteMessage());
+
+        f.get(5, TimeUnit.SECONDS);
+
+        Assert.assertFalse(channel.isOpen());
+        Assert.assertFalse(sender.connected());
+        Assert.assertEquals(StreamSession.State.FAILED, session.state());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
 
b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
new file mode 100644
index 0000000..fff7b17
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/streaming/async/StreamCompressionSerializerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Random;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+public class StreamCompressionSerializerTest
+{
+    private static final int VERSION = StreamMessage.CURRENT_VERSION;
+    private static final Random random = new Random(2347623847623L);
+
+    private final ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
+    private final StreamCompressionSerializer serializer = new 
StreamCompressionSerializer(allocator);
+    private final LZ4Compressor compressor = 
LZ4Factory.fastestInstance().fastCompressor();
+    private final LZ4FastDecompressor decompressor = 
LZ4Factory.fastestInstance().fastDecompressor();
+
+    private ByteBuffer input;
+    private ByteBuf compressed;
+    private ByteBuf output;
+
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (input != null)
+            FileUtils.clean(input);
+        if (compressed != null && compressed.refCnt() > 0)
+            compressed.release(compressed.refCnt());
+        if (output != null && output.refCnt() > 0)
+            output.release(output.refCnt());
+    }
+
+    @Test
+    public void roundTrip_HappyPath_NotReadabaleByteBuffer() throws IOException
+    {
+        populateInput();
+        compressed = serializer.serialize(compressor, input, VERSION);
+        input.flip();
+        ByteBuffer compressedNioBuffer = compressed.nioBuffer(0, 
compressed.writerIndex());
+        output = serializer.deserialize(decompressor, new 
DataInputBuffer(compressedNioBuffer, false), VERSION);
+        validateResults();
+    }
+
+    private void populateInput()
+    {
+        int bufSize = 1 << 14;
+        input = ByteBuffer.allocateDirect(bufSize);
+        for (int i = 0; i < bufSize; i += 4)
+            input.putInt(random.nextInt());
+        input.flip();
+    }
+
+    private void validateResults()
+    {
+        Assert.assertEquals(input.remaining(), output.readableBytes());
+        for (int i = 0; i < input.remaining(); i++)
+            Assert.assertEquals(input.get(i), output.readByte());
+    }
+
+    @Test
+    public void roundTrip_HappyPath_ReadabaleByteBuffer() throws IOException
+    {
+        populateInput();
+        compressed = serializer.serialize(compressor, input, VERSION);
+        input.flip();
+        output = serializer.deserialize(decompressor, new 
ByteBufRCH(compressed), VERSION);
+        validateResults();
+    }
+
+    private static class ByteBufRCH extends DataInputBuffer implements 
ReadableByteChannel
+    {
+        public ByteBufRCH(ByteBuf compressed)
+        {
+            super (compressed.nioBuffer(0, compressed.readableBytes()), false);
+        }
+
+        @Override
+        public int read(ByteBuffer dst) throws IOException
+        {
+            int len = dst.remaining();
+            dst.put(buffer);
+            return len;
+        }
+
+        @Override
+        public boolean isOpen()
+        {
+            return true;
+        }
+
+        @Override
+        public void close()
+        {   }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
 
b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
new file mode 100644
index 0000000..a674e6b
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming.async;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import 
org.apache.cassandra.streaming.async.StreamingInboundHandler.SessionIdentifier;
+import org.apache.cassandra.streaming.messages.CompleteMessage;
+import org.apache.cassandra.streaming.messages.FileMessageHeader;
+import org.apache.cassandra.streaming.messages.IncomingFileMessage;
+import org.apache.cassandra.streaming.messages.StreamInitMessage;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+
+public class StreamingInboundHandlerTest
+{
+    private static final int VERSION = StreamMessage.CURRENT_VERSION;
+    private static final InetSocketAddress REMOTE_ADDR = new 
InetSocketAddress("127.0.0.2", 0);
+
+    private StreamingInboundHandler handler;
+    private EmbeddedChannel channel;
+    private RebufferingByteBufDataInputPlus buffers;
+    private ByteBuf buf;
+
+    @BeforeClass
+    public static void before()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    @Before
+    public void setup()
+    {
+        handler = new StreamingInboundHandler(REMOTE_ADDR, VERSION, null);
+        channel = new EmbeddedChannel(handler);
+        buffers = new RebufferingByteBufDataInputPlus(1 << 9, 1 << 10, 
channel.config());
+        handler.setPendingBuffers(buffers);
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (buf != null)
+        {
+            while (buf.refCnt() > 0)
+                buf.release();
+        }
+
+        channel.close();
+    }
+
+    @Test
+    public void channelRead_Normal() throws EOFException
+    {
+        Assert.assertEquals(0, buffers.available());
+        int size = 8;
+        buf = channel.alloc().buffer(size);
+        buf.writerIndex(size);
+        channel.writeInbound(buf);
+        Assert.assertEquals(size, buffers.available());
+        Assert.assertFalse(channel.releaseInbound());
+    }
+
+    @Test (expected = EOFException.class)
+    public void channelRead_Closed() throws EOFException
+    {
+        int size = 8;
+        buf = channel.alloc().buffer(size);
+        Assert.assertEquals(1, buf.refCnt());
+        buf.writerIndex(size);
+        handler.close();
+        channel.writeInbound(buf);
+        Assert.assertEquals(0, buffers.available());
+        Assert.assertEquals(0, buf.refCnt());
+        Assert.assertFalse(channel.releaseInbound());
+    }
+
+    @Test
+    public void channelRead_WrongObject() throws EOFException
+    {
+        channel.writeInbound("homer");
+        Assert.assertEquals(0, buffers.available());
+        Assert.assertFalse(channel.releaseInbound());
+    }
+
+    @Test
+    public void StreamDeserializingTask_deriveSession_StreamInitMessage() 
throws InterruptedException, IOException
+    {
+        StreamInitMessage msg = new 
StreamInitMessage(REMOTE_ADDR.getAddress(), 0, UUID.randomUUID(), 
StreamOperation.REPAIR, true, UUID.randomUUID(), PreviewKind.ALL);
+        StreamingInboundHandler.StreamDeserializingTask task = handler.new 
StreamDeserializingTask(sid -> createSession(sid), null, channel);
+        StreamSession session = task.deriveSession(msg);
+        Assert.assertNotNull(session);
+    }
+
+    private StreamSession createSession(SessionIdentifier sid)
+    {
+        return new StreamSession(sid.from, sid.from, (connectionId, 
protocolVersion) -> null, sid.sessionIndex, true, UUID.randomUUID(), 
PreviewKind.ALL);
+    }
+
+    @Test (expected = IllegalStateException.class)
+    public void StreamDeserializingTask_deriveSession_NoSession() throws 
InterruptedException, IOException
+    {
+        CompleteMessage msg = new CompleteMessage();
+        StreamingInboundHandler.StreamDeserializingTask task = handler.new 
StreamDeserializingTask(sid -> createSession(sid), null, channel);
+        task.deriveSession(msg);
+    }
+
+    @Test (expected = IllegalStateException.class)
+    public void StreamDeserializingTask_deriveSession_IFM_NoSession() throws 
InterruptedException, IOException
+    {
+        FileMessageHeader header = new FileMessageHeader(TableId.generate(), 
REMOTE_ADDR.getAddress(), UUID.randomUUID(), 0, 0,
+                                                         
BigFormat.latestVersion, SSTableFormat.Type.BIG, 0, new ArrayList<>(), null, 0, 
UUID.randomUUID(), 0 , null);
+        IncomingFileMessage msg = new IncomingFileMessage(null, header);
+        StreamingInboundHandler.StreamDeserializingTask task = handler.new 
StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, 
sid.planId, sid.sessionIndex), null, channel);
+        task.deriveSession(msg);
+    }
+
+    @Test
+    public void StreamDeserializingTask_deriveSession_IFM_HasSession() throws 
InterruptedException, IOException
+    {
+        UUID planId = UUID.randomUUID();
+        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, 
planId, StreamOperation.REPAIR, REMOTE_ADDR.getAddress(), channel, true, 
UUID.randomUUID(), PreviewKind.ALL);
+        StreamManager.instance.register(future);
+        FileMessageHeader header = new FileMessageHeader(TableId.generate(), 
REMOTE_ADDR.getAddress(), planId, 0, 0,
+                                                         
BigFormat.latestVersion, SSTableFormat.Type.BIG, 0, new ArrayList<>(), null, 0, 
UUID.randomUUID(), 0 , null);
+        IncomingFileMessage msg = new IncomingFileMessage(null, header);
+        StreamingInboundHandler.StreamDeserializingTask task = handler.new 
StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, 
sid.planId, sid.sessionIndex), null, channel);
+        StreamSession session = task.deriveSession(msg);
+        Assert.assertNotNull(session);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
 
b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
index 19e28fd..16b3a76 100644
--- 
a/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
+++ 
b/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
@@ -28,12 +28,11 @@ import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.SequentialWriterOption;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.streaming.compress.CompressedInputStream;
 import org.apache.cassandra.streaming.compress.CompressionInfo;
@@ -174,7 +173,7 @@ public class CompressedInputStreamTest
             testException(sections, info);
             return;
         }
-        CompressedInputStream input = new CompressedInputStream(new 
ByteArrayInputStream(toRead), info, ChecksumType.CRC32, () -> 1.0);
+        CompressedInputStream input = new CompressedInputStream(new 
DataInputStreamPlus(new ByteArrayInputStream(toRead)), info, 
ChecksumType.CRC32, () -> 1.0);
 
         try (DataInputStream in = new DataInputStream(input))
         {
@@ -189,14 +188,14 @@ public class CompressedInputStreamTest
 
     private static void testException(List<Pair<Long, Long>> sections, 
CompressionInfo info) throws IOException
     {
-        CompressedInputStream input = new CompressedInputStream(new 
ByteArrayInputStream(new byte[0]), info, ChecksumType.CRC32, () -> 1.0);
+        CompressedInputStream input = new CompressedInputStream(new 
DataInputStreamPlus(new ByteArrayInputStream(new byte[0])), info, 
ChecksumType.CRC32, () -> 1.0);
 
         try (DataInputStream in = new DataInputStream(input))
         {
             for (int i = 0; i < sections.size(); i++)
             {
-                input.position(sections.get(i).left);
                 try {
+                    input.position(sections.get(i).left);
                     in.readLong();
                     fail("Should have thrown IOException");
                 }
@@ -208,3 +207,4 @@ public class CompressedInputStreamTest
         }
     }
 }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to