http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
deleted file mode 100644
index c5b0c53..0000000
--- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.streaming;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * CassandraStreamWriter for compressed SSTable.
- */
-public class CompressedCassandraStreamWriter extends CassandraStreamWriter
-{
-    private static final int CHUNK_SIZE = 1 << 16;
-
-    private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamWriter.class);
-
-    private final CompressionInfo compressionInfo;
-
-    public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session)
-    {
-        super(sstable, sections, session);
-        this.compressionInfo = compressionInfo;
-    }
-
-    @Override
-    public void write(DataOutputStreamPlus out) throws IOException
-    {
-        assert out instanceof ByteBufDataOutputStreamPlus;
-        ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
-        long totalSize = totalSize();
-        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
-                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
-        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
-        {
-            long progress = 0L;
-            // calculate chunks to transfer. we want to send continuous chunks altogether.
-            List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks);
-
-            int sectionIdx = 0;
-
-            // stream each of the required sections of the file
-            for (final SSTableReader.PartitionPositionBounds section : sections)
-            {
-                // length of the section to stream
-                long length = section.upperPosition - section.lowerPosition;
-
-                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
-
-                // tracks write progress
-                long bytesTransferred = 0;
-                while (bytesTransferred < length)
-                {
-                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
-                    limiter.acquire(toTransfer);
-
-                    ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
-                    long lastWrite;
-                    try
-                    {
-                        lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred);
-                        assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
-                        outBuffer.flip();
-                        output.writeToChannel(outBuffer);
-                    }
-                    catch (IOException e)
-                    {
-                        FileUtils.clean(outBuffer);
-                        throw e;
-                    }
-
-                    bytesTransferred += lastWrite;
-                    progress += lastWrite;
-                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
-                }
-            }
-            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
-                         session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
-        }
-    }
-
-    @Override
-    protected long totalSize()
-    {
-        long size = 0;
-        // calculate total length of transferring chunks
-        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-            size += chunk.length + 4; // 4 bytes for CRC
-        return size;
-    }
-
-    // chunks are assumed to be sorted by offset
-    private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] chunks)
-    {
-        List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>();
-        SSTableReader.PartitionPositionBounds lastSection = null;
-        for (CompressionMetadata.Chunk chunk : chunks)
-        {
-            if (lastSection != null)
-            {
-                if (chunk.offset == lastSection.upperPosition)
-                {
-                    // extend previous section to end of this chunk
-                    lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC
-                }
-                else
-                {
-                    transferSections.add(lastSection);
-                    lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
-                }
-            }
-            else
-            {
-                lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4);
-            }
-        }
-        if (lastSection != null)
-            transferSections.add(lastSection);
-        return transferSections;
-    }
-}

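For reference, the chunk-coalescing idea in getTransferSections above can be illustrated in isolation. A minimal, self-contained sketch, with a simplified Chunk standing in for CompressionMetadata.Chunk (names here are illustrative, not from the patch):

    import java.util.ArrayList;
    import java.util.List;

    // Standalone sketch: coalesce contiguous compressed chunks into transfer sections.
    public class TransferSectionsSketch
    {
        static final class Chunk
        {
            final long offset; final int length;
            Chunk(long offset, int length) { this.offset = offset; this.length = length; }
        }

        // Each chunk occupies length + 4 bytes on disk (4 bytes for the CRC);
        // chunks whose on-disk ranges touch are merged into one section.
        static List<long[]> transferSections(Chunk[] chunks) // chunks sorted by offset
        {
            List<long[]> sections = new ArrayList<>();
            long[] last = null;
            for (Chunk c : chunks)
            {
                long end = c.offset + c.length + 4;
                if (last != null && c.offset == last[1])
                    last[1] = end;                        // extend the previous section
                else
                    sections.add(last = new long[]{ c.offset, end });
            }
            return sections;
        }

        public static void main(String[] args)
        {
            Chunk[] chunks = { new Chunk(0, 60), new Chunk(64, 60), new Chunk(200, 60) };
            // prints 0..128 and 200..264: the first two chunks are contiguous on disk
            for (long[] s : transferSections(chunks))
                System.out.println(s[0] + ".." + s[1]);
        }
    }
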
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
index 2f56786..c0278e8 100644
--- a/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedInputStream.java
@@ -60,7 +60,7 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
     private long bufferOffset = 0;
 
     /**
-     * The current {@link CompressedCassandraStreamReader#sections} offset in the stream.
+     * The current {@link CassandraCompressedStreamReader#sections} offset in the stream.
      */
     private long current = 0;
 
@@ -98,7 +98,7 @@ public class CompressedInputStream extends RebufferingInputStream implements Aut
     }
 
     /**
-     * Invoked when crossing into the next stream boundary in {@link CompressedCassandraStreamReader#sections}.
+     * Invoked when crossing into the next stream boundary in {@link CassandraCompressedStreamReader#sections}.
      */
     public void position(long position) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/IStreamReader.java b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
new file mode 100644
index 0000000..cf93bc2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/IStreamReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+
+/**
+ * This interface is used by the streaming code to read an SSTable stream off a channel.
+ */
+public interface IStreamReader
+{
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/Component.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Component.java b/src/java/org/apache/cassandra/io/sstable/Component.java
index 9daac7c..a81db85 100644
--- a/src/java/org/apache/cassandra/io/sstable/Component.java
+++ b/src/java/org/apache/cassandra/io/sstable/Component.java
@@ -33,6 +33,10 @@ public class Component
 
     final static EnumSet<Type> TYPES = EnumSet.allOf(Type.class);
 
+    /**
+     * WARNING: Be careful while changing the names or string representation of the enum
+     * members. Streaming code depends on the names during streaming (Ref: CASSANDRA-14556).
+     */
     public enum Type
     {
         // the base data for an sstable: the remaining components can be regenerated
@@ -60,6 +64,7 @@ public class Component
         CUSTOM(null);
 
         final String repr;
+
         Type(String repr)
         {
             this.repr = repr;
@@ -120,7 +125,7 @@ public class Component
      * @return the component corresponding to {@code name}. Note that this always return a component as an unrecognized
      * name is parsed into a CUSTOM component.
      */
-    static Component parse(String name)
+    public static Component parse(String name)
     {
         Type type = Type.fromRepresentation(name);
 

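Making Component.parse public lets the receiving side of a stream rebuild a Component from the name string sent over the wire, which is why the WARNING above was added. A small usage sketch, assuming the standard on-disk component names:

    import org.apache.cassandra.io.sstable.Component;

    class ComponentParseSketch
    {
        public static void main(String[] args)
        {
            Component data = Component.parse("Data.db");   // -> Component.DATA
            Component custom = Component.parse("Foo.db");  // unrecognized names become CUSTOM components
            System.out.println(data + " / " + custom);
        }
    }
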
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index ebc35e7..4ba0533 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -131,7 +131,7 @@ public class SSTableLoader implements StreamEventHandler
                                                  List<SSTableReader.PartitionPositionBounds> sstableSections = sstable.getPositionsForRanges(tokenRanges);
                                                  long estimatedKeys = sstable.estimatedKeysForRanges(tokenRanges);
                                                  Ref<SSTableReader> ref = sstable.ref();
-                                                  OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, estimatedKeys);
+                                                  OutgoingStream stream = new CassandraOutgoingFile(StreamOperation.BULK_LOAD, ref, sstableSections, tokenRanges, estimatedKeys);
                                                  streamingDetails.put(endpoint, stream);
                                              }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
new file mode 100644
index 0000000..400f119
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableZeroCopyWriter.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+public class BigTableZeroCopyWriter extends SSTable implements SSTableMultiWriter
+{
+    private static final Logger logger = LoggerFactory.getLogger(BigTableZeroCopyWriter.class);
+
+    private final TableMetadataRef metadata;
+    private volatile SSTableReader finalReader;
+    private final Map<Component.Type, SequentialWriter> componentWriters;
+
+    private static final SequentialWriterOption WRITER_OPTION =
+        SequentialWriterOption.newBuilder()
+                              .trickleFsync(false)
+                              .bufferSize(2 << 20)
+                              .bufferType(BufferType.OFF_HEAP)
+                              .build();
+
+    private static final ImmutableSet<Component> SUPPORTED_COMPONENTS =
+        ImmutableSet.of(Component.DATA,
+                        Component.PRIMARY_INDEX,
+                        Component.SUMMARY,
+                        Component.STATS,
+                        Component.COMPRESSION_INFO,
+                        Component.FILTER,
+                        Component.DIGEST,
+                        Component.CRC);
+
+    public BigTableZeroCopyWriter(Descriptor descriptor,
+                                  TableMetadataRef metadata,
+                                  LifecycleTransaction txn,
+                                  final Collection<Component> components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
+
+        txn.trackNew(this);
+        this.metadata = metadata;
+        this.componentWriters = new EnumMap<>(Component.Type.class);
+
+        if (!SUPPORTED_COMPONENTS.containsAll(components))
+            throw new AssertionError(format("Unsupported streaming component detected %s",
+                                            Sets.difference(ImmutableSet.copyOf(components), SUPPORTED_COMPONENTS)));
+
+        for (Component c : components)
+            componentWriters.put(c.type, makeWriter(descriptor, c));
+    }
+
+    private static SequentialWriter makeWriter(Descriptor descriptor, Component component)
+    {
+        return new SequentialWriter(new File(descriptor.filenameFor(component)), WRITER_OPTION, false);
+    }
+
+    private void write(DataInputPlus in, long size, SequentialWriter out) throws FSWriteError
+    {
+        final int BUFFER_SIZE = 1 << 20;
+        long bytesRead = 0;
+        byte[] buff = new byte[BUFFER_SIZE];
+        try
+        {
+            while (bytesRead < size)
+            {
+                int toRead = (int) Math.min(size - bytesRead, BUFFER_SIZE);
+                in.readFully(buff, 0, toRead);
+                int count = Math.min(toRead, BUFFER_SIZE);
+                out.write(buff, 0, count);
+                bytesRead += count;
+            }
+            out.sync();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, out.getPath());
+        }
+    }
+
+    @Override
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        throw new UnsupportedOperationException("Operation not supported by BigTableZeroCopyWriter");
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        return finish(openResult);
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        setOpenResult(openResult);
+        return finished();
+    }
+
+    @Override
+    public Collection<SSTableReader> finished()
+    {
+        if (finalReader == null)
+            finalReader = SSTableReader.open(descriptor, components, metadata);
+
+        return ImmutableList.of(finalReader);
+    }
+
+    @Override
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        return null;
+    }
+
+    @Override
+    public long getFilePointer()
+    {
+        return 0;
+    }
+
+    @Override
+    public TableId getTableId()
+    {
+        return metadata.id;
+    }
+
+    @Override
+    public Throwable commit(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            accumulate = writer.commit(accumulate);
+        return accumulate;
+    }
+
+    @Override
+    public Throwable abort(Throwable accumulate)
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            accumulate = writer.abort(accumulate);
+        return accumulate;
+    }
+
+    @Override
+    public void prepareToCommit()
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            writer.prepareToCommit();
+    }
+
+    @Override
+    public void close()
+    {
+        for (SequentialWriter writer : componentWriters.values())
+            writer.close();
+    }
+
+    public void writeComponent(Component.Type type, DataInputPlus in, long size)
+    {
+        logger.info("Writing component {} to {} length {}", type, componentWriters.get(type).getPath(), prettyPrintMemory(size));
+
+        if (in instanceof RebufferingByteBufDataInputPlus)
+            write((RebufferingByteBufDataInputPlus) in, size, componentWriters.get(type));
+        else
+            write(in, size, componentWriters.get(type));
+    }
+
+    private void write(RebufferingByteBufDataInputPlus in, long size, SequentialWriter writer)
+    {
+        logger.info("Block Writing component to {} length {}", writer.getPath(), prettyPrintMemory(size));
+
+        try
+        {
+            long bytesWritten = in.consumeUntil(writer, size);
+
+            if (bytesWritten != size)
+                throw new IOException(format("Failed to read correct number of bytes from channel %s", writer));
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, writer.getPath());
+        }
+    }
+}
\ No newline at end of file

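To see how this writer fits the receive path, here is a rough, hypothetical sketch: descriptor, metadata, txn, components, the per-component entries (ComponentInfo here is a made-up stand-in for whatever the stream header carries) and the input are all assumed to come from the stream header and session.

    // Hypothetical receive-path sketch; none of these local names are from the patch.
    BigTableZeroCopyWriter writer = new BigTableZeroCopyWriter(descriptor, metadata, txn, components);
    for (ComponentInfo info : componentInfos)
        writer.writeComponent(info.type, in, info.length); // raw component bytes, no deserialization
    Collection<SSTableReader> readers = writer.finish(true);
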
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
index 54122ee..56d88f7 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java
@@ -24,11 +24,9 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.WritableByteChannel;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 
 import net.nicoulaj.compilecommand.annotations.DontInline;
-
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import org.apache.cassandra.utils.vint.VIntCoding;
@@ -341,7 +339,7 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
     }
 
     @Override
-    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+    public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> f) throws IOException
     {
         if (strictFlushing)
             throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/CheckedFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CheckedFunction.java b/src/java/org/apache/cassandra/io/util/CheckedFunction.java
new file mode 100644
index 0000000..ec1ce9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/CheckedFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+@FunctionalInterface
+public interface CheckedFunction<T, R, E extends Exception>
+{
+    R apply(T t) throws E;
+}

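CheckedFunction exists because Guava's Function cannot declare checked exceptions, so lambdas passed to applyToChannel previously had to wrap IOException in an unchecked type. A minimal sketch of the difference (mirroring the c -> c.write(buffer) call that consumeUntil makes later in this commit):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.cassandra.io.util.DataOutputPlus;

    class CheckedFunctionSketch
    {
        // The lambda may throw IOException directly through applyToChannel;
        // with Guava's Function the checked exception would need wrapping.
        static int writeBuffer(DataOutputPlus out, ByteBuffer buffer) throws IOException
        {
            return out.applyToChannel(c -> c.write(buffer));
        }
    }
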
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
index a9dbb68..16be42f 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java
@@ -22,8 +22,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-import com.google.common.base.Function;
-
 import org.apache.cassandra.utils.vint.VIntCoding;
 
 /**
@@ -41,7 +39,7 @@ public interface DataOutputPlus extends DataOutput
      * Safe way to operate against the underlying channel. Impossible to stash a reference to the channel
      * and forget to flush
      */
-    <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException;
+    <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> c) throws IOException;
 
     default void writeVInt(long i) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index 086f5c9..ef51888 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -25,12 +25,12 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import com.google.common.base.Preconditions;
+
 import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled
  * via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index e71f2fa..3eb1a7d 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -138,11 +138,22 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr
      */
     public SequentialWriter(File file, SequentialWriterOption option)
     {
+        this(file, option, true);
+    }
+
+    /**
+     * Create SequentialWriter for given file with specific writer option.
+     * @param file
+     * @param option
+     * @param strictFlushing
+     */
+    public SequentialWriter(File file, SequentialWriterOption option, boolean strictFlushing)
+    {
         super(openChannel(file), option.allocateBuffer());
-        strictFlushing = true;
-        fchannel = (FileChannel)channel;
+        this.strictFlushing = strictFlushing;
+        this.fchannel = (FileChannel)channel;
 
-        filePath = file.getAbsolutePath();
+        this.filePath = file.getAbsolutePath();
 
         this.option = option;
     }

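A usage sketch of the two forms, with file and option assumed to be a java.io.File and a SequentialWriterOption; the three-argument call with strictFlushing = false is what BigTableZeroCopyWriter.makeWriter above relies on.

    SequentialWriter strict  = new SequentialWriter(file, option);        // strictFlushing = true, as before
    SequentialWriter relaxed = new SequentialWriter(file, option, false); // used for streamed whole components
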
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
index 54b4cb1..d9ef010 100644
--- a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java
@@ -378,7 +378,7 @@ public abstract class UnbufferedDataOutputStreamPlus extends DataOutputStreamPlu
     }
 
     @Override
-    public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
+    public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> f) throws IOException
     {
         return f.apply(channel);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
index 0473465..a77cb07 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputPlus.java
@@ -22,10 +22,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
-import com.google.common.base.Function;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
+import org.apache.cassandra.io.util.CheckedFunction;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
@@ -82,7 +81,7 @@ public class ByteBufDataOutputPlus extends ByteBufOutputStream implements DataOu
     }
 
     @Override
-    public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
+    public <R> R applyToChannel(CheckedFunction<WritableByteChannel, R, IOException> c) throws IOException
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
index 3a544e4..777bc3e 100644
--- a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
+++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java
@@ -20,11 +20,14 @@ package org.apache.cassandra.net.async;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -35,6 +38,7 @@ import io.netty.util.concurrent.Future;
 import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
 import org.apache.cassandra.streaming.StreamSession;
 
 /**
@@ -49,6 +53,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
     private final StreamSession session;
     private final Channel channel;
     private final int bufferSize;
+    private final Logger logger = LoggerFactory.getLogger(ByteBufDataOutputStreamPlus.class);
 
     /**
      * Tracks how many bytes we've written to the netty channel. This more or less follows the channel's
@@ -70,7 +75,6 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
         this.channel = channel;
         this.currentBuf = buffer;
         this.bufferSize = bufferSize;
-
         channelRateLimiter = new Semaphore(channel.config().getWriteBufferHighWaterMark(), true);
     }
 
@@ -114,8 +118,9 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
         doFlush(buffer.position());
 
         int byteCount = buf.readableBytes();
+
         if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 5, TimeUnit.MINUTES))
-            throw new IOException("outbound channel was not writable");
+            throw new IOException(String.format("outbound channel was not writable. Failed to acquire sufficient permits %d", byteCount));
 
         // the (possibly naive) assumption that we should always flush after each incoming buf
         ChannelFuture channelFuture = channel.writeAndFlush(buf);
@@ -135,6 +140,53 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
         return channelFuture;
     }
 
+    /**
+     * Writes all data in the file channel to the stream, bufferSize bytes at a time.
+     * Closes the file channel when done.
+     *
+     * @param f
+     * @return number of bytes transferred
+     * @throws IOException
+     */
+    public long writeToChannel(FileChannel f, StreamRateLimiter limiter) throws IOException
+    {
+        final long length = f.size();
+        long bytesTransferred = 0;
+
+        try
+        {
+            while (bytesTransferred < length)
+            {
+                int toRead = (int) Math.min(bufferSize, length - bytesTransferred);
+                NonClosingDefaultFileRegion fileRegion = new NonClosingDefaultFileRegion(f, bytesTransferred, toRead);
+
+                if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, toRead, 5, TimeUnit.MINUTES))
+                    throw new IOException(String.format("outbound channel was not writable. Failed to acquire sufficient permits %d", toRead));
+
+                limiter.acquire(toRead);
+
+                bytesTransferred += toRead;
+                final boolean shouldClose = (bytesTransferred == length); // this is the last buffer, can safely close channel
+
+                channel.writeAndFlush(fileRegion).addListener(future -> {
+                    handleBuffer(future, toRead);
+
+                    if ((shouldClose || !future.isSuccess()) && f.isOpen())
+                        f.close();
+                });
+                logger.trace("{} of {} (toRead {} cs {})", bytesTransferred, length, toRead, f.isOpen());
+            }
+
+            return bytesTransferred;
+        } catch (Exception e)
+        {
+            if (f.isOpen())
+                f.close();
+
+            throw e;
+        }
+    }
+
     @Override
     protected void doFlush(int count) throws IOException
     {
@@ -145,7 +197,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
             currentBuf.writerIndex(byteCount);
 
             if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES))
-                throw new IOException("outbound channel was not writable");
+                throw new IOException(String.format("outbound channel was not writable. Failed to acquire sufficient permits %d", byteCount));
 
             channel.writeAndFlush(currentBuf).addListener(future -> handleBuffer(future, byteCount));
             currentBuf = channel.alloc().directBuffer(bufferSize, bufferSize);
@@ -161,7 +213,7 @@ public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus
     private void handleBuffer(Future<? super Void> future, int bytesWritten)
     {
         channelRateLimiter.release(bytesWritten);
-
+        logger.trace("bytesWritten {} {} because {}", bytesWritten, (future.isSuccess() == true) ? "Succeeded" : "Failed", future.cause());
         if (!future.isSuccess() && channel.isOpen())
             session.onError(future.cause());
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java b/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
new file mode 100644
index 0000000..46f0ce1
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/async/NonClosingDefaultFileRegion.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net.async;
+
+import java.io.File;
+import java.nio.channels.FileChannel;
+
+import io.netty.channel.DefaultFileRegion;
+
+/**
+ * Netty's DefaultFileRegion closes the underlying FileChannel as soon as
+ * the refCnt() for the region drops to zero. This is an implementation of
+ * DefaultFileRegion that does not close the FileChannel.
+ *
+ * See {@link ByteBufDataOutputStreamPlus} for its usage.
+ */
+public class NonClosingDefaultFileRegion extends DefaultFileRegion
+{
+
+    public NonClosingDefaultFileRegion(FileChannel file, long position, long count)
+    {
+        super(file, position, count);
+    }
+
+    public NonClosingDefaultFileRegion(File f, long position, long count)
+    {
+        super(f, position, count);
+    }
+
+    @Override
+    protected void deallocate()
+    {
+        // Overridden to avoid closing the file
+    }
+}

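Because deallocate() is a no-op, several regions can share one FileChannel and the caller controls when it closes, which is what writeToChannel above depends on. A self-contained sketch under that assumption (the method and variable names here are illustrative):

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import io.netty.channel.Channel;
    import org.apache.cassandra.net.async.NonClosingDefaultFileRegion;

    final class FileRegionSketch
    {
        static void sendFile(Channel channel, Path path) throws IOException
        {
            FileChannel fc = FileChannel.open(path, StandardOpenOption.READ);
            long size = fc.size();
            long pos = 0;
            while (pos < size)
            {
                long count = Math.min(1 << 20, size - pos);
                boolean last = (pos + count == size);
                // Flushing this region does not close fc (deallocate() is a no-op);
                // we close it ourselves once the final region completes.
                channel.writeAndFlush(new NonClosingDefaultFileRegion(fc, pos, count))
                       .addListener(f -> { if (last && fc.isOpen()) fc.close(); });
                pos += count;
            }
        }
    }
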
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
index 1f32aa8..4e667da 100644
--- a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
+++ b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java
@@ -31,6 +31,7 @@ import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelConfig;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
 import org.apache.cassandra.io.util.RebufferingInputStream;
 
 public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
@@ -249,4 +250,42 @@ public class RebufferingByteBufDataInputPlus extends RebufferingInputStream impl
     {
         return channelConfig.getAllocator();
     }
+
+    /**
+     * Consumes bytes in the stream until the given length
+     *
+     * @param writer
+     * @param len
+     * @return
+     * @throws IOException
+     */
+    public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException
+    {
+        long copied = 0; // number of bytes copied
+        while (copied < len)
+        {
+            if (buffer.remaining() == 0)
+            {
+                try
+                {
+                    reBuffer();
+                }
+                catch (EOFException e)
+                {
+                    throw new EOFException("EOF after " + copied + " bytes out of " + len);
+                }
+                if (buffer.remaining() == 0 && copied < len)
+                    throw new AssertionError("reBuffer() failed to return data");
+            }
+
+            int originalLimit = buffer.limit();
+            int toCopy = (int) Math.min(len - copied, buffer.remaining());
+            buffer.limit(buffer.position() + toCopy);
+            int written = writer.applyToChannel(c -> c.write(buffer));
+            buffer.limit(originalLimit);
+            copied += written;
+        }
+
+        return copied;
+    }
 }

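A sketch of the intended call site, mirroring BigTableZeroCopyWriter.write earlier in this commit; in, writer and size are assumed to be the stream input, the component's SequentialWriter and the expected component length:

    long written = in.consumeUntil(writer, size);
    if (written != size)
        throw new IOException(String.format("Failed to read correct number of bytes from channel %s", writer));
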
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 34b0bbd..7eada28 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -282,7 +282,8 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(), pendingRepair, previewKind);
+                StreamSession session = new StreamSession(streamOperation, peer, factory, streamSessions.size(),
+                                                          pendingRepair, previewKind);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 49beba1..0a96f4c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -49,6 +49,7 @@ public class StreamReceiveTask extends StreamTask
     private volatile boolean done = false;
 
     private int remoteStreamsReceived = 0;
+    private long bytesReceived = 0;
 
     public StreamReceiveTask(StreamSession session, TableId tableId, int totalStreams, long totalSize)
     {
@@ -76,8 +77,10 @@ public class StreamReceiveTask extends StreamTask
         }
 
         remoteStreamsReceived++;
+        bytesReceived += stream.getSize();
         Preconditions.checkArgument(tableId.equals(stream.getTableId()));
-        logger.debug("recevied {} of {} total files", remoteStreamsReceived, totalStreams);
+        logger.debug("received {} of {} total files {} of total bytes {}", remoteStreamsReceived, totalStreams,
+                     bytesReceived, totalSize);
 
         receiver.received(stream);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ef8976d..4de63be 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,10 +17,8 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.*;
+import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import com.google.common.util.concurrent.AbstractFuture;
@@ -78,7 +76,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         this(planId, streamOperation, new StreamCoordinator(streamOperation, 0, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
     }
 
-    static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
+    public static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
                                    StreamCoordinator coordinator)
     {
         StreamResultFuture future = createAndRegister(planId, streamOperation, coordinator);
@@ -112,8 +110,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
         {
-            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {} channel.id {}",
-                        planId, sessionIndex, streamOperation.getDescription(), from, channel.remoteAddress(), channel.localAddress(), channel.id());
+            logger.info("[Stream #{} ID#{}] Creating new streaming plan for {} from {} channel.remote {} channel.local {}" +
+                        " channel.id {}", planId, sessionIndex, streamOperation.getDescription(),
+                        from, channel.remoteAddress(), channel.localAddress(), channel.id());
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
             future = new StreamResultFuture(planId, streamOperation, pendingRepair, previewKind);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index c56616e..393cd24 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -202,7 +202,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
 
-        logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer, preferredPeerInetAddressAndPort);
+        logger.debug("Creating stream session peer={} preferredPeerInetAddressAndPort={}", peer,
+                     preferredPeerInetAddressAndPort);
     }
 
     public UUID planId()
@@ -777,7 +778,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         FBUtilities.waitOnFutures(flushes);
     }
 
-    private synchronized void prepareReceiving(StreamSummary summary)
+    @VisibleForTesting
+    public synchronized void prepareReceiving(StreamSummary summary)
     {
         failIfFinished();
         if (summary.files > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index bff77cf..3fa80f5 100644
--- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -322,7 +322,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender
                    throw new IllegalStateException("channel's transferring state is currently set to true. refusing to start new stream");
 
                 // close the DataOutputStreamPlus as we're done with it - but don't close the channel
-                try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 16))
+                try (DataOutputStreamPlus outPlus = ByteBufDataOutputStreamPlus.create(session, channel, 1 << 20))
                 {
                     StreamMessage.serialize(msg, outPlus, protocolVersion, session);
                     channel.flush();
                     channel.flush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index 81fe8cd..7c10ef9 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -62,7 +62,7 @@ public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
     static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
 
     private static final int AUTO_READ_LOW_WATER_MARK = 1 << 15;
-    private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 16;
+    private static final int AUTO_READ_HIGH_WATER_MARK = 1 << 20;
 
     private final InetAddressAndPort remoteAddress;
     private final int protocolVersion;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index fbd3e21..a591a43 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -47,7 +47,8 @@ public class StreamInitMessage extends StreamMessage
     public final UUID pendingRepair;
     public final PreviewKind previewKind;
 
-    public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation, UUID pendingRepair, PreviewKind previewKind)
+    public StreamInitMessage(InetAddressAndPort from, int sessionIndex, UUID planId, StreamOperation streamOperation,
+                             UUID pendingRepair, PreviewKind previewKind)
     {
         super(Type.STREAM_INIT);
         this.from = from;
@@ -93,7 +94,8 @@ public class StreamInitMessage extends StreamMessage
 
             UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
             PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
-            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), pendingRepair, previewKind);
+            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description),
+                                         pendingRepair, previewKind);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -108,6 +110,7 @@ public class StreamInitMessage extends StreamMessage
                size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version);
            }
            size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
+
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/utils/Collectors3.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Collectors3.java b/src/java/org/apache/cassandra/utils/Collectors3.java
new file mode 100644
index 0000000..f8f262e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Collectors3.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collector;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Some extra Collector implementations.
+ *
+ * Named Collectors3 just in case Guava ever makes a Collectors2
+ */
+public class Collectors3
+{
+    private static final Collector.Characteristics[] LIST_CHARACTERISTICS = new Collector.Characteristics[] { };
+    public static <T>  Collector<T, ?, List<T>> toImmutableList()
+    {
+        return Collector.of(ImmutableList.Builder<T>::new,
+                            ImmutableList.Builder<T>::add,
+                            (l, r) -> l.addAll(r.build()),
+                            ImmutableList.Builder<T>::build,
+                            LIST_CHARACTERISTICS);
+    }
+
+    private static final Collector.Characteristics[] SET_CHARACTERISTICS = new Collector.Characteristics[] { Collector.Characteristics.UNORDERED };
+    public static <T>  Collector<T, ?, Set<T>> toImmutableSet()
+    {
+        return Collector.of(ImmutableSet.Builder<T>::new,
+                            ImmutableSet.Builder<T>::add,
+                            (l, r) -> l.addAll(r.build()),
+                            ImmutableSet.Builder<T>::build,
+                            SET_CHARACTERISTICS);
+    }
+}

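A small usage sketch of the new collectors:

    import java.util.List;
    import java.util.stream.Stream;
    import org.apache.cassandra.utils.Collectors3;

    class Collectors3Sketch
    {
        public static void main(String[] args)
        {
            List<String> names = Stream.of("Data.db", "Index.db")
                                       .collect(Collectors3.toImmutableList());
            // The result is a Guava ImmutableList; mutating calls throw UnsupportedOperationException.
            System.out.println(names);
        }
    }
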
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 5893bab..3c09637 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -46,3 +46,5 @@ enable_user_defined_functions: true
 enable_scripted_user_defined_functions: true
 prepared_statements_cache_size_mb: 1
 corrupted_tombstone_strategy: exception
+stream_entire_sstables: true
+stream_throughput_outbound_megabits_per_sec: 200000000

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
index bd7ef20..01e67f0 100644
--- a/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
+++ b/test/long/org/apache/cassandra/streaming/LongStreamingTest.java
@@ -29,10 +29,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.Keyspace;
@@ -41,6 +38,9 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadataRef;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
new file mode 100644
index 0000000..3192bcc
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/ZeroCopyStreamingBenchmark.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamReader;
+import org.apache.cassandra.db.streaming.CassandraEntireSSTableStreamWriter;
+import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
+import org.apache.cassandra.db.streaming.CassandraStreamHeader;
+import org.apache.cassandra.db.streaming.CassandraStreamReader;
+import org.apache.cassandra.db.streaming.CassandraStreamWriter;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * Please ensure that this benchmark is run with stream_throughput_outbound_megabits_per_sec
+ * set to a really high value; otherwise throttling will kick in and the results will not be
+ * meaningful.
+ */
+@Warmup(iterations = 1, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@Threads(1)
+public class ZeroCopyStreamingBenchmark
+{
+    static final int STREAM_SIZE = 50 * 1024 * 1024;
+
+    @State(Scope.Thread)
+    public static class BenchmarkState
+    {
+        public static final String KEYSPACE = "ZeroCopyStreamingBenchmark";
+        public static final String CF_STANDARD = "Standard1";
+        public static final String CF_INDEXED = "Indexed1";
+        public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+        private static SSTableReader sstable;
+        private static ColumnFamilyStore store;
+        private StreamSession session;
+        private CassandraEntireSSTableStreamWriter blockStreamWriter;
+        private ByteBuf serializedBlockStream;
+        private InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+        private CassandraEntireSSTableStreamReader blockStreamReader;
+        private CassandraStreamWriter partialStreamWriter;
+        private CassandraStreamReader partialStreamReader;
+        private ByteBuf serializedPartialStream;
+
+        @Setup
+        public void setupBenchmark() throws IOException
+        {
+            Keyspace keyspace = setupSchemaAndKeySpace();
+            store = keyspace.getColumnFamilyStore("Standard1");
+            generateData();
+
+            sstable = store.getLiveSSTables().iterator().next();
+            session = setupStreamingSessionForTest();
+            blockStreamWriter = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+            CapturingNettyChannel blockStreamCaptureChannel = new CapturingNettyChannel(STREAM_SIZE);
+            ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, blockStreamCaptureChannel, 1024 * 1024);
+            blockStreamWriter.write(out);
+            serializedBlockStream = blockStreamCaptureChannel.getSerializedStream();
+            out.close();
+
+            session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, serializedBlockStream.readableBytes()));
+
+            CassandraStreamHeader entireSSTableStreamHeader =
+                CassandraStreamHeader.builder()
+                                     .withSSTableFormat(sstable.descriptor.formatType)
+                                     .withSSTableVersion(sstable.descriptor.version)
+                                     .withSSTableLevel(0)
+                                     .withEstimatedKeys(sstable.estimatedKeys())
+                                     .withSections(Collections.emptyList())
+                                     .withSerializationHeader(sstable.header.toComponent())
+                                     .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+                                     .isEntireSSTable(true)
+                                     .withFirstKey(sstable.first)
+                                     .withTableId(sstable.metadata().id)
+                                     .build();
+
+            blockStreamReader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id,
+                                                                                               peer, session.planId(),
+                                                                                               0, 0, 0,
+                                                                                               null), entireSSTableStreamHeader, session);
+
+            List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(sstable.first.minValue().getToken(), sstable.last.getToken()));
+            partialStreamWriter = new CassandraStreamWriter(sstable, sstable.getPositionsForRanges(requestedRanges), session);
+
+            CapturingNettyChannel partialStreamChannel = new CapturingNettyChannel(STREAM_SIZE);
+            partialStreamWriter.write(ByteBufDataOutputStreamPlus.create(session, partialStreamChannel, 1024 * 1024));
+            serializedPartialStream = partialStreamChannel.getSerializedStream();
+
+            CassandraStreamHeader partialSSTableStreamHeader =
+                CassandraStreamHeader.builder()
+                                     .withSSTableFormat(sstable.descriptor.formatType)
+                                     .withSSTableVersion(sstable.descriptor.version)
+                                     .withSSTableLevel(0)
+                                     .withEstimatedKeys(sstable.estimatedKeys())
+                                     .withSections(sstable.getPositionsForRanges(requestedRanges))
+                                     .withSerializationHeader(sstable.header.toComponent())
+                                     .withTableId(sstable.metadata().id)
+                                     .build();
+
+            partialStreamReader = new CassandraStreamReader(new StreamMessageHeader(sstable.metadata().id,
+                                                                                    peer, session.planId(),
+                                                                                    0, 0, 0,
+                                                                                    null),
+                                                            partialSSTableStreamHeader, session);
+        }
+
+        private Keyspace setupSchemaAndKeySpace()
+        {
+            SchemaLoader.prepareServer();
+            SchemaLoader.createKeyspace(KEYSPACE,
+                                        KeyspaceParams.simple(1),
+                                        SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
+                                        SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
+                                        SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
+                                                    .minIndexInterval(8)
+                                                    .maxIndexInterval(256)
+                                                    .caching(CachingParams.CACHE_NOTHING));
+
+            return Keyspace.open(KEYSPACE);
+        }
+
+        private void generateData()
+        {
+            // insert data and compact to a single sstable
+            CompactionManager.instance.disableAutoCompaction();
+            for (int j = 0; j < 1_000_000; j++)
+            {
+                new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+                .clustering("0")
+                .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+                .build()
+                .applyUnsafe();
+            }
+            store.forceBlockingFlush();
+            CompactionManager.instance.performMaximal(store, false);
+        }
+
+        @TearDown
+        public void tearDown() throws IOException
+        {
+            SchemaLoader.cleanupAndLeaveDirs();
+            CommitLog.instance.stopUnsafe(true);
+        }
+
+        private StreamSession setupStreamingSessionForTest()
+        {
+            StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
+            StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+
+            InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+            streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
+
+            StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+            session.init(future);
+            return session;
+        }
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void blockStreamWriter(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(state.session, channel, 1024 * 1024);
+        state.blockStreamWriter.write(out);
+        out.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void blockStreamReader(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        RebufferingByteBufDataInputPlus in = new RebufferingByteBufDataInputPlus(STREAM_SIZE, STREAM_SIZE, channel.config());
+        in.append(state.serializedBlockStream.retainedDuplicate());
+        SSTableMultiWriter sstableWriter = state.blockStreamReader.read(in);
+        Collection<SSTableReader> newSstables = sstableWriter.finished();
+        in.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void partialStreamWriter(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(state.session, channel, 1024 * 1024);
+        state.partialStreamWriter.write(out);
+        out.close();
+        channel.finishAndReleaseAll();
+    }
+
+    @Benchmark
+    @BenchmarkMode(Mode.Throughput)
+    public void partialStreamReader(BenchmarkState state) throws Exception
+    {
+        EmbeddedChannel channel = createMockNettyChannel();
+        RebufferingByteBufDataInputPlus in = new RebufferingByteBufDataInputPlus(STREAM_SIZE, STREAM_SIZE, channel.config());
+        in.append(state.serializedPartialStream.retainedDuplicate());
+        SSTableMultiWriter sstableWriter = state.partialStreamReader.read(in);
+        Collection<SSTableReader> newSstables = sstableWriter.finished();
+        in.close();
+        channel.finishAndReleaseAll();
+    }
+
+    private EmbeddedChannel createMockNettyChannel()
+    {
+        EmbeddedChannel channel = new EmbeddedChannel();
+        channel.config().setWriteBufferHighWaterMark(STREAM_SIZE); // avoid blocking
+        return channel;
+    }
+
+    private static class CapturingNettyChannel extends EmbeddedChannel
+    {
+        private final ByteBuf serializedStream;
+        private final WritableByteChannel proxyWBC = new WritableByteChannel()
+        {
+            public int write(ByteBuffer src) throws IOException
+            {
+                int rem = src.remaining();
+                serializedStream.writeBytes(src);
+                return rem;
+            }
+
+            public boolean isOpen()
+            {
+                return true;
+            }
+
+            public void close() throws IOException
+            {
+            }
+        };
+
+        public CapturingNettyChannel(int capacity)
+        {
+            this.serializedStream = alloc().buffer(capacity);
+            this.pipeline().addLast(new ChannelOutboundHandlerAdapter()
+            {
+                @Override
+                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+                {
+                    if (msg instanceof ByteBuf)
+                        serializedStream.writeBytes((ByteBuf) msg);
+                    else if (msg instanceof ByteBuffer)
+                        serializedStream.writeBytes((ByteBuffer) msg);
+                    else if (msg instanceof DefaultFileRegion)
+                        ((DefaultFileRegion) msg).transferTo(proxyWBC, 0);
+                }
+            });
+            config().setWriteBufferHighWaterMark(capacity);
+        }
+
+        public ByteBuf getSerializedStream()
+        {
+            return serializedStream.copy();
+        }
+    }
+}
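
If you want to launch the benchmark above from an IDE or a plain main() method, JMH's programmatic runner works; the launcher below is a sketch, not part of the patch. Per the class javadoc, stream_throughput_outbound_megabits_per_sec must be set very high (the test cassandra.yaml above uses 200000000) or throttling will distort the numbers:

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    // Hypothetical launcher class, not part of the patch.
    public class ZeroCopyStreamingBenchmarkRunner
    {
        public static void main(String[] args) throws RunnerException
        {
            // Select all @Benchmark methods in ZeroCopyStreamingBenchmark by name.
            Options opts = new OptionsBuilder()
                           .include(ZeroCopyStreamingBenchmark.class.getSimpleName())
                           .build();
            new Runner(opts).run();
        }
    }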

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/VerifyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/VerifyTest.java b/test/unit/org/apache/cassandra/db/VerifyTest.java
index c9dbe14..0632274 100644
--- a/test/unit/org/apache/cassandra/db/VerifyTest.java
+++ b/test/unit/org/apache/cassandra/db/VerifyTest.java
@@ -612,15 +612,15 @@ public class VerifyTest
 
         Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
 
-        roh.check(dk(1));
-        roh.check(dk(10));
-        roh.check(dk(11));
-        roh.check(dk(21));
-        roh.check(dk(25));
+        roh.validate(dk(1));
+        roh.validate(dk(10));
+        roh.validate(dk(11));
+        roh.validate(dk(21));
+        roh.validate(dk(25));
         boolean gotException = false;
         try
         {
-            roh.check(dk(26));
+            roh.validate(dk(26));
         }
         catch (Throwable t)
         {
@@ -635,9 +635,9 @@ public class VerifyTest
         List<Range<Token>> normalized = new ArrayList<>();
         normalized.add(r(0,10));
         Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
-        roh.check(dk(1));
+        roh.validate(dk(1));
         // call with smaller token to get exception
-        roh.check(dk(0));
+        roh.validate(dk(0));
     }
 
 
@@ -646,9 +646,9 @@ public class VerifyTest
     {
         List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(0,0)));
         Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
-        roh.check(dk(Long.MIN_VALUE));
-        roh.check(dk(0));
-        roh.check(dk(Long.MAX_VALUE));
+        roh.validate(dk(Long.MIN_VALUE));
+        roh.validate(dk(0));
+        roh.validate(dk(Long.MAX_VALUE));
     }
 
     @Test
@@ -656,12 +656,12 @@ public class VerifyTest
     {
         List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(Long.MAX_VALUE - 1000,Long.MIN_VALUE + 1000)));
         Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
-        roh.check(dk(Long.MIN_VALUE));
-        roh.check(dk(Long.MAX_VALUE));
+        roh.validate(dk(Long.MIN_VALUE));
+        roh.validate(dk(Long.MAX_VALUE));
         boolean gotException = false;
         try
         {
-            roh.check(dk(26));
+            roh.validate(dk(26));
         }
         catch (Throwable t)
         {
@@ -673,7 +673,7 @@ public class VerifyTest
     @Test
     public void testEmptyRanges()
     {
-        new Verifier.RangeOwnHelper(Collections.emptyList()).check(dk(1));
+        new Verifier.RangeOwnHelper(Collections.emptyList()).validate(dk(1));
     }
 
     private DecoratedKey dk(long l)
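
For readers skimming the rename above (check -> validate): the tests encode the helper's contract. A small fragment, as it would appear inside VerifyTest and reusing its dk() and r() helpers; the contract described below is inferred from the test comments, not new behavior:

        // Keys must be passed in ascending token order; a key outside the
        // owned, normalized ranges makes validate() throw.
        List<Range<Token>> normalized = Range.normalize(Collections.singletonList(r(0, 10)));
        Verifier.RangeOwnHelper roh = new Verifier.RangeOwnHelper(normalized);
        roh.validate(dk(1));   // in range, ascending: ok
        roh.validate(dk(10));  // still ascending: ok
        roh.validate(dk(26));  // outside the owned ranges: throws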

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
new file mode 100644
index 0000000..947f968
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriterTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.async.ByteBufDataInputPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.net.async.NonClosingDefaultFileRegion;
+import org.apache.cassandra.schema.CachingParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CassandraEntireSSTableStreamWriterTest
+{
+    public static final String KEYSPACE = "CassandraEntireSSTableStreamWriterTest";
+    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_INDEXED = "Indexed1";
+    public static final String CF_STANDARDLOWINDEXINTERVAL = "StandardLowIndexInterval";
+
+    private static SSTableReader sstable;
+    private static ColumnFamilyStore store;
+
+    @BeforeClass
+    public static void defineSchemaAndPrepareSSTable()
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD),
+                                    SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEXED, true),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARDLOWINDEXINTERVAL)
+                                                .minIndexInterval(8)
+                                                .maxIndexInterval(256)
+                                                .caching(CachingParams.CACHE_NOTHING));
+
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        store = keyspace.getColumnFamilyStore("Standard1");
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 10; j++)
+        {
+            new RowUpdateBuilder(store.metadata(), j, String.valueOf(j))
+            .clustering("0")
+            .add("val", ByteBufferUtil.EMPTY_BYTE_BUFFER)
+            .build()
+            .applyUnsafe();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.performMaximal(store, false);
+
+        sstable = store.getLiveSSTables().iterator().next();
+    }
+
+    @Test
+    public void testBlockWriterOverWire() throws IOException
+    {
+        StreamSession session = setupStreamingSessionForTest();
+
+        CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+        EmbeddedChannel channel = new EmbeddedChannel();
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1024 * 1024);
+        writer.write(out);
+
+        Queue msgs = channel.outboundMessages();
+
+        assertTrue(msgs.peek() instanceof DefaultFileRegion);
+    }
+
+    @Test
+    public void testBlockReadingAndWritingOverWire() throws Exception
+    {
+        StreamSession session = setupStreamingSessionForTest();
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+
+        CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, CassandraOutgoingFile.getComponentManifest(sstable));
+
+        // This is needed as Netty releases the ByteBuffers as soon as the channel is flushed
+        ByteBuf serializedFile = Unpooled.buffer(8192);
+        EmbeddedChannel channel = createMockNettyChannel(serializedFile);
+        ByteBufDataOutputStreamPlus out = ByteBufDataOutputStreamPlus.create(session, channel, 1024 * 1024);
+
+        writer.write(out);
+
+        session.prepareReceiving(new StreamSummary(sstable.metadata().id, 1, 5104));
+
+        CassandraStreamHeader header =
+            CassandraStreamHeader.builder()
+                                 .withSSTableFormat(sstable.descriptor.formatType)
+                                 .withSSTableVersion(sstable.descriptor.version)
+                                 .withSSTableLevel(0)
+                                 .withEstimatedKeys(sstable.estimatedKeys())
+                                 .withSections(Collections.emptyList())
+                                 .withSerializationHeader(sstable.header.toComponent())
+                                 .withComponentManifest(CassandraOutgoingFile.getComponentManifest(sstable))
+                                 .isEntireSSTable(true)
+                                 .withFirstKey(sstable.first)
+                                 .withTableId(sstable.metadata().id)
+                                 .build();
+
+        CassandraEntireSSTableStreamReader reader = new CassandraEntireSSTableStreamReader(new StreamMessageHeader(sstable.metadata().id, peer, session.planId(), 0, 0, 0, null), header, session);
+
+        SSTableMultiWriter sstableWriter = reader.read(new ByteBufDataInputPlus(serializedFile));
+        Collection<SSTableReader> newSstables = sstableWriter.finished();
+
+        assertEquals(1, newSstables.size());
+    }
+
+    private EmbeddedChannel createMockNettyChannel(ByteBuf serializedFile) throws Exception
+    {
+        WritableByteChannel wbc = new WritableByteChannel()
+        {
+            private boolean isOpen = true;
+            public int write(ByteBuffer src) throws IOException
+            {
+                int size = src.limit();
+                serializedFile.writeBytes(src);
+                return size;
+            }
+
+            public boolean isOpen()
+            {
+                return isOpen;
+            }
+
+            public void close() throws IOException
+            {
+                isOpen = false;
+            }
+        };
+
+        return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
+                @Override
+                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+                {
+                    ((NonClosingDefaultFileRegion) msg).transferTo(wbc, 0);
+                    super.write(ctx, msg, promise);
+                }
+            });
+    }
+
+    private StreamSession setupStreamingSessionForTest()
+    {
+        StreamCoordinator streamCoordinator = new StreamCoordinator(StreamOperation.BOOTSTRAP, 1, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
+        StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.BOOTSTRAP, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
+        streamCoordinator.addSessionInfo(new SessionInfo(peer, 0, peer, Collections.emptyList(), Collections.emptyList(), StreamSession.State.INITIALIZED));
+
+        StreamSession session = streamCoordinator.getOrCreateNextSession(peer);
+        session.init(future);
+        return session;
+    }
+}
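
The unit tests above (like the microbenchmark earlier) lean on Netty's EmbeddedChannel so that streamed bytes stay in memory instead of crossing a real socket. A self-contained sketch of that capture pattern, illustrative only and independent of the Cassandra classes in this patch:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.embedded.EmbeddedChannel;

    public class EmbeddedChannelCaptureExample
    {
        public static void main(String[] args)
        {
            // Outbound writes to an EmbeddedChannel are queued in memory, so a
            // test can read back exactly what a writer produced.
            EmbeddedChannel channel = new EmbeddedChannel();
            channel.writeOutbound(Unpooled.copiedBuffer(new byte[] { 1, 2, 3 }));

            ByteBuf captured = channel.readOutbound();
            System.out.println("captured " + captured.readableBytes() + " bytes"); // 3
            captured.release();

            channel.finishAndReleaseAll();
        }
    }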

