Abstract streaming for pluggable storage

Patch by Blake Eggleston; Reviewed by Jason Brown for CASSANDRA-14115


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9714a7c8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9714a7c8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9714a7c8

Branch: refs/heads/trunk
Commit: 9714a7c817b64a3358f69e536535c756c5c6df48
Parents: 253c003
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Sun Mar 4 14:44:30 2018 -0800
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Thu Mar 15 16:40:05 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |  10 +
 .../db/streaming/CassandraIncomingFile.java     | 117 +++++++
 .../db/streaming/CassandraOutgoingFile.java     | 151 +++++++++
 .../db/streaming/CassandraStreamHeader.java     | 212 +++++++++++++
 .../db/streaming/CassandraStreamManager.java    | 166 ++++++++++
 .../db/streaming/CassandraStreamReader.java     | 285 +++++++++++++++++
 .../db/streaming/CassandraStreamReceiver.java   | 248 +++++++++++++++
 .../db/streaming/CassandraStreamWriter.java     | 176 +++++++++++
 .../CompressedCassandraStreamReader.java        | 131 ++++++++
 .../CompressedCassandraStreamWriter.java        | 153 ++++++++++
 .../db/streaming/CompressedInputStream.java     | 285 +++++++++++++++++
 .../cassandra/db/streaming/CompressionInfo.java | 110 +++++++
 .../cassandra/db/streaming/package-info.java    |  53 ++++
 .../org/apache/cassandra/dht/RangeStreamer.java |   4 +-
 .../cassandra/io/sstable/SSTableLoader.java     |  18 +-
 .../repair/AsymmetricLocalSyncTask.java         |   1 -
 .../apache/cassandra/repair/LocalSyncTask.java  |   2 +-
 .../cassandra/repair/StreamingRepairTask.java   |   2 +-
 .../cassandra/streaming/IncomingStream.java     |  45 +++
 .../cassandra/streaming/OutgoingStream.java     |  52 ++++
 .../apache/cassandra/streaming/PreviewKind.java |  21 +-
 .../cassandra/streaming/ProgressInfo.java       |   4 +-
 .../apache/cassandra/streaming/SessionInfo.java |   8 +-
 .../cassandra/streaming/StreamCoordinator.java  |  40 ++-
 .../apache/cassandra/streaming/StreamHook.java  |  14 +-
 .../apache/cassandra/streaming/StreamPlan.java  |  20 +-
 .../cassandra/streaming/StreamReader.java       | 278 -----------------
 .../cassandra/streaming/StreamReceiveTask.java  | 211 ++-----------
 .../cassandra/streaming/StreamReceiver.java     |  58 ++++
 .../cassandra/streaming/StreamResultFuture.java |   7 +-
 .../cassandra/streaming/StreamSession.java      | 199 ++++--------
 .../cassandra/streaming/StreamTransferTask.java |  58 ++--
 .../cassandra/streaming/StreamWriter.java       | 173 -----------
 .../cassandra/streaming/TableStreamManager.java |  57 ++++
 .../async/NettyStreamingMessageSender.java      |  41 ++-
 .../async/StreamingInboundHandler.java          |  23 +-
 .../cassandra/streaming/async/package-info.java |  36 +--
 .../compress/CompressedInputStream.java         | 285 -----------------
 .../compress/CompressedStreamReader.java        | 132 --------
 .../compress/CompressedStreamWriter.java        | 154 ----------
 .../streaming/compress/CompressionInfo.java     |  95 ------
 .../management/ProgressInfoCompositeData.java   |   2 +-
 .../streaming/messages/FileMessageHeader.java   | 304 -------------------
 .../streaming/messages/IncomingFileMessage.java |  95 ------
 .../messages/IncomingStreamMessage.java         | 107 +++++++
 .../streaming/messages/OutgoingFileMessage.java | 158 ----------
 .../messages/OutgoingStreamMessage.java         | 138 +++++++++
 .../streaming/messages/StreamInitMessage.java   |   9 +-
 .../streaming/messages/StreamMessage.java       |   2 +-
 .../streaming/messages/StreamMessageHeader.java | 144 +++++++++
 .../db/streaming/CassandraStreamHeaderTest.java |  50 +++
 .../streaming/CassandraStreamManagerTest.java   | 237 +++++++++++++++
 .../cassandra/dht/StreamStateStoreTest.java     |   5 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |  14 +-
 .../io/sstable/SSTableRewriterTest.java         |  59 ----
 .../serializers/SerializationUtils.java         |  68 +++++
 .../cassandra/streaming/StreamSessionTest.java  | 132 --------
 .../streaming/StreamTransferTaskTest.java       |  19 +-
 .../streaming/StreamingTransferTest.java        |  27 +-
 .../async/NettyStreamingMessageSenderTest.java  |   5 +-
 .../async/StreamingInboundHandlerTest.java      |  23 +-
 .../compression/CompressedInputStreamTest.java  |   4 +-
 63 files changed, 3314 insertions(+), 2424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 42add69..9458f19 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Abstract streaming for pluggable storage (CASSANDRA-14115)
  * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294)
  * Use Murmur3 for validation compactions (CASSANDRA-14002)
  * Comma at the end of the seed list is interpreted as localhost (CASSANDRA-14285)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 007bf2d..1be0083 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.compaction.*;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.streaming.CassandraStreamManager;
 import org.apache.cassandra.db.view.TableViews;
 import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.partitions.CachedPartition;
@@ -78,6 +79,7 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.TableStreamManager;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.TopKSampler.SamplerResult;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -213,6 +215,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     public volatile long sampleLatencyNanos;
     private final ScheduledFuture<?> latencyCalculator;
 
+    private final CassandraStreamManager streamManager;
+
     private volatile boolean compactionSpaceCheck = true;
 
     @VisibleForTesting
@@ -465,6 +469,12 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             mbeanName = null;
             oldMBeanName= null;
         }
+        streamManager = new CassandraStreamManager(this);
+    }
+
+    public TableStreamManager getStreamManager()
+    {
+        return streamManager;
     }
 
     public TableMetadata metadata()
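
The hunk above is the per-table hook the rest of the patch builds on: streaming code resolves a table's TableStreamManager through its ColumnFamilyStore. A minimal sketch of that lookup, assuming only what this patch shows (getIfExists(TableId) is used the same way in CassandraStreamReader.read() below; the wrapper class here is invented for illustration):

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.TableStreamManager;

final class StreamManagerLookup
{
    // Resolves the stream manager for a table, or null if the table has been dropped.
    static TableStreamManager forTable(TableId tableId)
    {
        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
        return cfs == null ? null : cfs.getStreamManager();
    }
}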

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
new file mode 100644
index 0000000..16698e5
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.IncomingStream;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+/**
+ * Used to receive part (or all) of an SSTable data file.
+ *
+ * This class deserializes the data stream into partitions and rows, and writes them out as an sstable.
+ */
+public class CassandraIncomingFile implements IncomingStream
+{
+    private final ColumnFamilyStore cfs;
+    private final StreamSession session;
+    private final StreamMessageHeader header;
+
+    private volatile SSTableMultiWriter sstable;
+    private volatile long size = -1;
+
+    public CassandraIncomingFile(ColumnFamilyStore cfs, StreamSession session, 
StreamMessageHeader header)
+    {
+        this.cfs = cfs;
+        this.session = session;
+        this.header = header;
+    }
+
+    @Override
+    public synchronized void read(DataInputPlus in, int version) throws 
IOException
+    {
+        CassandraStreamHeader streamHeader = 
CassandraStreamHeader.serializer.deserialize(in, version);
+        CassandraStreamReader reader = !streamHeader.isCompressed()
+                                       ? new CassandraStreamReader(header, 
streamHeader, session)
+                                       : new 
CompressedCassandraStreamReader(header, streamHeader, session);
+        size = streamHeader.size();
+        sstable = reader.read(in);
+    }
+
+    @Override
+    public synchronized String getName()
+    {
+        return sstable == null ? "null" : sstable.getFilename();
+    }
+
+    @Override
+    public synchronized long getSize()
+    {
+        Preconditions.checkState(size > 0, "Stream hasn't been read yet");
+        return size;
+    }
+
+    @Override
+    public TableId getTableId()
+    {
+        Preconditions.checkState(sstable != null, "Stream hasn't been read yet");
+        return sstable.getTableId();
+    }
+
+    @Override
+    public String toString()
+    {
+        SSTableMultiWriter sst = sstable;
+        return "CassandraIncomingFile{" +
+               "sstable=" + (sst == null ? "null" : sst.getFilename()) +
+               '}';
+    }
+
+    public SSTableMultiWriter getSSTable()
+    {
+        Preconditions.checkState(sstable != null, "Stream hasn't been read yet");
+        return sstable;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CassandraIncomingFile that = (CassandraIncomingFile) o;
+        return Objects.equals(cfs, that.cfs) &&
+               Objects.equals(session, that.session) &&
+               Objects.equals(header, that.header) &&
+               Objects.equals(sstable, that.sstable);
+    }
+
+    public int hashCode()
+    {
+
+        return Objects.hash(cfs, session, header, sstable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
new file mode 100644
index 0000000..3fd3f9d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+/**
+ * Used to transfer part (or all) of an SSTable data file.
+ */
+public class CassandraOutgoingFile implements OutgoingStream
+{
+    private final Ref<SSTableReader> ref;
+    private final long estimatedKeys;
+    private final List<Pair<Long, Long>> sections;
+    private final String filename;
+    private final CassandraStreamHeader header;
+    private final boolean keepSSTableLevel;
+
+    public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> 
ref, List<Pair<Long, Long>> sections, long estimatedKeys)
+    {
+        Preconditions.checkNotNull(ref.get());
+        this.ref = ref;
+        this.estimatedKeys = estimatedKeys;
+        this.sections = sections;
+        this.filename = ref.get().getFilename();
+
+        SSTableReader sstable = ref.get();
+        keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation 
== StreamOperation.REBUILD;
+        this.header = new CassandraStreamHeader(sstable.descriptor.version,
+                                                sstable.descriptor.formatType,
+                                                estimatedKeys,
+                                                sections,
+                                                sstable.compression ? 
sstable.getCompressionMetadata() : null,
+                                                keepSSTableLevel ? 
sstable.getSSTableLevel() : 0,
+                                                sstable.header.toComponent());
+    }
+
+    public static CassandraOutgoingFile fromStream(OutgoingStream stream)
+    {
+        Preconditions.checkArgument(stream instanceof CassandraOutgoingFile);
+        return (CassandraOutgoingFile) stream;
+    }
+
+    @VisibleForTesting
+    public Ref<SSTableReader> getRef()
+    {
+        return ref;
+    }
+
+    @Override
+    public String getName()
+    {
+        return filename;
+    }
+
+    @Override
+    public long getSize()
+    {
+        return header.size();
+    }
+
+    @Override
+    public TableId getTableId()
+    {
+        return ref.get().metadata().id;
+    }
+
+    @Override
+    public long getRepairedAt()
+    {
+        return ref.get().getRepairedAt();
+    }
+
+    @Override
+    public UUID getPendingRepair()
+    {
+        return ref.get().getPendingRepair();
+    }
+
+    @Override
+    public void write(StreamSession session, DataOutputStreamPlus out, int 
version) throws IOException
+    {
+        SSTableReader sstable = ref.get();
+        CassandraStreamHeader.serializer.serialize(header, out, version);
+        out.flush();
+
+        CassandraStreamWriter writer = header.compressionInfo == null ?
+                                       new CassandraStreamWriter(sstable, 
header.sections, session) :
+                                       new 
CompressedCassandraStreamWriter(sstable, header.sections,
+                                                                           
header.compressionInfo, session);
+        writer.write(out);
+    }
+
+    @Override
+    public void finish()
+    {
+        ref.release();
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CassandraOutgoingFile that = (CassandraOutgoingFile) o;
+        return estimatedKeys == that.estimatedKeys &&
+               Objects.equals(ref, that.ref) &&
+               Objects.equals(sections, that.sections);
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(ref, estimatedKeys, sections);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CassandraOutgoingFile{" + ref.get().getFilename() + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
new file mode 100644
index 0000000..2603da1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.Pair;
+
+public class CassandraStreamHeader
+{
+    /** SSTable version */
+    public final Version version;
+
+    /** SSTable format **/
+    public final SSTableFormat.Type format;
+    public final long estimatedKeys;
+    public final List<Pair<Long, Long>> sections;
+    /**
+     * Compression info for the SSTable to send. Can be null if the SSTable is not compressed.
+     * On the sender, this field starts out null (to avoid holding a large number of Chunks) and is
+     * populated lazily by calculateCompressionInfo(); use compressionMetadata instead.
+     */
+    private final CompressionMetadata compressionMetadata;
+    public volatile CompressionInfo compressionInfo;
+    public final int sstableLevel;
+    public final SerializationHeader.Component header;
+
+    /* cached size value */
+    private transient final long size;
+
+    private CassandraStreamHeader(Version version, SSTableFormat.Type format, 
long estimatedKeys, List<Pair<Long, Long>> sections, CompressionMetadata 
compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, 
SerializationHeader.Component header)
+    {
+        this.version = version;
+        this.format = format;
+        this.estimatedKeys = estimatedKeys;
+        this.sections = sections;
+        this.compressionMetadata = compressionMetadata;
+        this.compressionInfo = compressionInfo;
+        this.sstableLevel = sstableLevel;
+        this.header = header;
+
+        this.size = calculateSize();
+    }
+
+    public CassandraStreamHeader(Version version, SSTableFormat.Type format, 
long estimatedKeys, List<Pair<Long, Long>> sections, CompressionMetadata 
compressionMetadata, int sstableLevel, SerializationHeader.Component header)
+    {
+        this(version, format, estimatedKeys, sections, compressionMetadata, 
null, sstableLevel, header);
+    }
+
+    public CassandraStreamHeader(Version version, SSTableFormat.Type format, 
long estimatedKeys, List<Pair<Long, Long>> sections, CompressionInfo 
compressionInfo, int sstableLevel, SerializationHeader.Component header)
+    {
+        this(version, format, estimatedKeys, sections, null, compressionInfo, 
sstableLevel, header);
+    }
+
+    public boolean isCompressed()
+    {
+        return compressionInfo != null;
+    }
+
+    /**
+     * @return total file size to transfer in bytes
+     */
+    public long size()
+    {
+        return size;
+    }
+
+    private long calculateSize()
+    {
+        long transferSize = 0;
+        if (compressionInfo != null)
+        {
+            // calculate total length of transferring chunks
+            for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+                transferSize += chunk.length + 4; // 4 bytes for CRC
+        }
+        else
+        {
+            for (Pair<Long, Long> section : sections)
+                transferSize += section.right - section.left;
+        }
+        return transferSize;
+    }
+
+    public synchronized void calculateCompressionInfo()
+    {
+        if (compressionMetadata != null && compressionInfo == null)
+        {
+            compressionInfo = 
CompressionInfo.fromCompressionMetadata(compressionMetadata, sections);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CassandraStreamHeader{" +
+               "version=" + version +
+               ", format=" + format +
+               ", estimatedKeys=" + estimatedKeys +
+               ", sections=" + sections +
+               ", compressionInfo=" + compressionInfo +
+               ", sstableLevel=" + sstableLevel +
+               ", header=" + header +
+               '}';
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        CassandraStreamHeader that = (CassandraStreamHeader) o;
+        return estimatedKeys == that.estimatedKeys &&
+               sstableLevel == that.sstableLevel &&
+               Objects.equals(version, that.version) &&
+               format == that.format &&
+               Objects.equals(sections, that.sections) &&
+               Objects.equals(compressionInfo, that.compressionInfo) &&
+               Objects.equals(header, that.header);
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(version, format, estimatedKeys, sections, 
compressionInfo, sstableLevel, header);
+    }
+
+
+    public static final IVersionedSerializer<CassandraStreamHeader> serializer 
= new IVersionedSerializer<CassandraStreamHeader>()
+    {
+        public void serialize(CassandraStreamHeader header, DataOutputPlus 
out, int version) throws IOException
+        {
+            out.writeUTF(header.version.toString());
+            out.writeUTF(header.format.name);
+
+            out.writeLong(header.estimatedKeys);
+            out.writeInt(header.sections.size());
+            for (Pair<Long, Long> section : header.sections)
+            {
+                out.writeLong(section.left);
+                out.writeLong(section.right);
+            }
+            header.calculateCompressionInfo();
+            CompressionInfo.serializer.serialize(header.compressionInfo, out, 
version);
+            out.writeInt(header.sstableLevel);
+            SerializationHeader.serializer.serialize(header.version, 
header.header, out);
+        }
+
+        public CassandraStreamHeader deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            Version sstableVersion = 
SSTableFormat.Type.current().info.getVersion(in.readUTF());
+            SSTableFormat.Type format = 
SSTableFormat.Type.validate(in.readUTF());
+
+            long estimatedKeys = in.readLong();
+            int count = in.readInt();
+            List<Pair<Long, Long>> sections = new ArrayList<>(count);
+            for (int k = 0; k < count; k++)
+                sections.add(Pair.create(in.readLong(), in.readLong()));
+            CompressionInfo compressionInfo = 
CompressionInfo.serializer.deserialize(in, version);
+            int sstableLevel = in.readInt();
+            SerializationHeader.Component header =  
SerializationHeader.serializer.deserialize(sstableVersion, in);
+
+            return new CassandraStreamHeader(sstableVersion, format, 
estimatedKeys, sections, compressionInfo, sstableLevel, header);
+        }
+
+        public long serializedSize(CassandraStreamHeader header, int version)
+        {
+            long size = 0;
+            size += TypeSizes.sizeof(header.version.toString());
+            size += TypeSizes.sizeof(header.format.name);
+            size += TypeSizes.sizeof(header.estimatedKeys);
+
+            size += TypeSizes.sizeof(header.sections.size());
+            for (Pair<Long, Long> section : header.sections)
+            {
+                size += TypeSizes.sizeof(section.left);
+                size += TypeSizes.sizeof(section.right);
+            }
+            size += 
CompressionInfo.serializer.serializedSize(header.compressionInfo, version);
+            size += TypeSizes.sizeof(header.sstableLevel);
+
+            size += 
SerializationHeader.serializer.serializedSize(header.version, header.header);
+
+            return size;
+        }
+    };
+}
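
The size() value above, which drives progress reporting, is plain byte arithmetic over what will actually be sent: the sum of (right - left) over the transferred sections when the sstable is uncompressed, or the sum of chunk lengths plus a 4-byte CRC per chunk when it is compressed. A standalone sketch of that arithmetic, using made-up offsets and chunk lengths rather than values from this patch:

public class TransferSizeExample
{
    public static void main(String[] args)
    {
        // Uncompressed transfer: sum of (right - left) over the file sections being sent.
        long[][] sections = { { 0L, 4096L }, { 8192L, 10240L } };
        long uncompressed = 0;
        for (long[] section : sections)
            uncompressed += section[1] - section[0];   // 4096 + 2048 = 6144 bytes

        // Compressed transfer: whole chunks are sent, each followed by a 4-byte CRC.
        long[] chunkLengths = { 1024L, 1024L, 512L };
        long compressed = 0;
        for (long length : chunkLengths)
            compressed += length + 4;                  // 1028 + 1028 + 516 = 2572 bytes

        System.out.println(uncompressed + " bytes uncompressed, " + compressed + " bytes compressed");
    }
}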

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
new file mode 100644
index 0000000..466fa36
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.IncomingStream;
+import org.apache.cassandra.streaming.OutgoingStream;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.TableStreamManager;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Implements the streaming interface for the native Cassandra storage engine.
+ *
+ * Handles streaming one or more sections of one or more sstables to and from a specific
+ * remote node. The sending side performs a block-level transfer of the source stream, while the
+ * receiver must deserialize that data stream into partitions and rows, and then write them out
+ * as an sstable.
+ */
+public class CassandraStreamManager implements TableStreamManager
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraStreamManager.class);
+
+    private final ColumnFamilyStore cfs;
+
+    public CassandraStreamManager(ColumnFamilyStore cfs)
+    {
+        this.cfs = cfs;
+    }
+
+    @Override
+    public IncomingStream prepareIncomingStream(StreamSession session, 
StreamMessageHeader header)
+    {
+        return new CassandraIncomingFile(cfs, session, header);
+    }
+
+    @Override
+    public StreamReceiver createStreamReceiver(StreamSession session, int 
totalStreams)
+    {
+        return new CassandraStreamReceiver(cfs, session, totalStreams);
+    }
+
+    private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind 
kind)
+    {
+        switch (kind)
+        {
+            case ALL:
+                return Predicates.alwaysTrue();
+            case UNREPAIRED:
+                return Predicates.not(SSTableReader::isRepaired);
+            case REPAIRED:
+                return SSTableReader::isRepaired;
+            default:
+                throw new IllegalArgumentException("Unsupported kind: " + 
kind);
+        }
+    }
+
+    @Override
+    public Collection<OutgoingStream> createOutgoingStreams(StreamSession 
session, Collection<Range<Token>> ranges, UUID pendingRepair, PreviewKind 
previewKind)
+    {
+        Refs<SSTableReader> refs = new Refs<>();
+        try
+        {
+            final List<Range<PartitionPosition>> keyRanges = new 
ArrayList<>(ranges.size());
+            for (Range<Token> range : ranges)
+                keyRanges.add(Range.makeRowRange(range));
+            refs.addAll(cfs.selectAndReference(view -> {
+                Set<SSTableReader> sstables = Sets.newHashSet();
+                SSTableIntervalTree intervalTree = 
SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
+                Predicate<SSTableReader> predicate;
+                if (previewKind.isPreview())
+                {
+                    predicate = getPreviewPredicate(previewKind);
+                }
+                else if (pendingRepair == 
ActiveRepairService.NO_PENDING_REPAIR)
+                {
+                    predicate = Predicates.alwaysTrue();
+                }
+                else
+                {
+                    predicate = s -> s.isPendingRepair() && 
s.getSSTableMetadata().pendingRepair.equals(pendingRepair);
+                }
+
+                for (Range<PartitionPosition> keyRange : keyRanges)
+                {
+                    // keyRange excludes its start, while sstablesInBounds is inclusive (of both start and end).
+                    // This is fine however, because keyRange has been created from a token range through
+                    // Range.makeRowRange (see above), and that latter method uses Token.maxKeyBound() to create
+                    // the range, which returns a "fake" key that sorts after all keys having the token. That
+                    // "fake" key cannot however be equal to any real key, so even including keyRange.left will
+                    // still exclude any key having the token of the original token range, and so we're still
+                    // actually selecting what we wanted.
+                    for (SSTableReader sstable : 
Iterables.filter(View.sstablesInBounds(keyRange.left, keyRange.right, 
intervalTree), predicate))
+                    {
+                        sstables.add(sstable);
+                    }
+                }
+
+                if (logger.isDebugEnabled())
+                    logger.debug("ViewFilter for {}/{} sstables", 
sstables.size(), Iterables.size(view.select(SSTableSet.CANONICAL)));
+                return sstables;
+            }).refs);
+
+
+            List<OutgoingStream> streams = new ArrayList<>(refs.size());
+            for (SSTableReader sstable: refs)
+            {
+                Ref<SSTableReader> ref = refs.get(sstable);
+                List<Pair<Long, Long>> sections = 
sstable.getPositionsForRanges(ranges);
+                if (sections.isEmpty())
+                {
+                    ref.release();
+                    continue;
+                }
+                streams.add(new 
CassandraOutgoingFile(session.getStreamOperation(), ref, sections, 
sstable.estimatedKeysForRanges(ranges)));
+            }
+
+            return streams;
+        }
+        catch (Throwable t)
+        {
+            refs.release();
+            throw t;
+        }
+    }
+}
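
To illustrate the pluggable-storage intent of CASSANDRA-14115, here is a hedged sketch of what an alternative storage engine's implementation could look like, assuming TableStreamManager exposes (at least) the three methods CassandraStreamManager overrides above. The class name MyEngineStreamManager and the stubbed bodies are invented for illustration and are not part of this patch:

import java.util.Collection;
import java.util.Collections;
import java.util.UUID;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.OutgoingStream;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.TableStreamManager;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;

public class MyEngineStreamManager implements TableStreamManager
{
    @Override
    public IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header)
    {
        // A real engine returns an IncomingStream that knows how to deserialize its own wire format.
        throw new UnsupportedOperationException("engine-specific IncomingStream elided in this sketch");
    }

    @Override
    public StreamReceiver createStreamReceiver(StreamSession session, int totalStreams)
    {
        // A real engine returns a StreamReceiver that accumulates received streams and applies them on finished().
        throw new UnsupportedOperationException("engine-specific StreamReceiver elided in this sketch");
    }

    @Override
    public Collection<OutgoingStream> createOutgoingStreams(StreamSession session,
                                                            Collection<Range<Token>> ranges,
                                                            UUID pendingRepair,
                                                            PreviewKind previewKind)
    {
        // A real engine selects whatever storage units cover the requested token ranges
        // (CassandraStreamManager does this above with sstable sections and Refs).
        return Collections.emptyList();
    }
}

The intent, as reflected in the file moves in the diffstat, is that the generic streaming code deals only in these interfaces, while the sstable-specific pieces live under org.apache.cassandra.db.streaming.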

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
new file mode 100644
index 0000000..26ef5ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.*;
+import java.util.Collection;
+import java.util.UUID;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.compress.StreamCompressionInputStream;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.streaming.messages.StreamMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * CassandraStreamReader reads from a stream and writes to an SSTable.
+ */
+public class CassandraStreamReader
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraStreamReader.class);
+    protected final TableId tableId;
+    protected final long estimatedKeys;
+    protected final Collection<Pair<Long, Long>> sections;
+    protected final StreamSession session;
+    protected final Version inputVersion;
+    protected final long repairedAt;
+    protected final UUID pendingRepair;
+    protected final SSTableFormat.Type format;
+    protected final int sstableLevel;
+    protected final SerializationHeader.Component header;
+    protected final int fileSeqNum;
+
+    public CassandraStreamReader(StreamMessageHeader header, 
CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        if (session.getPendingRepair() != null)
+        {
+            // we should only ever be streaming pending repair
+            // sstables if the session has a pending repair id
+            assert session.getPendingRepair().equals(header.pendingRepair);
+        }
+        this.session = session;
+        this.tableId = header.tableId;
+        this.estimatedKeys = streamHeader.estimatedKeys;
+        this.sections = streamHeader.sections;
+        this.inputVersion = streamHeader.version;
+        this.repairedAt = header.repairedAt;
+        this.pendingRepair = header.pendingRepair;
+        this.format = streamHeader.format;
+        this.sstableLevel = streamHeader.sstableLevel;
+        this.header = streamHeader.header;
+        this.fileSeqNum = header.sequenceNumber;
+    }
+
+    /**
+     * @param inputPlus where this reads data from
+     * @return SSTable transferred
+     * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+     */
+    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+    {
+        long totalSize = totalSize();
+
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+        if (cfs == null)
+        {
+            // schema was dropped during streaming
+            throw new IOException("CF " + tableId + " was dropped during streaming");
+        }
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}', pendingRepair = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
+                     cfs.getTableName(), pendingRepair);
+
+        StreamDeserializer deserializer = null;
+        SSTableMultiWriter writer = null;
+        try (StreamCompressionInputStream streamCompressionInputStream = new 
StreamCompressionInputStream(inputPlus, StreamMessage.CURRENT_VERSION))
+        {
+            TrackedDataInputPlus in = new 
TrackedDataInputPlus(streamCompressionInputStream);
+            deserializer = new StreamDeserializer(cfs.metadata(), in, 
inputVersion, getHeader(cfs.metadata()));
+            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
+            while (in.getBytesRead() < totalSize)
+            {
+                writePartition(deserializer, writer);
+                // TODO move this to BytesReadTracker
+                session.progress(writer.getFilename(), 
ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+            }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
+                         session.planId(), fileSeqNum, session.peer,
+                         FBUtilities.prettyPrintMemory(in.getBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
+            return writer;
+        }
+        catch (Throwable e)
+        {
+            Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
+            logger.warn("[Stream #{}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                        session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName(), e);
+            if (writer != null)
+            {
+                writer.abort(e);
+            }
+            throw Throwables.propagate(e);
+        }
+    }
+
+    protected SerializationHeader getHeader(TableMetadata metadata)
+    {
+        return header != null ? header.toHeader(metadata) : null; // pre-3.0 sstables have no SerializationHeader
+    }
+
+    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long 
totalSize, long repairedAt, UUID pendingRepair, SSTableFormat.Type format) 
throws IOException
+    {
+        Directories.DataDirectory localDir = 
cfs.getDirectories().getWriteableLocation(totalSize);
+        if (localDir == null)
+            throw new IOException(String.format("Insufficient disk space to store %s", FBUtilities.prettyPrintMemory(totalSize)));
+
+        StreamReceiver streamReceiver = session.getAggregator(tableId);
+        Preconditions.checkState(streamReceiver instanceof 
CassandraStreamReceiver);
+        LifecycleTransaction txn = 
CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction();
+
+        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, 
estimatedKeys, repairedAt, pendingRepair, format, sstableLevel, totalSize, txn, 
getHeader(cfs.metadata()));
+        return writer;
+    }
+
+    protected long totalSize()
+    {
+        long size = 0;
+        for (Pair<Long, Long> section : sections)
+            size += section.right - section.left;
+        return size;
+    }
+
+    protected void writePartition(StreamDeserializer deserializer, 
SSTableMultiWriter writer) throws IOException
+    {
+        writer.append(deserializer.newPartition());
+        deserializer.checkForExceptions();
+    }
+
+    public static class StreamDeserializer extends 
UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
+    {
+        private final TableMetadata metadata;
+        private final DataInputPlus in;
+        private final SerializationHeader header;
+        private final SerializationHelper helper;
+
+        private DecoratedKey key;
+        private DeletionTime partitionLevelDeletion;
+        private SSTableSimpleIterator iterator;
+        private Row staticRow;
+        private IOException exception;
+
+        public StreamDeserializer(TableMetadata metadata, DataInputPlus in, 
Version version, SerializationHeader header) throws IOException
+        {
+            this.metadata = metadata;
+            this.in = in;
+            this.helper = new SerializationHelper(metadata, 
version.correspondingMessagingVersion(), 
SerializationHelper.Flag.PRESERVE_SIZE);
+            this.header = header;
+        }
+
+        public StreamDeserializer newPartition() throws IOException
+        {
+            key = 
metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in));
+            partitionLevelDeletion = DeletionTime.serializer.deserialize(in);
+            iterator = SSTableSimpleIterator.create(metadata, in, header, 
helper, partitionLevelDeletion);
+            staticRow = iterator.readStaticRow();
+            return this;
+        }
+
+        public TableMetadata metadata()
+        {
+            return metadata;
+        }
+
+        public RegularAndStaticColumns columns()
+        {
+            // We don't know which columns we'll get, so assume it can be all of them
+            return metadata.regularAndStaticColumns();
+        }
+
+        public boolean isReverseOrder()
+        {
+            return false;
+        }
+
+        public DecoratedKey partitionKey()
+        {
+            return key;
+        }
+
+        public DeletionTime partitionLevelDeletion()
+        {
+            return partitionLevelDeletion;
+        }
+
+        public Row staticRow()
+        {
+            return staticRow;
+        }
+
+        public EncodingStats stats()
+        {
+            return header.stats();
+        }
+
+        public boolean hasNext()
+        {
+            try
+            {
+                return iterator.hasNext();
+            }
+            catch (IOError e)
+            {
+                if (e.getCause() != null && e.getCause() instanceof 
IOException)
+                {
+                    exception = (IOException)e.getCause();
+                    return false;
+                }
+                throw e;
+            }
+        }
+
+        public Unfiltered next()
+        {
+            // Note that in practice we know that IOException will be thrown by hasNext(), because that's
+            // where the actual reading happens, so we don't bother catching RuntimeException here
+            // (contrary to what we do in hasNext)
+            Unfiltered unfiltered = iterator.next();
+            return metadata.isCounter() && unfiltered.kind() == 
Unfiltered.Kind.ROW
+                   ? maybeMarkLocalToBeCleared((Row) unfiltered)
+                   : unfiltered;
+        }
+
+        private Row maybeMarkLocalToBeCleared(Row row)
+        {
+            return metadata.isCounter() ? row.markCounterLocalToBeCleared() : 
row;
+        }
+
+        public void checkForExceptions() throws IOException
+        {
+            if (exception != null)
+                throw exception;
+        }
+
+        public void close()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
new file mode 100644
index 0000000..6a57e49
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.View;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.IncomingStream;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+public class CassandraStreamReceiver implements StreamReceiver
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CassandraStreamReceiver.class);
+
+    private static final int MAX_ROWS_PER_BATCH = 
Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100);
+
+    private final ColumnFamilyStore cfs;
+    private final StreamSession session;
+
+    // Transaction tracking new files received
+    private final LifecycleTransaction txn;
+
+    //  holds references to SSTables received
+    protected Collection<SSTableReader> sstables;
+
+    private final boolean requiresWritePath;
+
+
+    public CassandraStreamReceiver(ColumnFamilyStore cfs, StreamSession 
session, int totalFiles)
+    {
+        this.cfs = cfs;
+        this.session = session;
+        // this is an "offline" transaction, as we currently manually expose the sstables once done;
+        // this should be revisited at a later date, so that LifecycleTransaction manages all sstable state changes
+        this.txn = LifecycleTransaction.offline(OperationType.STREAM);
+        this.sstables = new ArrayList<>(totalFiles);
+        this.requiresWritePath = requiresWritePath(cfs);
+    }
+
+    public LifecycleTransaction getTransaction()
+    {
+        return txn;
+    }
+
+    public static CassandraStreamReceiver fromReceiver(StreamReceiver receiver)
+    {
+        Preconditions.checkArgument(receiver instanceof 
CassandraStreamReceiver);
+        return (CassandraStreamReceiver) receiver;
+    }
+
+    private static CassandraIncomingFile getFile(IncomingStream stream)
+    {
+        Preconditions.checkArgument(stream instanceof CassandraIncomingFile, "Wrong stream type: %s", stream);
+        return (CassandraIncomingFile) stream;
+    }
+
+    @Override
+    public void received(IncomingStream stream)
+    {
+        CassandraIncomingFile file = getFile(stream);
+
+        Collection<SSTableReader> finished = null;
+        SSTableMultiWriter sstable = file.getSSTable();
+        try
+        {
+            finished = sstable.finish(true);
+        }
+        catch (Throwable t)
+        {
+            Throwables.maybeFail(sstable.abort(t));
+        }
+        txn.update(finished, false);
+        sstables.addAll(finished);
+    }
+
+    @Override
+    public void discardStream(IncomingStream stream)
+    {
+        CassandraIncomingFile file = getFile(stream);
+        Throwables.maybeFail(file.getSSTable().abort(null));
+    }
+
+    @Override
+    public void abort()
+    {
+        sstables.clear();
+        txn.abort();
+    }
+
+    private boolean hasViews(ColumnFamilyStore cfs)
+    {
+        return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, 
cfs.getTableName()));
+    }
+
+    private boolean hasCDC(ColumnFamilyStore cfs)
+    {
+        return cfs.metadata().params.cdc;
+    }
+
+    /*
+     * We have a special path for views and for CDC.
+     *
+     * For views, since the view requires cleaning up any pre-existing state, we must put all partitions
+     * through the same write path as normal mutations. This also ensures any secondary indexes (2is) are updated.
+     *
+     * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they
+     * can be archived by the CDC process on discard.
+     */
+    private boolean requiresWritePath(ColumnFamilyStore cfs)
+    {
+        return hasCDC(cfs) || (session.streamOperation().requiresViewBuild() && hasViews(cfs));
+    }
+
+    private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers)
+    {
+        boolean hasCdc = hasCDC(cfs);
+        ColumnFilter filter = ColumnFilter.all(cfs.metadata());
+        for (SSTableReader reader : readers)
+        {
+            Keyspace ks = Keyspace.open(reader.getKeyspaceName());
+            // When doing mutation-based repair we split each partition into smaller batches
+            // (see MAX_ROWS_PER_BATCH) to avoid OOMing and generating heap pressure
+            try (ISSTableScanner scanner = reader.getScanner();
+                 CloseableIterator<UnfilteredRowIterator> throttledPartitions 
= ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH))
+            {
+                while (throttledPartitions.hasNext())
+                {
+                    // MV updates *can* be applied unsafely if there's no CDC on the CFS, as we flush
+                    // before the transaction is done.
+                    //
+                    // If the CFS has CDC, however, these updates need to be written to the CommitLog
+                    // so they get archived into the cdc_raw folder
+                    ks.apply(new 
Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)),
+                             hasCdc,
+                             true,
+                             false);
+                }
+            }
+        }
+    }
+
+    private synchronized void finishTransaction()
+    {
+        txn.finish();
+    }
+
+    @Override
+    public void finished()
+    {
+        boolean requiresWritePath = requiresWritePath(cfs);
+        Collection<SSTableReader> readers = sstables;
+
+        try (Refs<SSTableReader> refs = Refs.ref(readers))
+        {
+            if (requiresWritePath)
+            {
+                sendThroughWritePath(cfs, readers);
+            }
+            else
+            {
+                finishTransaction();
+
+                // add sstables (this will build secondary indexes too, see CASSANDRA-10130)
+                logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers);
+                cfs.addSSTables(readers);
+
+                //invalidate row and counter cache
+                if (cfs.isRowCacheEnabled() || cfs.metadata().isCounter())
+                {
+                    List<Bounds<Token>> boundsToInvalidate = new 
ArrayList<>(readers.size());
+                    readers.forEach(sstable -> boundsToInvalidate.add(new 
Bounds<Token>(sstable.first.getToken(), sstable.last.getToken())));
+                    Set<Bounds<Token>> nonOverlappingBounds = 
Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                    if (cfs.isRowCacheEnabled())
+                    {
+                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                        if (invalidatedKeys > 0)
+                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                         "receive task completed.", session.planId(), invalidatedKeys,
+                                         cfs.keyspace.getName(), cfs.getTableName());
+                    }
+
+                    if (cfs.metadata().isCounter())
+                    {
+                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                        if (invalidatedKeys > 0)
+                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                         "receive task completed.", session.planId(), invalidatedKeys,
+                                         cfs.keyspace.getName(), cfs.getTableName());
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void cleanup()
+    {
+        // We don't keep the streamed sstables since we've applied them manually, so we abort the txn and delete
+        // the streamed sstables.
+        if (requiresWritePath)
+        {
+            cfs.forceBlockingFlush();
+            abort();
+        }
+    }
+}
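
For illustration, here is a minimal standalone sketch of the idea behind the cache invalidation above: the [first, last] token bounds of the received sstables are merged into non-overlapping ranges so each cache region is invalidated only once. Plain longs stand in for Tokens, and the Range type and nonOverlapping helper are hypothetical stand-ins rather than the Bounds/getNonOverlappingBounds API used by the patch.

import java.util.ArrayList;
import java.util.List;

public class BoundsMergeSketch
{
    // hypothetical stand-in for the token range covered by one received sstable
    static final class Range
    {
        final long first, last; // inclusive bounds, analogous to sstable.first/last tokens
        Range(long first, long last) { this.first = first; this.last = last; }
        @Override
        public String toString() { return "[" + first + ", " + last + "]"; }
    }

    // merge overlapping or touching ranges so each cache region is scanned only once
    static List<Range> nonOverlapping(List<Range> input)
    {
        List<Range> sorted = new ArrayList<>(input);
        sorted.sort((a, b) -> Long.compare(a.first, b.first));
        List<Range> merged = new ArrayList<>();
        for (Range r : sorted)
        {
            if (!merged.isEmpty() && r.first <= merged.get(merged.size() - 1).last)
            {
                // overlaps the previous range: extend it instead of adding a new one
                Range prev = merged.remove(merged.size() - 1);
                merged.add(new Range(prev.first, Math.max(prev.last, r.last)));
            }
            else
            {
                merged.add(r);
            }
        }
        return merged;
    }

    public static void main(String[] args)
    {
        List<Range> perSSTable = List.of(new Range(0, 50), new Range(40, 90), new Range(200, 250));
        // prints [[0, 90], [200, 250]]: two invalidation passes instead of three
        System.out.println(nonOverlapping(perSSTable));
    }
}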

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
new file mode 100644
index 0000000..b86f99a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
+import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.compress.ByteBufCompressionDataOutputStreamPlus;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * CassandraStreamWriter writes given section of the SSTable to given channel.
+ */
+public class CassandraStreamWriter
+{
+    private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
+
+    private static final Logger logger = LoggerFactory.getLogger(CassandraStreamWriter.class);
+
+    protected final SSTableReader sstable;
+    protected final Collection<Pair<Long, Long>> sections;
+    protected final StreamRateLimiter limiter;
+    protected final StreamSession session;
+
+    public CassandraStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, StreamSession session)
+    {
+        this.session = session;
+        this.sstable = sstable;
+        this.sections = sections;
+        this.limiter =  StreamManager.getRateLimiter(session.peer);
+    }
+
+    /**
+     * Stream file of specified sections to given channel.
+     *
+     * CassandraStreamWriter uses LZF compression on the wire to reduce the transfer size.
+     *
+     * @param output where this writes data to
+     * @throws IOException on any I/O error
+     */
+    public void write(DataOutputStreamPlus output) throws IOException
+    {
+        long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
+
+        try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy();
+            ChecksumValidator validator = new File(sstable.descriptor.filenameFor(Component.CRC)).exists()
+                                          ? DataIntegrityMetadata.checksumValidator(sstable.descriptor)
+                                          : null)
+        {
+            int bufferSize = validator == null ? DEFAULT_CHUNK_SIZE : validator.chunkSize;
+
+            // setting up data compression stream
+            long progress = 0L;
+
+            try (DataOutputStreamPlus compressedOutput = new ByteBufCompressionDataOutputStreamPlus(output, limiter))
+            {
+                // stream each of the required sections of the file
+                for (Pair<Long, Long> section : sections)
+                {
+                    long start = validator == null ? section.left : validator.chunkStart(section.left);
+                    // if the transfer does not start on the validator's chunk boundary, this is the number of bytes to offset by
+                    int transferOffset = (int) (section.left - start);
+                    if (validator != null)
+                        validator.seek(start);
+
+                    // length of the section to read
+                    long length = section.right - start;
+                    // tracks write progress
+                    long bytesRead = 0;
+                    while (bytesRead < length)
+                    {
+                        int toTransfer = (int) Math.min(bufferSize, length - bytesRead);
+                        long lastBytesRead = write(proxy, validator, compressedOutput, start, transferOffset, toTransfer, bufferSize);
+                        start += lastBytesRead;
+                        bytesRead += lastBytesRead;
+                        progress += (lastBytesRead - transferOffset);
+                        session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
+                        transferOffset = 0;
+                    }
+
+                    // make sure that current section is sent
+                    output.flush();
+                }
+                logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                             session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
+            }
+        }
+    }
+
+    protected long totalSize()
+    {
+        long size = 0;
+        for (Pair<Long, Long> section : sections)
+            size += section.right - section.left;
+        return size;
+    }
+
+    /**
+     * Sequentially read bytes from the file and write them to the output stream
+     *
+     * @param proxy The file reader to read from
+     * @param validator validator to verify data integrity
+     * @param start The read offset from the beginning of the {@code proxy} file.
+     * @param transferOffset number of bytes to skip transferring, but include for validation.
+     * @param toTransfer The number of bytes to be transferred.
+     *
+     * @return Number of bytes transferred.
+     *
+     * @throws java.io.IOException on any I/O error
+     */
+    protected long write(ChannelProxy proxy, ChecksumValidator validator, DataOutputStreamPlus output, long start, int transferOffset, int toTransfer, int bufferSize) throws IOException
+    {
+        // the count of bytes to read off disk
+        int minReadable = (int) Math.min(bufferSize, proxy.size() - start);
+
+        // this buffer holds the data read from disk. as it is compressed on the fly by
+        // ByteBufCompressionDataOutputStreamPlus.write(ByteBuffer), it can be released as soon as the write returns.
+        ByteBuffer buffer = ByteBuffer.allocateDirect(minReadable);
+        try
+        {
+            int readCount = proxy.read(buffer, start);
+            assert readCount == minReadable : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", readCount, minReadable);
+            buffer.flip();
+
+            if (validator != null)
+            {
+                validator.validate(buffer);
+                buffer.flip();
+            }
+
+            buffer.position(transferOffset);
+            buffer.limit(transferOffset + (toTransfer - transferOffset));
+            output.write(buffer);
+        }
+        finally
+        {
+            FileUtils.clean(buffer);
+        }
+
+        return toTransfer;
+    }
+}
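
For illustration, a simplified sketch of the section/chunk arithmetic used by write() above: reads start from a chunk-aligned position so the validator can checksum whole chunks, but only the bytes from the section start onwards are forwarded. Plain NIO channels stand in for ChannelProxy and DataOutputStreamPlus; writeSection and chunkAlignedStart are hypothetical names, with chunkAlignedStart playing the role of validator.chunkStart(section.left), and the alignment offset is assumed to be smaller than one chunk.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

public class SectionWriteSketch
{
    static final int CHUNK_SIZE = 64 * 1024;

    static long writeSection(FileChannel file, WritableByteChannel out,
                             long sectionLeft, long sectionRight, long chunkAlignedStart) throws IOException
    {
        long start = chunkAlignedStart;                    // read position, rounded down to a chunk boundary
        int transferOffset = (int) (sectionLeft - start);  // bytes read for alignment but not forwarded
        long length = sectionRight - start;
        long bytesRead = 0, sent = 0;
        while (bytesRead < length)
        {
            int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesRead);
            ByteBuffer buffer = ByteBuffer.allocate(toTransfer);
            int read = file.read(buffer, start + bytesRead);
            if (read <= 0)
                throw new IOException("unexpected end of file while streaming section");
            buffer.flip();
            buffer.position(transferOffset);               // skip the alignment bytes on the first chunk only
            sent += out.write(buffer);
            bytesRead += read;
            transferOffset = 0;
        }
        return sent;                                       // bytes actually forwarded to the peer
    }
}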

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
new file mode 100644
index 0000000..343d7ed
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.TrackedDataInputPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.ChecksumType;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+
+/**
+ * CassandraStreamReader that reads from streamed compressed SSTable
+ */
+public class CompressedCassandraStreamReader extends CassandraStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamReader.class);
+
+    protected final CompressionInfo compressionInfo;
+
+    public CompressedCassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        super(header, streamHeader, session);
+        this.compressionInfo = streamHeader.compressionInfo;
+    }
+
+    /**
+     * @return SSTable transferred
+     * @throws java.io.IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
+     */
+    @Override
+    @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed
+    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
+    {
+        long totalSize = totalSize();
+
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+
+        if (cfs == null)
+        {
+            // schema was dropped during streaming
+            throw new IOException("CF " + tableId + " was dropped during streaming");
+        }
+
+        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
+                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair,
+                     cfs.getTableName());
+
+        StreamDeserializer deserializer = null;
+        SSTableMultiWriter writer = null;
+        try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
+        {
+            TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
+            deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata()));
+            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format);
+            String filename = writer.getFilename();
+            int sectionIdx = 0;
+            for (Pair<Long, Long> section : sections)
+            {
+                assert cis.getTotalCompressedBytesRead() <= totalSize;
+                long sectionLength = section.right - section.left;
+
+                logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength);
+                // skip to beginning of section inside chunk
+                cis.position(section.left);
+                in.reset(0);
+
+                while (in.getBytesRead() < sectionLength)
+                {
+                    writePartition(deserializer, writer);
+                    // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
+                    session.progress(filename, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+                }
+            }
+            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                         session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize));
+            return writer;
+        }
+        catch (Throwable e)
+        {
+            Object partitionKey = deserializer != null ? deserializer.partitionKey() : "";
+            logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
+                        session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName());
+            if (writer != null)
+            {
+                writer.abort(e);
+            }
+            if (extractIOExceptionCause(e).isPresent())
+                throw e;
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate total length of transferring chunks
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+}
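
For illustration, the totalSize() accounting above assumes each compressed chunk on the wire is followed by its 4-byte checksum. The sketch below is a simplified standalone reader for that assumed layout, not the CompressedInputStream implementation (which also handles decompression and the crc check chance); readChunkWithCrc and totalTransferSize are hypothetical helpers.

import java.io.DataInputStream;
import java.io.IOException;
import java.util.zip.CRC32;

public class CompressedChunkSketch
{
    // read one compressed chunk of known length and verify its trailing CRC32
    static byte[] readChunkWithCrc(DataInputStream in, int chunkLength) throws IOException
    {
        byte[] compressed = new byte[chunkLength];
        in.readFully(compressed);
        int expected = in.readInt();          // the 4 checksum bytes counted in totalSize()

        CRC32 crc = new CRC32();
        crc.update(compressed, 0, compressed.length);
        if ((int) crc.getValue() != expected)
            throw new IOException("checksum mismatch for compressed chunk");
        return compressed;                    // still compressed; decompression happens downstream
    }

    // transfer size for a set of chunk lengths, mirroring totalSize() above
    static long totalTransferSize(int[] chunkLengths)
    {
        long size = 0;
        for (int length : chunkLengths)
            size += length + 4;               // 4 bytes for CRC
        return size;
    }
}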

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
new file mode 100644
index 0000000..3fcbc38
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamWriter.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * CassandraStreamWriter for compressed SSTable.
+ */
+public class CompressedCassandraStreamWriter extends CassandraStreamWriter
+{
+    private static final int CHUNK_SIZE = 1 << 16;
+
+    private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamWriter.class);
+
+    private final CompressionInfo compressionInfo;
+
+    public CompressedCassandraStreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> sections, CompressionInfo compressionInfo, StreamSession session)
+    {
+        super(sstable, sections, session);
+        this.compressionInfo = compressionInfo;
+    }
+
+    @Override
+    public void write(DataOutputStreamPlus out) throws IOException
+    {
+        assert out instanceof ByteBufDataOutputStreamPlus;
+        ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
+        long totalSize = totalSize();
+        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+                     sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
+        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
+        {
+            long progress = 0L;
+            // calculate chunks to transfer. we want to send contiguous chunks together.
+            List<Pair<Long, Long>> sections = getTransferSections(compressionInfo.chunks);
+
+            int sectionIdx = 0;
+
+            // stream each of the required sections of the file
+            for (final Pair<Long, Long> section : sections)
+            {
+                // length of the section to stream
+                long length = section.right - section.left;
+
+                logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length);
+
+                // tracks write progress
+                long bytesTransferred = 0;
+                while (bytesTransferred < length)
+                {
+                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred);
+                    limiter.acquire(toTransfer);
+
+                    ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer);
+                    long lastWrite;
+                    try
+                    {
+                        lastWrite = fc.read(outBuffer, section.left + bytesTransferred);
+                        assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer);
+                        outBuffer.flip();
+                        output.writeToChannel(outBuffer);
+                    }
+                    catch (IOException e)
+                    {
+                        FileUtils.clean(outBuffer);
+                        throw e;
+                    }
+
+                    bytesTransferred += lastWrite;
+                    progress += lastWrite;
+                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
+                }
+            }
+            logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}",
+                         session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize));
+        }
+    }
+
+    @Override
+    protected long totalSize()
+    {
+        long size = 0;
+        // calculate total length of transferring chunks
+        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
+            size += chunk.length + 4; // 4 bytes for CRC
+        return size;
+    }
+
+    // chunks are assumed to be sorted by offset
+    private List<Pair<Long, Long>> getTransferSections(CompressionMetadata.Chunk[] chunks)
+    {
+        List<Pair<Long, Long>> transferSections = new ArrayList<>();
+        Pair<Long, Long> lastSection = null;
+        for (CompressionMetadata.Chunk chunk : chunks)
+        {
+            if (lastSection != null)
+            {
+                if (chunk.offset == lastSection.right)
+                {
+                    // extend previous section to end of this chunk
+                    lastSection = Pair.create(lastSection.left, chunk.offset + chunk.length + 4); // 4 bytes for CRC
+                }
+                else
+                {
+                    transferSections.add(lastSection);
+                    lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+                }
+            }
+            else
+            {
+                lastSection = Pair.create(chunk.offset, chunk.offset + chunk.length + 4);
+            }
+        }
+        if (lastSection != null)
+            transferSections.add(lastSection);
+        return transferSections;
+    }
+}
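
For illustration, a standalone rendition of the chunk-coalescing idea in getTransferSections() above: chunks that sit back-to-back in the compressed file are merged into a single (start, end) section so they can be streamed with one run of sequential reads. The Chunk and Section records are hypothetical stand-ins for CompressionMetadata.Chunk and Pair<Long, Long> (records require Java 16+), and the "+ 4" mirrors the per-chunk CRC bytes.

import java.util.ArrayList;
import java.util.List;

public class TransferSectionSketch
{
    record Chunk(long offset, int length) {}
    record Section(long start, long end) {}

    // chunks are assumed to be sorted by offset, as in the patch
    static List<Section> transferSections(List<Chunk> chunks)
    {
        List<Section> sections = new ArrayList<>();
        Section last = null;
        for (Chunk chunk : chunks)
        {
            long end = chunk.offset() + chunk.length() + 4; // 4 bytes for CRC
            if (last != null && chunk.offset() == last.end())
            {
                last = new Section(last.start(), end);      // contiguous: extend the current section
            }
            else
            {
                if (last != null)
                    sections.add(last);
                last = new Section(chunk.offset(), end);     // gap: start a new section
            }
        }
        if (last != null)
            sections.add(last);
        return sections;
    }

    public static void main(String[] args)
    {
        // two contiguous chunks followed by one after a gap -> two sections
        List<Chunk> chunks = List.of(new Chunk(0, 96), new Chunk(100, 96), new Chunk(500, 96));
        System.out.println(transferSections(chunks)); // [Section[start=0, end=200], Section[start=500, end=600]]
    }
}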

