Stream entire SSTables when possible

patch by Dinesh Joshi; reviewed by Aleksey Yeschenko and Ariel Weisberg
for CASSANDRA-14556


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/47a12c52
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/47a12c52
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/47a12c52

Branch: refs/heads/trunk
Commit: 47a12c52a313258307ab88392f75c5866d9a2bb1
Parents: 6ba2fb9
Author: Dinesh A. Joshi <[email protected]>
Authored: Tue Jul 3 12:07:11 2018 -0700
Committer: Aleksey Yeshchenko <[email protected]>
Committed: Fri Jul 27 17:50:25 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +
 conf/cassandra.yaml                             |  12 +
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |  14 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 .../org/apache/cassandra/db/DiskBoundaries.java |  17 +-
 .../cassandra/db/compaction/Verifier.java       |  26 +-
 .../apache/cassandra/db/lifecycle/LogFile.java  |   2 +-
 .../cassandra/db/lifecycle/LogReplicaSet.java   |   3 +-
 .../CassandraCompressedStreamReader.java        | 131 ++++++++
 .../CassandraCompressedStreamWriter.java        | 152 +++++++++
 .../CassandraEntireSSTableStreamReader.java     | 177 ++++++++++
 .../CassandraEntireSSTableStreamWriter.java     | 120 +++++++
 .../db/streaming/CassandraIncomingFile.java     |  17 +-
 .../db/streaming/CassandraOutgoingFile.java     | 132 +++++++-
 .../db/streaming/CassandraStreamHeader.java     | 250 ++++++++++++--
 .../db/streaming/CassandraStreamManager.java    |   4 +-
 .../db/streaming/CassandraStreamReader.java     |   5 +-
 .../db/streaming/ComponentManifest.java         | 130 ++++++++
 .../CompressedCassandraStreamReader.java        | 131 --------
 .../CompressedCassandraStreamWriter.java        | 152 ---------
 .../db/streaming/CompressedInputStream.java     |   4 +-
 .../cassandra/db/streaming/IStreamReader.java   |  32 ++
 .../apache/cassandra/io/sstable/Component.java  |   7 +-
 .../cassandra/io/sstable/SSTableLoader.java     |   2 +-
 .../format/big/BigTableZeroCopyWriter.java      | 226 +++++++++++++
 .../io/util/BufferedDataOutputStreamPlus.java   |   4 +-
 .../cassandra/io/util/CheckedFunction.java      |  25 ++
 .../cassandra/io/util/DataOutputPlus.java       |   4 +-
 .../io/util/RebufferingInputStream.java         |   4 +-
 .../cassandra/io/util/SequentialWriter.java     |  17 +-
 .../io/util/UnbufferedDataOutputStreamPlus.java |   2 +-
 .../net/async/ByteBufDataOutputPlus.java        |   5 +-
 .../net/async/ByteBufDataOutputStreamPlus.java  |  60 +++-
 .../net/async/NonClosingDefaultFileRegion.java  |  51 +++
 .../async/RebufferingByteBufDataInputPlus.java  |  39 +++
 .../cassandra/streaming/StreamCoordinator.java  |   3 +-
 .../cassandra/streaming/StreamReceiveTask.java  |   5 +-
 .../cassandra/streaming/StreamResultFuture.java |  13 +-
 .../cassandra/streaming/StreamSession.java      |   6 +-
 .../async/NettyStreamingMessageSender.java      |   2 +-
 .../async/StreamingInboundHandler.java          |   2 +-
 .../streaming/messages/StreamInitMessage.java   |   7 +-
 .../org/apache/cassandra/utils/Collectors3.java |  54 +++
 test/conf/cassandra.yaml                        |   2 +
 .../cassandra/streaming/LongStreamingTest.java  |   6 +-
 .../microbench/ZeroCopyStreamingBenchmark.java  | 329 +++++++++++++++++++
 .../org/apache/cassandra/db/VerifyTest.java     |  30 +-
 .../CassandraEntireSSTableStreamWriterTest.java | 209 ++++++++++++
 .../db/streaming/CassandraOutgoingFileTest.java | 145 ++++++++
 .../db/streaming/CassandraStreamHeaderTest.java |  62 +++-
 .../db/streaming/ComponentManifestTest.java     |  64 ++++
 .../io/sstable/BigTableWriterTest.java          |   5 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +
 .../format/big/BigTableZeroCopyWriterTest.java  | 208 ++++++++++++
 .../RebufferingByteBufDataInputPlusTest.java    |  98 ++++++
 .../serializers/SerializationUtils.java         |   1 -
 .../streaming/StreamTransferTaskTest.java       |   4 +-
 .../streaming/StreamingTransferTest.java        |   1 +
 60 files changed, 2809 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b6418c..6ede70e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Stream entire SSTables when possible (CASSANDRA-14556)
  * Add experimental support for Java 11 (CASSANDRA-9608)
  * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable 
(CASSANDRA-14580)
  * Improve logging in MessageInHandler's constructor (CASSANDRA-14576)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 75885e9..3fab849 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -91,6 +91,9 @@ New features
      statements after the cache has been refreshed. CASSANDRA-13985
    - Support for audit logging of database activity. If enabled, logs every 
incoming
      CQL command request, Authentication (successful as well as unsuccessful 
login) to a node.
+   - Faster streaming of entire SSTables using ZeroCopy APIs. If enabled, Cassandra will stream
+     entire SSTables, significantly speeding up transfers. Any streaming-related operations will see
+     a corresponding improvement. See CASSANDRA-14556.
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 439b85a..663daaa 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -788,6 +788,18 @@ compaction_throughput_mb_per_sec: 16
 # between the sstables, reducing page cache churn and keeping hot rows hot
 sstable_preemptive_open_interval_in_mb: 50
 
+# When enabled, permits Cassandra to zero-copy stream entire eligible
+# SSTables between nodes, including every component.
+# This speeds up the network transfer significantly, subject to the
+# throttling specified by stream_throughput_outbound_megabits_per_sec.
+# Enabling this will reduce the GC pressure on the sending and receiving nodes.
+# When unset, the default is enabled. While this feature tries to keep the
+# disks balanced, it cannot guarantee it. This feature will be automatically
+# disabled if internode encryption is enabled. Currently this can be used with
+# Leveled Compaction. Once CASSANDRA-14586 is fixed, other compaction strategies
+# will benefit as well when used in combination with CASSANDRA-6696.
+# stream_entire_sstables: true
+
 # Throttles all outbound streaming file transfers on this node to the
 # given total throughput in Mbps. This is necessary because Cassandra does
 # mostly sequential IO when streaming data during bootstrap or repair, which

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 0d4760e..3a7ff0d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -381,6 +381,7 @@ public class Config
     public int block_for_peers_timeout_in_secs = 10;
     public volatile boolean automatic_sstable_upgrade = false;
     public volatile int max_concurrent_automatic_sstable_upgrades = 1;
+    public boolean stream_entire_sstables = true;
 
     public volatile AuditLogOptions audit_logging_options = new 
AuditLogOptions();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6301ab0..366dac7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -714,6 +714,15 @@ public class DatabaseDescriptor
                                             
"server_encryption_options.internode_encryption = " + 
conf.server_encryption_options.internode_encryption, false);
         }
 
+        if (conf.stream_entire_sstables)
+        {
+            if (conf.server_encryption_options.enabled || 
conf.server_encryption_options.optional)
+            {
+                logger.warn("Internode encryption enabled. Disabling zero copy 
SSTable transfers for streaming.");
+                conf.stream_entire_sstables = false;
+            }
+        }
+
         if (conf.max_value_size_in_mb <= 0)
             throw new ConfigurationException("max_value_size_in_mb must be 
positive", false);
         else if (conf.max_value_size_in_mb >= 2048)
@@ -2274,6 +2283,11 @@ public class DatabaseDescriptor
         return conf.streaming_connections_per_host;
     }
 
+    public static boolean streamEntireSSTables()
+    {
+        return conf.stream_entire_sstables;
+    }
+
     public static String getLocalDataCenter()
     {
         return localDC;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9c4921e..f03ffe6 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -791,7 +791,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         return newSSTableDescriptor(directory, format.info.getLatestVersion(), 
format);
     }
 
-    private Descriptor newSSTableDescriptor(File directory, Version version, 
SSTableFormat.Type format)
+    public Descriptor newSSTableDescriptor(File directory, Version version, 
SSTableFormat.Type format)
     {
         return new Descriptor(version,
                               directory,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java 
b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index 086bc84..22f17b0 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -129,4 +129,19 @@ public class DiskBoundaries
     {
         return directories.get(getDiskIndex(sstable));
     }
-}
\ No newline at end of file
+
+    public Directories.DataDirectory getCorrectDiskForKey(DecoratedKey key)
+    {
+        if (positions == null)
+            return null;
+
+        return directories.get(getDiskIndex(key));
+    }
+
+    private int getDiskIndex(DecoratedKey key)
+    {
+        int pos = Collections.binarySearch(positions, key);
+        assert pos < 0;
+        return -pos - 1;
+    }
+}

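Note: the new getDiskIndex(DecoratedKey) relies on Collections.binarySearch returning -(insertionPoint) - 1 when the key is not an exact match; the assert pos < 0 encodes the expectation that a decorated key never coincides with a disk boundary position, so -pos - 1 is the index of the first boundary at or beyond the key, i.e. the owning disk. A minimal standalone illustration of that arithmetic, using plain integers instead of DecoratedKey/PartitionPosition (class name and values are illustrative only, not part of the patch):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InsertionPointDemo
{
    public static void main(String[] args)
    {
        // sorted upper bounds of each disk's range (stand-ins for the PartitionPosition boundaries)
        List<Integer> boundaries = Arrays.asList(10, 20, 30);

        int pos = Collections.binarySearch(boundaries, 25);  // 25 is not itself a boundary
        assert pos < 0 : "keys are expected to fall strictly between boundaries";

        int diskIndex = -pos - 1;       // insertion point == index of the owning disk
        System.out.println(diskIndex);  // prints 2: the disk bounded by 30 owns key 25
    }
}
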
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java 
b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index bc9679d..db49369 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -180,7 +180,7 @@ public class Verifier implements Closeable
                 while (iter.hasNext())
                 {
                     DecoratedKey key = iter.next();
-                    rangeOwnHelper.check(key);
+                    rangeOwnHelper.validate(key);
                 }
             }
             catch (Throwable t)
@@ -262,7 +262,7 @@ public class Verifier implements Closeable
                 {
                     try
                     {
-                        rangeOwnHelper.check(key);
+                        rangeOwnHelper.validate(key);
                     }
                     catch (Throwable t)
                     {
@@ -360,13 +360,27 @@ public class Verifier implements Closeable
          * @param key the key
          * @throws RuntimeException if the key is not contained
          */
-        public void check(DecoratedKey key)
+        public void validate(DecoratedKey key)
+        {
+            if (!check(key))
+                throw new RuntimeException("Key " + key + " is not contained 
in the given ranges");
+        }
+
+        /**
+         * check if the given key is contained in any of the given ranges
+         *
+         * Must be called in sorted order - key should be increasing
+         *
+         * @param key the key
+         * @return boolean
+         */
+        public boolean check(DecoratedKey key)
         {
             assert lastKey == null || key.compareTo(lastKey) > 0;
             lastKey = key;
 
             if (normalizedRanges.isEmpty()) // handle tests etc where we don't 
have any ranges
-                return;
+                return true;
 
             if (rangeIndex > normalizedRanges.size() - 1)
                 throw new IllegalStateException("RangeOwnHelper can only be 
used to find the first out-of-range-token");
@@ -375,8 +389,10 @@ public class Verifier implements Closeable
             {
                 rangeIndex++;
                 if (rangeIndex > normalizedRanges.size() - 1)
-                    throw new RuntimeException("Key "+key+" is not contained 
in the given ranges");
+                    return false;
             }
+
+            return true;
         }
     }
 

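Note: the split keeps the original throwing behaviour in validate() for the verification path, while the new boolean check() lets callers probe range ownership without exception-driven control flow; CassandraOutgoingFile.computeContainment, added later in this patch, is the first such caller. A hypothetical caller-side sketch (the wrapper class and method name are illustrative), assuming keys are supplied in sorted order as RangeOwnHelper requires:

import java.util.List;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;

public final class RangeOwnershipSketch
{
    // Returns true only if every key (visited in sorted order) falls inside the normalized ranges.
    static boolean ownsAll(List<Range<Token>> normalizedRanges, Iterable<DecoratedKey> sortedKeys)
    {
        RangeOwnHelper helper = new RangeOwnHelper(normalizedRanges);
        for (DecoratedKey key : sortedKeys)
        {
            if (!helper.check(key))   // boolean variant: no RuntimeException on a miss
                return false;
        }
        return true;
    }
}
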
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index af6f435..98be0a0 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -66,7 +66,7 @@ final class LogFile implements AutoCloseable
     private final LogReplicaSet replicas = new LogReplicaSet();
 
     // The transaction records, this set must be ORDER PRESERVING
-    private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
+    private final Set<LogRecord> records = Collections.synchronizedSet(new 
LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554
 
     // The type of the transaction
     private final OperationType type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java 
b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
index 65be285..f5423d6 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,7 +46,7 @@ public class LogReplicaSet implements AutoCloseable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(LogReplicaSet.class);
 
-    private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>();
+    private final Map<File, LogReplica> replicasByFile = 
Collections.synchronizedMap(new LinkedHashMap<>()); // TODO: Hack until we fix 
CASSANDRA-14554
 
     private Collection<LogReplica> replicas()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
new file mode 100644
index 0000000..eb993ff
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
+/**
+ * CassandraStreamReader that reads from streamed compressed SSTable
+ */
+public class CassandraCompressedStreamReader extends CassandraStreamReader
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraCompressedStreamReader.class);
+
+    protected final CompressionInfo compressionInfo;
+
+    public CassandraCompressedStreamReader(StreamMessageHeader header, 
CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        super(header, streamHeader, session);
+        this.compressionInfo = streamHeader.compressionInfo;
+    }
+
+    /**
+     * @return SSTable transferred
+     * @throws java.io.IOException if reading the remote sstable fails. Will 
throw an RTE if local write fails.
+     */
+    @Override
+    @SuppressWarnings("resource") // input needs to remain open, streams on 
top of it can't be closed
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+    {
+        long totalSize = totalSize();
+
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+
+        if (cfs == null)
+        {
+            // schema was dropped during streaming
+            throw new IOException("CF " + tableId + " was dropped during 
streaming");
+        }
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, 
repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, 
totalSize, cfs.keyspace.getName(), pendingRepair,
+                     cfs.getTableName());
+
+        StreamDeserializer deserializer = null;
+        SSTableMultiWriter writer = null;
+        try (CompressedInputStream cis = new CompressedInputStream(inputPlus, 
compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
+        {
+            TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
+            deserializer = new StreamDeserializer(cfs.metadata(), in, 
inputVersion, getHeader(cfs.metadata()));
+            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
+            String filename = writer.getFilename();
+            int sectionIdx = 0;
+            for (SSTableReader.PartitionPositionBounds section : sections)
+            {
+                assert cis.getTotalCompressedBytesRead() <= totalSize;
+                long sectionLength = section.upperPosition - 
section.lowerPosition;
+
+                logger.trace("[Stream #{}] Reading section {} with length {} 
from stream.", session.planId(), sectionIdx++, sectionLength);
+                // skip to beginning of section inside chunk
+                cis.position(section.lowerPosition);
+                in.reset(0);
+
+                while (in.getBytesRead() < sectionLength)
+                {
+                    writePartition(deserializer, writer);
+                    // when compressed, report total bytes of compressed 
chunks read since remoteFile.size is the sum of chunks transferred
+                    session.progress(filename, ProgressInfo.Direction.IN, 
cis.getTotalCompressedBytesRead(), totalSize);
+                }
+            }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} 
readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, 
FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), 
FBUtilities.prettyPrintMemory(totalSize));
+            return writer;
+        }
+        catch (Throwable e)
+        {
+            Object partitionKey = deserializer != null ? 
deserializer.partitionKey() : "";
+            logger.warn("[Stream {}] Error while reading partition {} from 
stream on ks='{}' and table='{}'.",
+                        session.planId(), partitionKey, 
cfs.keyspace.getName(), cfs.getTableName());
+            if (writer != null)
+            {
+                writer.abort(e);
+            }
+            if (extractIOExceptionCause(e).isPresent())
+                throw e;
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate total length of transferring chunks
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
new file mode 100644
index 0000000..3b971f8
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraStreamWriter for compressed SSTable.
+ */
+public class CassandraCompressedStreamWriter extends CassandraStreamWriter
+{
+    private static final int CHUNK_SIZE = 1 << 16;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraCompressedStreamWriter.class);
+
+    private final CompressionInfo compressionInfo;
+
+    public CassandraCompressedStreamWriter(SSTableReader sstable, 
Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo 
compressionInfo, StreamSession session)
+    {
+        super(sstable, sections, session);
+        this.compressionInfo = compressionInfo;
+    }
+
+    @Override
+    public void write(DataOutputStreamPlus out) throws IOException
+    {
+        assert out instanceof ByteBufDataOutputStreamPlus;
+        ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
+        long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = 
{}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
+        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
+        {
+            long progress = 0L;
+            // calculate chunks to transfer. we want to send continuous chunks 
altogether.
+            List<SSTableReader.PartitionPositionBounds> sections = 
getTransferSections(compressionInfo.chunks);
+
+            int sectionIdx = 0;
+
+            // stream each of the required sections of the file
+            for (final SSTableReader.PartitionPositionBounds section : 
sections)
+            {
+                // length of the section to stream
+                long length = section.upperPosition - section.lowerPosition;
+
+                logger.trace("[Stream #{}] Writing section {} with length {} 
to stream.", session.planId(), sectionIdx++, length);
+
+                // tracks write progress
+                long bytesTransferred = 0;
+                while (bytesTransferred < length)
+                {
+                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - 
bytesTransferred);
+                    limiter.acquire(toTransfer);
+
+                    ByteBuffer outBuffer = 
ByteBuffer.allocateDirect(toTransfer);
+                    long lastWrite;
+                    try
+                    {
+                        lastWrite = fc.read(outBuffer, section.lowerPosition + 
bytesTransferred);
+                        assert lastWrite == toTransfer : String.format("could 
not read required number of bytes from file to be streamed: read %d bytes, 
wanted %d bytes", lastWrite, toTransfer);
+                        outBuffer.flip();
+                        output.writeToChannel(outBuffer);
+                    }
+                    catch (IOException e)
+                    {
+                        FileUtils.clean(outBuffer);
+                        throw e;
+                    }
+
+                    bytesTransferred += lastWrite;
+                    progress += lastWrite;
+                    
session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, totalSize);
+                }
+            }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, 
bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), 
session.peer, FBUtilities.prettyPrintMemory(progress), 
FBUtilities.prettyPrintMemory(totalSize));
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate total length of transferring chunks
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+
+    // chunks are assumed to be sorted by offset
+    private List<SSTableReader.PartitionPositionBounds> 
getTransferSections(CompressionMetadata.Chunk[] chunks)
+    {
+        List<SSTableReader.PartitionPositionBounds> transferSections = new 
ArrayList<>();
+        SSTableReader.PartitionPositionBounds lastSection = null;
+        for (CompressionMetadata.Chunk chunk : chunks)
+        {
+            if (lastSection != null)
+            {
+                if (chunk.offset == lastSection.upperPosition)
+                {
+                    // extend previous section to end of this chunk
+                    lastSection = new 
SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + 
chunk.length + 4); // 4 bytes for CRC
+                }
+                else
+                {
+                    transferSections.add(lastSection);
+                    lastSection = new 
SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length 
+ 4);
+                }
+            }
+            else
+            {
+                lastSection = new 
SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length 
+ 4);
+            }
+        }
+        if (lastSection != null)
+            transferSections.add(lastSection);
+        return transferSections;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
new file mode 100644
index 0000000..6f8f06a
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraEntireSSTableStreamReader reads SSTable off the wire and writes it 
to disk.
+ */
+public class CassandraEntireSSTableStreamReader implements IStreamReader
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraEntireSSTableStreamReader.class);
+
+    private final TableId tableId;
+    private final StreamSession session;
+    private final CassandraStreamHeader header;
+    private final int fileSequenceNumber;
+
+    public CassandraEntireSSTableStreamReader(StreamMessageHeader 
messageHeader, CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        if (streamHeader.format != SSTableFormat.Type.BIG)
+            throw new AssertionError("Unsupported SSTable format " + 
streamHeader.format);
+
+        if (session.getPendingRepair() != null)
+        {
+            // we should only ever be streaming pending repair sstables if the 
session has a pending repair id
+            if 
(!session.getPendingRepair().equals(messageHeader.pendingRepair))
+                throw new IllegalStateException(format("Stream Session & 
SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId));
+        }
+
+        this.header = streamHeader;
+        this.session = session;
+        this.tableId = messageHeader.tableId;
+        this.fileSequenceNumber = messageHeader.sequenceNumber;
+    }
+
+    /**
+     * @param in where this reads data from
+     * @return SSTable transferred
+     * @throws IOException if reading the remote sstable fails. Will throw an 
RTE if local write fails.
+     */
+    @SuppressWarnings("resource") // input needs to remain open, streams on 
top of it can't be closed
+    @Override
+    public SSTableMultiWriter read(DataInputPlus in) throws IOException
+    {
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+        if (cfs == null)
+        {
+            // schema was dropped during streaming
+            throw new IOException("Table " + tableId + " was dropped during 
streaming");
+        }
+
+        ComponentManifest manifest = header.componentManifest;
+        long totalSize = manifest.totalSize();
+
+        logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size 
= {}, table = {}",
+                     session.planId(),
+                     fileSequenceNumber,
+                     session.peer,
+                     prettyPrintMemory(totalSize),
+                     cfs.metadata());
+
+        BigTableZeroCopyWriter writer = null;
+
+        try
+        {
+            writer = createWriter(cfs, totalSize, manifest.components());
+            long bytesRead = 0;
+            for (Component component : manifest.components())
+            {
+                long length = manifest.sizeOf(component);
+
+                logger.debug("[Stream #{}] Started receiving {} component from 
{}, componentSize = {}, readBytes = {}, totalSize = {}",
+                             session.planId(),
+                             component,
+                             session.peer,
+                             prettyPrintMemory(length),
+                             prettyPrintMemory(bytesRead),
+                             prettyPrintMemory(totalSize));
+
+                writer.writeComponent(component.type, in, length);
+                session.progress(writer.descriptor.filenameFor(component), 
ProgressInfo.Direction.IN, length, length);
+                bytesRead += length;
+
+                logger.debug("[Stream #{}] Finished receiving {} component 
from {}, componentSize = {}, readBytes = {}, totalSize = {}",
+                             session.planId(),
+                             component,
+                             session.peer,
+                             prettyPrintMemory(length),
+                             prettyPrintMemory(bytesRead),
+                             prettyPrintMemory(totalSize));
+            }
+
+            
writer.descriptor.getMetadataSerializer().mutateLevel(writer.descriptor, 
header.sstableLevel);
+            return writer;
+        }
+        catch (Throwable e)
+        {
+            logger.error("[Stream {}] Error while reading sstable from stream 
for table = {}", session.planId(), cfs.metadata(), e);
+            if (writer != null)
+                e = writer.abort(e);
+            Throwables.throwIfUnchecked(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws 
IOException
+    {
+        Directories.DataDirectory localDir = 
cfs.getDirectories().getWriteableLocation(totalSize);
+        if (localDir == null)
+            throw new IOException(format("Insufficient disk space to store 
%s", prettyPrintMemory(totalSize)));
+
+        File dir = 
cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey));
+
+        if (dir == null)
+            return cfs.getDirectories().getDirectoryForNewSSTables();
+
+        return dir;
+    }
+
+    @SuppressWarnings("resource")
+    protected BigTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long 
totalSize, Collection<Component> components) throws IOException
+    {
+        File dataDir = getDataDir(cfs, totalSize);
+
+        StreamReceiver streamReceiver = session.getAggregator(tableId);
+        assert streamReceiver instanceof CassandraStreamReceiver;
+
+        LifecycleTransaction txn = 
CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction();
+
+        Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version, 
header.format);
+
+        logger.debug("[Table #{}] {} Components to write: {}", cfs.metadata(), 
desc.filenameFor(Component.DATA), components);
+
+        return new BigTableZeroCopyWriter(desc, cfs.metadata, txn, components);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
new file mode 100644
index 0000000..7a20110
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+
+import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraEntireSSTableStreamWriter streams the entire SSTable to given 
channel.
+ */
+public class CassandraEntireSSTableStreamWriter
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraEntireSSTableStreamWriter.class);
+
+    private final SSTableReader sstable;
+    private final ComponentManifest manifest;
+    private final StreamSession session;
+    private final StreamRateLimiter limiter;
+
+    public CassandraEntireSSTableStreamWriter(SSTableReader sstable, 
StreamSession session, ComponentManifest manifest)
+    {
+        this.session = session;
+        this.sstable = sstable;
+        this.manifest = manifest;
+        this.limiter = StreamManager.getRateLimiter(session.peer);
+    }
+
+    /**
+     * Stream the entire file to given channel.
+     * <p>
+     *
+     * @param out where this writes data to
+     * @throws IOException on any I/O error
+     */
+    public void write(ByteBufDataOutputStreamPlus out) throws IOException
+    {
+        long totalSize = manifest.totalSize();
+        logger.debug("[Stream #{}] Start streaming sstable {} to {}, 
repairedAt = {}, totalSize = {}",
+                     session.planId(),
+                     sstable.getFilename(),
+                     session.peer,
+                     sstable.getSSTableMetadata().repairedAt,
+                     prettyPrintMemory(totalSize));
+
+        long progress = 0L;
+
+        for (Component component : manifest.components())
+        {
+            @SuppressWarnings("resource") // this is closed after the file is 
transferred by ByteBufDataOutputStreamPlus
+            FileChannel in = new 
RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+            // Total Length to transmit for this file
+            long length = in.size();
+
+            // tracks write progress
+            logger.debug("[Stream #{}] Streaming {}.{} gen {} component {} 
size {}", session.planId(),
+                         sstable.getKeyspaceName(),
+                         sstable.getColumnFamilyName(),
+                         sstable.descriptor.generation,
+                         component,
+                         prettyPrintMemory(length));
+
+            long bytesWritten = out.writeToChannel(in, limiter);
+            progress += bytesWritten;
+
+            session.progress(sstable.descriptor.filenameFor(component), 
ProgressInfo.Direction.OUT, bytesWritten, length);
+
+            logger.debug("[Stream #{}] Finished streaming {}.{} gen {} 
component {} to {}, xfered = {}, length = {}, totalSize = {}",
+                         session.planId(),
+                         sstable.getKeyspaceName(),
+                         sstable.getColumnFamilyName(),
+                         sstable.descriptor.generation,
+                         component,
+                         session.peer,
+                         prettyPrintMemory(bytesWritten),
+                         prettyPrintMemory(length),
+                         prettyPrintMemory(totalSize));
+        }
+
+        out.flush();
+
+        logger.debug("[Stream #{}] Finished streaming sstable {} to {}, xfered 
= {}, totalSize = {}",
+                     session.planId(),
+                     sstable.getFilename(),
+                     session.peer,
+                     prettyPrintMemory(progress),
+                     prettyPrintMemory(totalSize));
+
+    }
+}

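Note: the per-component loop above hands each FileChannel to ByteBufDataOutputStreamPlus.writeToChannel, which (together with the NonClosingDefaultFileRegion added in this patch) lets Netty push the bytes from the file descriptor to the socket with a kernel-level copy instead of buffering them on the JVM heap. For readers less familiar with the mechanism, the same idea in plain NIO looks roughly like the sketch below; the class, path, and socket are illustrative placeholders, not code from this patch:

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public final class ZeroCopySketch
{
    // Transfer a whole file to a connected (blocking) socket via the kernel's
    // sendfile-style path; no intermediate heap buffers are allocated by the JVM.
    static void transferFile(Path file, SocketChannel socket) throws IOException
    {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ))
        {
            long position = 0;
            long remaining = in.size();
            while (remaining > 0)
            {
                long sent = in.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
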
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
index 16698e5..c65ca62 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.Objects;
 
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
@@ -45,6 +47,8 @@ public class CassandraIncomingFile implements IncomingStream
     private volatile SSTableMultiWriter sstable;
     private volatile long size = -1;
 
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraIncomingFile.class);
+
     public CassandraIncomingFile(ColumnFamilyStore cfs, StreamSession session, 
StreamMessageHeader header)
     {
         this.cfs = cfs;
@@ -56,9 +60,16 @@ public class CassandraIncomingFile implements IncomingStream
     public synchronized void read(DataInputPlus in, int version) throws 
IOException
     {
         CassandraStreamHeader streamHeader = 
CassandraStreamHeader.serializer.deserialize(in, version);
-        CassandraStreamReader reader = !streamHeader.isCompressed()
-                                       ? new CassandraStreamReader(header, 
streamHeader, session)
-                                       : new 
CompressedCassandraStreamReader(header, streamHeader, session);
+        logger.debug("Incoming stream entireSSTable={} components={}", 
streamHeader.isEntireSSTable, streamHeader.componentManifest);
+
+        IStreamReader reader;
+        if (streamHeader.isEntireSSTable)
+            reader = new CassandraEntireSSTableStreamReader(header, 
streamHeader, session);
+        else if (streamHeader.isCompressed())
+            reader = new CassandraCompressedStreamReader(header, streamHeader, 
session);
+        else
+            reader = new CassandraStreamReader(header, streamHeader, session);
+
         size = streamHeader.size();
         sstable = reader.read(in);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
index 6ec1f85..5252187 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -18,51 +18,100 @@
 
 package org.apache.cassandra.db.streaming;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.KeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.OutgoingStream;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.concurrent.Ref;
 
+import static org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper;
+
 /**
  * used to transfer the part(or whole) of a SSTable data file
  */
 public class CassandraOutgoingFile implements OutgoingStream
 {
+    public static final List<Component> STREAM_COMPONENTS = 
ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                             
Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                             
Component.DIGEST, Component.CRC);
+
     private final Ref<SSTableReader> ref;
     private final long estimatedKeys;
     private final List<SSTableReader.PartitionPositionBounds> sections;
     private final String filename;
     private final CassandraStreamHeader header;
     private final boolean keepSSTableLevel;
+    private final ComponentManifest manifest;
+    private Boolean isFullyContained;
 
-    public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> 
ref, List<SSTableReader.PartitionPositionBounds> sections, long estimatedKeys)
+    private final List<Range<Token>> ranges;
+
+    public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> 
ref,
+                                 List<SSTableReader.PartitionPositionBounds> 
sections, Collection<Range<Token>> ranges,
+                                 long estimatedKeys)
     {
         Preconditions.checkNotNull(ref.get());
         this.ref = ref;
         this.estimatedKeys = estimatedKeys;
         this.sections = sections;
+        this.ranges = ImmutableList.copyOf(ranges);
         this.filename = ref.get().getFilename();
+        this.manifest = getComponentManifest(ref.get());
 
         SSTableReader sstable = ref.get();
         keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation 
== StreamOperation.REBUILD;
-        this.header = new CassandraStreamHeader(sstable.descriptor.version,
-                                                sstable.descriptor.formatType,
-                                                estimatedKeys,
-                                                sections,
-                                                sstable.compression ? 
sstable.getCompressionMetadata() : null,
-                                                keepSSTableLevel ? 
sstable.getSSTableLevel() : 0,
-                                                sstable.header.toComponent());
+        this.header =
+            CassandraStreamHeader.builder()
+                                 
.withSSTableFormat(sstable.descriptor.formatType)
+                                 
.withSSTableVersion(sstable.descriptor.version)
+                                 .withSSTableLevel(keepSSTableLevel ? 
sstable.getSSTableLevel() : 0)
+                                 .withEstimatedKeys(estimatedKeys)
+                                 .withSections(sections)
+                                 .withCompressionMetadata(sstable.compression 
? sstable.getCompressionMetadata() : null)
+                                 
.withSerializationHeader(sstable.header.toComponent())
+                                 .isEntireSSTable(shouldStreamEntireSSTable())
+                                 .withComponentManifest(manifest)
+                                 .withFirstKey(sstable.first)
+                                 .withTableId(sstable.metadata().id)
+                                 .build();
+    }
+
+    @VisibleForTesting
+    public static ComponentManifest getComponentManifest(SSTableReader sstable)
+    {
+        LinkedHashMap<Component, Long> components = new 
LinkedHashMap<>(STREAM_COMPONENTS.size());
+        for (Component component : STREAM_COMPONENTS)
+        {
+            File file = new File(sstable.descriptor.filenameFor(component));
+            if (file.exists())
+                components.put(component, file.length());
+        }
+
+        return new ComponentManifest(components);
     }
 
     public static CassandraOutgoingFile fromStream(OutgoingStream stream)
@@ -114,11 +163,68 @@ public class CassandraOutgoingFile implements 
OutgoingStream
         CassandraStreamHeader.serializer.serialize(header, out, version);
         out.flush();
 
-        CassandraStreamWriter writer = header.compressionInfo == null ?
-                                       new CassandraStreamWriter(sstable, 
header.sections, session) :
-                                       new 
CompressedCassandraStreamWriter(sstable, header.sections,
-                                                                           
header.compressionInfo, session);
-        writer.write(out);
+        if (shouldStreamEntireSSTable() && out instanceof 
ByteBufDataOutputStreamPlus)
+        {
+            CassandraEntireSSTableStreamWriter writer = new 
CassandraEntireSSTableStreamWriter(sstable, session, manifest);
+            writer.write((ByteBufDataOutputStreamPlus) out);
+        }
+        else
+        {
+            CassandraStreamWriter writer = (header.compressionInfo == null) ?
+                     new CassandraStreamWriter(sstable, header.sections, 
session) :
+                     new CassandraCompressedStreamWriter(sstable, 
header.sections,
+                                                         
header.compressionInfo, session);
+            writer.write(out);
+        }
+    }
+
+    @VisibleForTesting
+    public boolean shouldStreamEntireSSTable()
+    {
+        // don't stream if full sstable transfers are disabled or legacy 
counter shards are present
+        if (!DatabaseDescriptor.streamEntireSSTables() || 
ref.get().getSSTableMetadata().hasLegacyCounterShards)
+            return false;
+
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId());
+
+        if (cfs == null)
+            return false;
+
+        AbstractCompactionStrategy compactionStrategy = 
cfs.getCompactionStrategyManager()
+                                                           
.getCompactionStrategyFor(ref.get());
+
+        if (compactionStrategy instanceof LeveledCompactionStrategy)
+            return contained(ranges, ref.get());
+
+        return false;
+    }
+
+    @VisibleForTesting
+    public boolean contained(List<Range<Token>> normalizedRanges, 
SSTableReader sstable)
+    {
+        if (isFullyContained != null)
+            return isFullyContained;
+
+        isFullyContained = computeContainment(normalizedRanges, sstable);
+        return isFullyContained;
+    }
+
+    private boolean computeContainment(List<Range<Token>> normalizedRanges, 
SSTableReader sstable)
+    {
+        if (normalizedRanges == null)
+            return false;
+
+        RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges);
+        try (KeyIterator iter = new KeyIterator(sstable.descriptor, 
sstable.metadata()))
+        {
+            while (iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                if (!rangeOwnHelper.check(key))
+                    return false;
+            }
+        }
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
index 43631b0..2af56de 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
@@ -15,16 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.db.streaming;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
+
+import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -32,6 +38,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 public class CassandraStreamHeader
 {
@@ -50,33 +60,38 @@ public class CassandraStreamHeader
     private final CompressionMetadata compressionMetadata;
     public volatile CompressionInfo compressionInfo;
     public final int sstableLevel;
-    public final SerializationHeader.Component header;
+    public final SerializationHeader.Component serializationHeader;
+
+    /* flag indicating whether this is a partial or entire sstable transfer */
+    public final boolean isEntireSSTable;
+    /* first token of the sstable required for faster streaming */
+    public final DecoratedKey firstKey;
+    public final TableId tableId;
+    public final ComponentManifest componentManifest;
 
     /* cached size value */
     private transient final long size;
 
-    private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
-    {
-        this.version = version;
-        this.format = format;
-        this.estimatedKeys = estimatedKeys;
-        this.sections = sections;
-        this.compressionMetadata = compressionMetadata;
-        this.compressionInfo = compressionInfo;
-        this.sstableLevel = sstableLevel;
-        this.header = header;
-
-        this.size = calculateSize();
-    }
-
-    public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header)
+    private CassandraStreamHeader(Builder builder)
     {
-        this(version, format, estimatedKeys, sections, compressionMetadata, null, sstableLevel, header);
+        version = builder.version;
+        format = builder.format;
+        estimatedKeys = builder.estimatedKeys;
+        sections = builder.sections;
+        compressionMetadata = builder.compressionMetadata;
+        compressionInfo = builder.compressionInfo;
+        sstableLevel = builder.sstableLevel;
+        serializationHeader = builder.serializationHeader;
+        tableId = builder.tableId;
+        isEntireSSTable = builder.isEntireSSTable;
+        componentManifest = builder.componentManifest;
+        firstKey = builder.firstKey;
+        size = calculateSize();
     }
 
-    public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header)
+    public static Builder builder()
     {
-        this(version, format, estimatedKeys, sections, null, compressionInfo, sstableLevel, header);
+        return new Builder();
     }
 
     public boolean isCompressed()
@@ -94,6 +109,9 @@ public class CassandraStreamHeader
 
     private long calculateSize()
     {
+        if (isEntireSSTable)
+            return componentManifest.totalSize();
+
         long transferSize = 0;
         if (compressionInfo != null)
         {
@@ -112,9 +130,7 @@ public class CassandraStreamHeader
     public synchronized void calculateCompressionInfo()
     {
         if (compressionMetadata != null && compressionInfo == null)
-        {
             compressionInfo = CompressionInfo.fromCompressionMetadata(compressionMetadata, sections);
-        }
     }
 
     @Override
@@ -125,9 +141,11 @@ public class CassandraStreamHeader
                ", format=" + format +
                ", estimatedKeys=" + estimatedKeys +
                ", sections=" + sections +
-               ", compressionInfo=" + compressionInfo +
                ", sstableLevel=" + sstableLevel +
-               ", header=" + header +
+               ", header=" + serializationHeader +
+               ", isEntireSSTable=" + isEntireSSTable +
+               ", firstKey=" + firstKey +
+               ", tableId=" + tableId +
                '}';
     }
 
@@ -138,20 +156,26 @@ public class CassandraStreamHeader
         CassandraStreamHeader that = (CassandraStreamHeader) o;
         return estimatedKeys == that.estimatedKeys &&
                sstableLevel == that.sstableLevel &&
+               isEntireSSTable == that.isEntireSSTable &&
                Objects.equals(version, that.version) &&
                format == that.format &&
                Objects.equals(sections, that.sections) &&
                Objects.equals(compressionInfo, that.compressionInfo) &&
-               Objects.equals(header, that.header);
+               Objects.equals(serializationHeader, that.serializationHeader) &&
+               Objects.equals(componentManifest, that.componentManifest) &&
+               Objects.equals(firstKey, that.firstKey) &&
+               Objects.equals(tableId, that.tableId);
     }
 
     public int hashCode()
     {
-        return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, header);
+        return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, serializationHeader, componentManifest,
+                            isEntireSSTable, firstKey, tableId);
     }
 
+    public static final IVersionedSerializer<CassandraStreamHeader> serializer = new CassandraStreamHeaderSerializer();
 
-    public static final IVersionedSerializer<CassandraStreamHeader> serializer = new IVersionedSerializer<CassandraStreamHeader>()
+    public static class CassandraStreamHeaderSerializer implements IVersionedSerializer<CassandraStreamHeader>
     {
         public void serialize(CassandraStreamHeader header, DataOutputPlus out, int version) throws IOException
         {
@@ -168,11 +192,33 @@ public class CassandraStreamHeader
             header.calculateCompressionInfo();
             CompressionInfo.serializer.serialize(header.compressionInfo, out, version);
             out.writeInt(header.sstableLevel);
-            SerializationHeader.serializer.serialize(header.version, header.header, out);
+
+            SerializationHeader.serializer.serialize(header.version, header.serializationHeader, out);
+
+            header.tableId.serialize(out);
+            out.writeBoolean(header.isEntireSSTable);
+
+            if (header.isEntireSSTable)
+            {
+                ComponentManifest.serializer.serialize(header.componentManifest, out, version);
+                ByteBufferUtil.writeWithVIntLength(header.firstKey.getKey(), out);
+            }
         }
 
         public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException
         {
+            return deserialize(in, version, tableId -> {
+                ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+                if (cfs != null)
+                    return cfs.getPartitioner();
+
+                return null;
+            });
+        }
+
+        @VisibleForTesting
+        public CassandraStreamHeader deserialize(DataInputPlus in, int version, Function<TableId, IPartitioner> partitionerMapper) throws IOException
+        {
             Version sstableVersion = SSTableFormat.Type.current().info.getVersion(in.readUTF());
             SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF());
 
@@ -183,9 +229,36 @@ public class CassandraStreamHeader
                 sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong()));
             CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version);
             int sstableLevel = in.readInt();
+
             SerializationHeader.Component header =  SerializationHeader.serializer.deserialize(sstableVersion, in);
 
-            return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header);
+            TableId tableId = TableId.deserialize(in);
+            boolean isEntireSSTable = in.readBoolean();
+            ComponentManifest manifest = null;
+            DecoratedKey firstKey = null;
+
+            if (isEntireSSTable)
+            {
+                manifest = ComponentManifest.serializer.deserialize(in, version);
+                ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in);
+                IPartitioner partitioner = partitionerMapper.apply(tableId);
+                if (partitioner == null)
+                    throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId %s", tableId));
+                firstKey = partitioner.decorateKey(keyBuf);
+            }
+
+            return builder().withSSTableFormat(format)
+                            .withSSTableVersion(sstableVersion)
+                            .withSSTableLevel(sstableLevel)
+                            .withEstimatedKeys(estimatedKeys)
+                            .withSections(sections)
+                            .withCompressionInfo(compressionInfo)
+                            .withSerializationHeader(header)
+                            .withComponentManifest(manifest)
+                            .isEntireSSTable(isEntireSSTable)
+                            .withFirstKey(firstKey)
+                            .withTableId(tableId)
+                            .build();
         }
 
         public long serializedSize(CassandraStreamHeader header, int version)
@@ -201,12 +274,127 @@ public class CassandraStreamHeader
                 size += TypeSizes.sizeof(section.lowerPosition);
                 size += TypeSizes.sizeof(section.upperPosition);
             }
+
+            header.calculateCompressionInfo();
             size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
             size += TypeSizes.sizeof(header.sstableLevel);
 
-            size += SerializationHeader.serializer.serializedSize(header.version, header.header);
+            size += SerializationHeader.serializer.serializedSize(header.version, header.serializationHeader);
+
+            size += header.tableId.serializedSize();
+            size += TypeSizes.sizeof(header.isEntireSSTable);
 
+            if (header.isEntireSSTable)
+            {
+                size += ComponentManifest.serializer.serializedSize(header.componentManifest, version);
+                size += ByteBufferUtil.serializedSizeWithVIntLength(header.firstKey.getKey());
+            }
             return size;
         }
-    };
+    }
+
+    public static final class Builder
+    {
+        private Version version;
+        private SSTableFormat.Type format;
+        private long estimatedKeys;
+        private List<SSTableReader.PartitionPositionBounds> sections;
+        private CompressionMetadata compressionMetadata;
+        private CompressionInfo compressionInfo;
+        private int sstableLevel;
+        private SerializationHeader.Component serializationHeader;
+        private ComponentManifest componentManifest;
+        private boolean isEntireSSTable;
+        private DecoratedKey firstKey;
+        private TableId tableId;
+
+        public Builder withSSTableFormat(SSTableFormat.Type format)
+        {
+            this.format = format;
+            return this;
+        }
+
+        public Builder withSSTableVersion(Version version)
+        {
+            this.version = version;
+            return this;
+        }
+
+        public Builder withSSTableLevel(int sstableLevel)
+        {
+            this.sstableLevel = sstableLevel;
+            return this;
+        }
+
+        public Builder withEstimatedKeys(long estimatedKeys)
+        {
+            this.estimatedKeys = estimatedKeys;
+            return this;
+        }
+
+        public Builder withSections(List<SSTableReader.PartitionPositionBounds> sections)
+        {
+            this.sections = sections;
+            return this;
+        }
+
+        public Builder withCompressionMetadata(CompressionMetadata compressionMetadata)
+        {
+            this.compressionMetadata = compressionMetadata;
+            return this;
+        }
+
+        public Builder withCompressionInfo(CompressionInfo compressionInfo)
+        {
+            this.compressionInfo = compressionInfo;
+            return this;
+        }
+
+        public Builder withSerializationHeader(SerializationHeader.Component header)
+        {
+            this.serializationHeader = header;
+            return this;
+        }
+
+        public Builder withTableId(TableId tableId)
+        {
+            this.tableId = tableId;
+            return this;
+        }
+
+        public Builder isEntireSSTable(boolean isEntireSSTable)
+        {
+            this.isEntireSSTable = isEntireSSTable;
+            return this;
+        }
+
+        public Builder withComponentManifest(ComponentManifest componentManifest)
+        {
+            this.componentManifest = componentManifest;
+            return this;
+        }
+
+        public Builder withFirstKey(DecoratedKey firstKey)
+        {
+            this.firstKey = firstKey;
+            return this;
+        }
+
+        public CassandraStreamHeader build()
+        {
+            checkNotNull(version);
+            checkNotNull(format);
+            checkNotNull(sections);
+            checkNotNull(serializationHeader);
+            checkNotNull(tableId);
+
+            if (isEntireSSTable)
+            {
+                checkNotNull(componentManifest);
+                checkNotNull(firstKey);
+            }
+
+            return new CassandraStreamHeader(this);
+        }
+    }
 }

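The telescoping constructors removed above are replaced by a builder. A hedged sketch of how a header for a whole-sstable transfer could be assembled (illustrative only, not the exact wiring in CassandraOutgoingFile; it assumes an SSTableReader and the ComponentManifest built for it, and leaves the section list empty because the entire file rather than selected ranges is shipped):

    // Sketch only; SSTableReader accessors are assumed to be as in trunk at the time of this patch.
    static CassandraStreamHeader entireSSTableHeader(SSTableReader sstable, ComponentManifest manifest)
    {
        return CassandraStreamHeader.builder()
                                    .withSSTableVersion(sstable.descriptor.version)
                                    .withSSTableFormat(sstable.descriptor.formatType)
                                    .withSSTableLevel(sstable.getSSTableLevel())
                                    .withEstimatedKeys(sstable.estimatedKeys())
                                    .withSections(java.util.Collections.emptyList()) // whole file, no ranges
                                    .withSerializationHeader(sstable.header.toComponent())
                                    .withComponentManifest(manifest)
                                    .isEntireSSTable(true)
                                    .withFirstKey(sstable.first)
                                    .withTableId(sstable.metadata().id)
                                    .build();
    }

build() enforces the invariant visible in the diff: componentManifest and firstKey are only required when isEntireSSTable is set, and calculateSize() then reports the manifest's total size rather than the sum of the transferred sections.
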
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
index 673b62c..43667d0 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -47,7 +47,6 @@ import org.apache.cassandra.streaming.StreamReceiver;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.TableStreamManager;
 import org.apache.cassandra.streaming.messages.StreamMessageHeader;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -152,7 +151,8 @@ public class CassandraStreamManager implements TableStreamManager
                     ref.release();
                     continue;
                 }
-                streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, sstable.estimatedKeysForRanges(ranges)));
+                streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, ranges,
+                                                      sstable.estimatedKeysForRanges(ranges)));
             }
 
             return streams;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 3930196..fccabfe 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -53,7 +53,7 @@ import org.apache.cassandra.utils.FBUtilities;
 /**
  * CassandraStreamReader reads from stream and writes to SSTable.
  */
-public class CassandraStreamReader
+public class CassandraStreamReader implements IStreamReader
 {
     private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class);
     protected final TableId tableId;
@@ -85,7 +85,7 @@ public class CassandraStreamReader
         this.pendingRepair = header.pendingRepair;
         this.format = streamHeader.format;
         this.sstableLevel = streamHeader.sstableLevel;
-        this.header = streamHeader.header;
+        this.header = streamHeader.serializationHeader;
         this.fileSeqNum = header.sequenceNumber;
     }
 
@@ -95,6 +95,7 @@ public class CassandraStreamReader
      * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
      */
     @SuppressWarnings("resource") // input needs to remain open, streams on 
top of it can't be closed
+    @Override
     public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
     {
         long totalSize = totalSize();

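CassandraStreamReader now implements IStreamReader, an interface added elsewhere in this commit and not shown here. Judging from the @Override added above, it presumably declares just the single read entry point; a hedged guess at its shape:

    // Inferred sketch only; the real interface (in the same db.streaming package) may differ in details.
    public interface IStreamReader
    {
        SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException;
    }

This gives the receiving side one type through which the partial, compressed, and new whole-sstable readers can be driven interchangeably.
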
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
new file mode 100644
index 0000000..90e3dbd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public final class ComponentManifest implements Iterable<Component>
+{
+    private final LinkedHashMap<Component, Long> components;
+
+    public ComponentManifest(Map<Component, Long> components)
+    {
+        this.components = new LinkedHashMap<>(components);
+    }
+
+    public long sizeOf(Component component)
+    {
+        Long size = components.get(component);
+        if (size == null)
+            throw new IllegalArgumentException("Component " + component + " is 
not present in the manifest");
+        return size;
+    }
+
+    public long totalSize()
+    {
+        long totalSize = 0;
+        for (Long size : components.values())
+            totalSize += size;
+        return totalSize;
+    }
+
+    public List<Component> components()
+    {
+        return new ArrayList<>(components.keySet());
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof ComponentManifest))
+            return false;
+
+        ComponentManifest that = (ComponentManifest) o;
+        return components.equals(that.components);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return components.hashCode();
+    }
+
+    public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
+    {
+        public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeUnsignedVInt(manifest.components.size());
+            for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
+            {
+                out.writeUTF(entry.getKey().name);
+                out.writeUnsignedVInt(entry.getValue());
+            }
+        }
+
+        public ComponentManifest deserialize(DataInputPlus in, int version) throws IOException
+        {
+            int size = (int) in.readUnsignedVInt();
+
+            LinkedHashMap<Component, Long> components = new LinkedHashMap<>(size);
+
+            for (int i = 0; i < size; i++)
+            {
+                Component component = Component.parse(in.readUTF());
+                long length = in.readUnsignedVInt();
+                components.put(component, length);
+            }
+
+            return new ComponentManifest(components);
+        }
+
+        public long serializedSize(ComponentManifest manifest, int version)
+        {
+            long size = TypeSizes.sizeofUnsignedVInt(manifest.components.size());
+            for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
+            {
+                size += TypeSizes.sizeof(entry.getKey().name);
+                size += TypeSizes.sizeofUnsignedVInt(entry.getValue());
+            }
+            return size;
+        }
+    };
+
+    @Override
+    public Iterator<Component> iterator()
+    {
+        return Iterators.unmodifiableIterator(components.keySet().iterator());
+    }
+}

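ComponentManifest is an insertion-ordered map from sstable component to on-disk length, telling the receiver which files to expect and how many bytes each one carries. A small usage sketch with made-up sizes:

    // Illustrative only; the component lengths here are arbitrary example values.
    java.util.LinkedHashMap<Component, Long> sizes = new java.util.LinkedHashMap<>();
    sizes.put(Component.DATA, 4096L);
    sizes.put(Component.PRIMARY_INDEX, 512L);
    sizes.put(Component.STATS, 128L);

    ComponentManifest manifest = new ComponentManifest(sizes);
    long totalBytes = manifest.totalSize();            // 4736: drives CassandraStreamHeader.calculateSize()
    long dataBytes  = manifest.sizeOf(Component.DATA); // 4096

Because the backing map is a LinkedHashMap, the serializer writes and reads components in the same order they were recorded, which lets the receiving side consume the incoming byte stream file by file.
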
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
deleted file mode 100644
index c71edfb..0000000
--- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.streaming;
-
-import java.io.IOException;
-
-import com.google.common.base.Throwables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.TrackedDataInputPlus;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.messages.StreamMessageHeader;
-import org.apache.cassandra.utils.ChecksumType;
-import org.apache.cassandra.utils.FBUtilities;
-
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
-/**
- * CassandraStreamReader that reads from streamed compressed SSTable
- */
-public class CompressedCassandraStreamReader extends CassandraStreamReader
-{
-    private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamReader.class);
-
-    protected final CompressionInfo compressionInfo;
-
-    public CompressedCassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
-    {
-        super(header, streamHeader, session);
-        this.compressionInfo = streamHeader.compressionInfo;
-    }
-
-    /**
-     * @return SSTable transferred
-     * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
-     */
-    @Override
-    @SuppressWarnings("resource") // input needs to remain open, streams on 
top of it can't be closed
-    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
-    {
-        long totalSize = totalSize();
-
-        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
-
-        if (cfs == null)
-        {
-            // schema was dropped during streaming
-            throw new IOException("CF " + tableId + " was dropped during 
streaming");
-        }
-
-        logger.debug("[Stream #{}] Start receiving file #{} from {}, 
repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
-                     session.planId(), fileSeqNum, session.peer, repairedAt, 
totalSize, cfs.keyspace.getName(), pendingRepair,
-                     cfs.getTableName());
-
-        StreamDeserializer deserializer = null;
-        SSTableMultiWriter writer = null;
-        try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
-        {
-            TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
-            deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
-            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
-            String filename = writer.getFilename();
-            int sectionIdx = 0;
-            for (SSTableReader.PartitionPositionBounds section : sections)
-            {
-                assert cis.getTotalCompressedBytesRead() <= totalSize;
-                long sectionLength = section.upperPosition - section.lowerPosition;
-
-                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
-                // skip to beginning of section inside chunk
-                cis.position(section.lowerPosition);
-                in.reset(0);
-
-                while (in.getBytesRead() < sectionLength)
-                {
-                    writePartition(deserializer, writer);
-                    // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
-                }
-            }
-            logger.debug("[Stream #{}] Finished receiving file #{} from {} 
readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
-                         session.peer, 
FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), 
FBUtilities.prettyPrintMemory(totalSize));
-            return writer;
-        }
-        catch (Throwable e)
-        {
-            Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
-            logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
-                        session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
-            if (writer != null)
-            {
-                writer.abort(e);
-            }
-            if (extractIOExceptionCause(e).isPresent())
-                throw e;
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    protected long totalSize()
-    {
-        long size = 0;
-        // calculate total length of transferring chunks
-        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-            size += chunk.length + 4; // 4 bytes for CRC
-        return size;
-    }
-}

