http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 4085c43..adf5d76 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -17,27 +17,17 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Futures;
 
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +35,6 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelId;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
@@ -54,19 +43,13 @@ import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.async.OutboundConnectionIdentifier;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.async.NettyStreamingMessageSender;
 import org.apache.cassandra.streaming.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.Refs;
 
 /**
- * Handles the streaming a one or more section of one of more sstables to and from a specific
- * remote node. The sending side performs a block-level transfer of the source sstable, while the receiver
- * must deserilaize that data stream into an partitions and rows, and then write that out as an sstable.
+ * Handles the streaming of one or more streams to and from a specific remote node.
  *
  * Both this node and the remote one will create a similar symmetrical {@link StreamSession}. A streaming
  * session has the following life-cycle:
@@ -98,15 +81,15 @@ import org.apache.cassandra.utils.concurrent.Refs;
  *
  *   (a) The streaming phase is started at each node by calling {@link StreamSession#startStreamingFiles(boolean)}.
  *       This will send, sequentially on each outbound streaming connection (see {@link NettyStreamingMessageSender}),
- *       an {@link OutgoingFileMessage} for each file in each of the {@link StreamTransferTask}.
- *       Each {@link OutgoingFileMessage} consists of a {@link FileMessageHeader} that contains metadata about the file
- *       being streamed, followed by the file content itself. Once all the files for a {@link StreamTransferTask} are sent,
+ *       an {@link OutgoingStreamMessage} for each stream in each of the {@link StreamTransferTask}.
+ *       Each {@link OutgoingStreamMessage} consists of a {@link StreamMessageHeader} that contains metadata about
+ *       the stream, followed by the stream content itself. Once all the streams for a {@link StreamTransferTask} are sent,
  *       the task is marked complete {@link StreamTransferTask#complete(int)}.
- *   (b) On the receiving side, a SSTable will be written for the incoming file, and once the file is fully received,
- *       the file will be marked as complete ({@link StreamReceiveTask#received(SSTableMultiWriter)}). When all files
- *       for the {@link StreamReceiveTask} have been received, the sstables are added to the CFS (and 2ndary indexes/MV are built),
+ *   (b) On the receiving side, the incoming data is written to disk, and once the stream is fully received,
+ *       it will be marked as complete ({@link StreamReceiveTask#received(IncomingStream)}). When all streams
+ *       for the {@link StreamReceiveTask} have been received, the data is added to the CFS (and 2ndary indexes/MV are built),
  *        and the task is marked complete ({@link #taskCompleted(StreamReceiveTask)}).
- *   (b) If during the streaming of a particular file an error occurs on the receiving end of a stream
+ *   (b) If an error occurs on the receiving end while a particular stream is being transferred
  *       (it may be either the initiator or the follower), the node will send a {@link SessionFailedMessage}
  *       to the sender and close the stream session.
  *   (c) When all transfer and receive tasks for a session are complete, the session moves to the Completion phase
@@ -127,7 +110,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
  * F: PrepareSynAckMessage
  * I: PrepareAckMessage
  * (stream - this can happen in both directions)
- * I: OutgoingFileMessage
+ * I: OutgoingStreamMessage
  * F: ReceivedMessage
  * (completion)
  * I/F: CompleteMessage
@@ -140,6 +123,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StreamSession.class);
 
+    private final StreamOperation streamOperation;
     /**
      * Streaming endpoint.
      *
@@ -170,7 +154,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     private final ConcurrentMap<ChannelId, Channel> incomingChannels = new 
ConcurrentHashMap<>();
 
     private final AtomicBoolean isAborted = new AtomicBoolean(false);
-    private final boolean keepSSTableLevel;
     private final UUID pendingRepair;
     private final PreviewKind previewKind;
 
@@ -189,11 +172,13 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 
     /**
      * Create new streaming session with the peer.
-     *  @param peer Address of streaming peer
+     * @param streamOperation the type of streaming operation this session is a part of
+     * @param peer Address of streaming peer
      * @param connecting Actual connecting address
      */
-    public StreamSession(InetAddressAndPort peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
+    public StreamSession(StreamOperation streamOperation, InetAddressAndPort peer, InetAddressAndPort connecting, StreamConnectionFactory factory, int index, UUID pendingRepair, PreviewKind previewKind)
     {
+        this.streamOperation = streamOperation;
         this.peer = peer;
         this.connecting = connecting;
         this.index = index;
@@ -202,7 +187,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                                                                               
InetAddressAndPort.getByAddressOverrideDefaults(connecting.address, 
MessagingService.instance().portFor(connecting)));
         this.messageSender = new NettyStreamingMessageSender(this, id, 
factory, StreamMessage.CURRENT_VERSION, previewKind.isPreview());
         this.metrics = StreamingMetrics.get(connecting);
-        this.keepSSTableLevel = keepSSTableLevel;
         this.pendingRepair = pendingRepair;
         this.previewKind = previewKind;
     }
@@ -222,9 +206,9 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         return streamResult == null ? null : streamResult.streamOperation;
     }
 
-    public boolean keepSSTableLevel()
+    public StreamOperation getStreamOperation()
     {
-        return keepSSTableLevel;
+        return streamOperation;
     }
 
     public UUID getPendingRepair()
@@ -242,10 +226,10 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         return previewKind;
     }
 
-    public LifecycleTransaction getTransaction(TableId tableId)
+    public StreamReceiver getAggregator(TableId tableId)
     {
         assert receivers.containsKey(tableId);
-        return receivers.get(tableId).getTransaction();
+        return receivers.get(tableId).getReceiver();
     }
 
     /**
@@ -309,8 +293,6 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     /**
      * Set up transfer for specific keyspace/ranges/CFs
      *
-     * Used in repair - a streamed sstable in repair will be marked with the 
given repairedAt time
-     *
      * @param keyspace Transfer keyspace
      * @param ranges Transfer ranges
      * @param columnFamilies Transfer ColumnFamilies
@@ -324,23 +306,15 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             flushSSTables(stores);
 
         List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair, previewKind);
-        try
-        {
-            addTransferFiles(sections);
-            Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace);
-            if (toBeUpdated == null)
-            {
-                toBeUpdated = new HashSet<>();
-            }
-            toBeUpdated.addAll(ranges);
-            transferredRangesPerKeyspace.put(keyspace, toBeUpdated);
-        }
-        finally
+        List<OutgoingStream> streams = getOutgoingStreamsForRanges(normalizedRanges, stores, pendingRepair, previewKind);
+        addTransferStreams(streams);
+        Set<Range<Token>> toBeUpdated = transferredRangesPerKeyspace.get(keyspace);
+        if (toBeUpdated == null)
         {
-            for (SSTableStreamingSections release : sections)
-                release.ref.release();
+            toBeUpdated = new HashSet<>();
         }
+        toBeUpdated.addAll(ranges);
+        transferredRangesPerKeyspace.put(keyspace, toBeUpdated);
     }
 
     private void failIfFinished()
@@ -366,83 +340,30 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     }
 
     @VisibleForTesting
-    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind)
+    public List<OutgoingStream> getOutgoingStreamsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind)
     {
-        Refs<SSTableReader> refs = new Refs<>();
+        List<OutgoingStream> streams = new ArrayList<>();
         try
         {
-            for (ColumnFamilyStore cfStore : stores)
-            {
-                final List<Range<PartitionPosition>> keyRanges = new 
ArrayList<>(ranges.size());
-                for (Range<Token> range : ranges)
-                    keyRanges.add(Range.makeRowRange(range));
-                refs.addAll(cfStore.selectAndReference(view -> {
-                    Set<SSTableReader> sstables = Sets.newHashSet();
-                    SSTableIntervalTree intervalTree = 
SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
-                    Predicate<SSTableReader> predicate;
-                    if (previewKind.isPreview())
-                    {
-                        predicate = previewKind.getStreamingPredicate();
-                    }
-                    else if (pendingRepair == 
ActiveRepairService.NO_PENDING_REPAIR)
-                    {
-                        predicate = Predicates.alwaysTrue();
-                    }
-                    else
-                    {
-                        predicate = s -> s.isPendingRepair() && 
s.getSSTableMetadata().pendingRepair.equals(pendingRepair);
-                    }
-
-                    for (Range<PartitionPosition> keyRange : keyRanges)
-                    {
-                        // keyRange excludes its start, while sstableInBounds 
is inclusive (of both start and end).
-                        // This is fine however, because keyRange has been 
created from a token range through Range.makeRowRange (see above).
-                        // And that later method uses the Token.maxKeyBound() 
method to creates the range, which return a "fake" key that
-                        // sort after all keys having the token. That "fake" 
key cannot however be equal to any real key, so that even
-                        // including keyRange.left will still exclude any key 
having the token of the original token range, and so we're
-                        // still actually selecting what we wanted.
-                        for (SSTableReader sstable : 
Iterables.filter(View.sstablesInBounds(keyRange.left, keyRange.right, 
intervalTree), predicate))
-                        {
-                            sstables.add(sstable);
-                        }
-                    }
-
-                    if (logger.isDebugEnabled())
-                        logger.debug("ViewFilter for {}/{} sstables", 
sstables.size(), Iterables.size(view.select(SSTableSet.CANONICAL)));
-                    return sstables;
-                }).refs);
-            }
-
-            List<SSTableStreamingSections> sections = new 
ArrayList<>(refs.size());
-            for (SSTableReader sstable : refs)
+            for (ColumnFamilyStore cfs: stores)
             {
-                sections.add(new SSTableStreamingSections(refs.get(sstable), sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges)));
+                streams.addAll(cfs.getStreamManager().createOutgoingStreams(this, ranges, pendingRepair, previewKind));
             }
-            return sections;
         }
         catch (Throwable t)
         {
-            refs.release();
+            streams.forEach(OutgoingStream::finish);
             throw t;
         }
+        return streams;
     }
 
-    synchronized void addTransferFiles(Collection<SSTableStreamingSections> sstableDetails)
+    synchronized void addTransferStreams(Collection<OutgoingStream> streams)
     {
         failIfFinished();
-        Iterator<SSTableStreamingSections> iter = sstableDetails.iterator();
-        while (iter.hasNext())
+        for (OutgoingStream stream: streams)
         {
-            SSTableStreamingSections details = iter.next();
-            if (details.sections.isEmpty())
-            {
-                // A reference was acquired on the sstable and we won't stream 
it
-                details.ref.release();
-                iter.remove();
-                continue;
-            }
-
-            TableId tableId = details.ref.get().metadata().id;
+            TableId tableId = stream.getTableId();
             StreamTransferTask task = transfers.get(tableId);
             if (task == null)
             {
@@ -452,22 +373,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                 if (task == null)
                     task = newTask;
             }
-            task.addTransferFile(details.ref, details.estimatedKeys, 
details.sections);
-            iter.remove();
-        }
-    }
-
-    public static class SSTableStreamingSections
-    {
-        public final Ref<SSTableReader> ref;
-        public final List<Pair<Long, Long>> sections;
-        public final long estimatedKeys;
-
-        public SSTableStreamingSections(Ref<SSTableReader> ref, 
List<Pair<Long, Long>> sections, long estimatedKeys)
-        {
-            this.ref = ref;
-            this.sections = sections;
-            this.estimatedKeys = estimatedKeys;
+            task.addTransferStream(stream);
         }
     }
 
@@ -554,8 +460,8 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
             case PREPARE_ACK:
                 prepareAck((PrepareAckMessage) message);
                 break;
-            case FILE:
-                receive((IncomingFileMessage) message);
+            case STREAM:
+                receive((IncomingStreamMessage) message);
                 break;
             case RECEIVED:
                 ReceivedMessage received = (ReceivedMessage) message;
@@ -681,41 +587,42 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
     }
 
     /**
-     * Call back after sending FileMessageHeader.
+     * Call back after sending StreamMessageHeader.
      *
-     * @param header sent header
+     * @param message sent stream message
      */
-    public void fileSent(FileMessageHeader header)
+    public void streamSent(OutgoingStreamMessage message)
     {
-        long headerSize = header.size();
+        long headerSize = message.stream.getSize();
         StreamingMetrics.totalOutgoingBytes.inc(headerSize);
         metrics.outgoingBytes.inc(headerSize);
         // schedule timeout for receiving ACK
-        StreamTransferTask task = transfers.get(header.tableId);
+        StreamTransferTask task = transfers.get(message.header.tableId);
         if (task != null)
         {
-            task.scheduleTimeout(header.sequenceNumber, 12, TimeUnit.HOURS);
+            task.scheduleTimeout(message.header.sequenceNumber, 12, 
TimeUnit.HOURS);
         }
     }
 
     /**
-     * Call back after receiving a streamed file.
+     * Call back after receiving a stream.
      *
-     * @param message received file
+     * @param message received stream
      */
-    public void receive(IncomingFileMessage message)
+    public void receive(IncomingStreamMessage message)
     {
         if (isPreview())
         {
             throw new RuntimeException("Cannot receive files for preview 
session");
         }
 
-        long headerSize = message.header.size();
+        long headerSize = message.stream.getSize();
         StreamingMetrics.totalIncomingBytes.inc(headerSize);
         metrics.incomingBytes.inc(headerSize);
         // send back file received message
         messageSender.sendMessage(new ReceivedMessage(message.header.tableId, message.header.sequenceNumber));
-        receivers.get(message.header.tableId).received(message.sstable);
+        StreamHook.instance.reportIncomingStream(message.header.tableId, message.stream, this, message.header.sequenceNumber);
+        receivers.get(message.header.tableId).received(message.stream);
     }
 
     public void progress(String filename, ProgressInfo.Direction direction, 
long bytes, long total)
@@ -812,8 +719,8 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         }
         finally
         {
-            // aborting the tasks here needs to be the last thing we do so that we
-            // accurately report expected streaming, but don't leak any sstable refs
+            // aborting the tasks here needs to be the last thing we do so that we accurately report
+            // expected streaming, but don't leak any resources held by the task
             for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
                 task.abort();
         }
@@ -872,10 +779,10 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 
         for (StreamTransferTask task : transfers.values())
         {
-            Collection<OutgoingFileMessage> messages = task.getFileMessages();
+            Collection<OutgoingStreamMessage> messages = task.getFileMessages();
             if (!messages.isEmpty())
             {
-                for (OutgoingFileMessage ofm : messages)
+                for (OutgoingStreamMessage ofm : messages)
                 {
                     // pass the session planId/index to the OFM (which is only 
set at init(), after the transfers have already been created)
                     ofm.header.addSessionInfo(this);

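The StreamSession hunks above replace the sstable-specific plumbing (SSTableStreamingSections, Ref/Refs handling, keepSSTableLevel) with opaque OutgoingStream handles and an explicit StreamOperation. As a rough, hypothetical sketch of a call site under the new signatures (the peer/connecting/factory/ranges/stores values are placeholders, and the enum constants are assumed from the existing StreamOperation and PreviewKind types, not taken from this patch):

```java
// Illustrative only; not code from this commit.
StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, // operation kind replaces the old keepSSTableLevel flag
                                          peer,                      // placeholder InetAddressAndPort of the remote node
                                          connecting,                // placeholder InetAddressAndPort actually connected to
                                          factory,                   // placeholder StreamConnectionFactory
                                          0,                         // session index
                                          null,                      // no pending repair
                                          PreviewKind.NONE);

// Data selection is delegated to each table's stream manager; the session only
// sees OutgoingStream handles, which transferRanges(...) then registers via
// addTransferStreams(...) exactly as in the hunk above.
List<OutgoingStream> streams =
    session.getOutgoingStreamsForRanges(Range.normalize(ranges), stores, null, PreviewKind.NONE);
```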
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java 
b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 5e21712..802188a 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -24,20 +24,18 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.Ref;
+import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 
 /**
- * StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
+ * StreamTransferTask sends streams for a given table
  */
 public class StreamTransferTask extends StreamTask
 {
@@ -48,7 +46,7 @@ public class StreamTransferTask extends StreamTask
     private boolean aborted = false;
 
     @VisibleForTesting
-    protected final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+    protected final Map<Integer, OutgoingStreamMessage> streams = new HashMap<>();
     private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
 
     private long totalSize;
@@ -58,19 +56,19 @@ public class StreamTransferTask extends StreamTask
         super(session, tableId);
     }
 
-    public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections)
+    public synchronized void addTransferStream(OutgoingStream stream)
     {
-        assert ref.get() != null && tableId.equals(ref.get().metadata().id);
-        OutgoingFileMessage message = new OutgoingFileMessage(ref, session, sequenceNumber.getAndIncrement(), estimatedKeys, sections, session.keepSSTableLevel());
-        message = StreamHook.instance.reportOutgoingFile(session, ref.get(), message);
-        files.put(message.header.sequenceNumber, message);
-                totalSize += message.header.size();
+        Preconditions.checkArgument(tableId.equals(stream.getTableId()));
+        OutgoingStreamMessage message = new OutgoingStreamMessage(tableId, session, stream, sequenceNumber.getAndIncrement());
+        message = StreamHook.instance.reportOutgoingStream(session, stream, message);
+        streams.put(message.header.sequenceNumber, message);
+        totalSize += message.stream.getSize();
     }
 
     /**
-     * Received ACK for file at {@code sequenceNumber}.
+     * Received ACK for stream at {@code sequenceNumber}.
      *
-     * @param sequenceNumber sequence number of file
+     * @param sequenceNumber sequence number of stream
      */
     public void complete(int sequenceNumber)
     {
@@ -81,12 +79,12 @@ public class StreamTransferTask extends StreamTask
             if (timeout != null)
                 timeout.cancel(false);
 
-            OutgoingFileMessage file = files.remove(sequenceNumber);
-            if (file != null)
-                file.complete();
+            OutgoingStreamMessage stream = streams.remove(sequenceNumber);
+            if (stream != null)
+                stream.complete();
 
-            logger.debug("recevied sequenceNumber {}, remaining files {}", sequenceNumber, files.keySet());
-            signalComplete = files.isEmpty();
+            logger.debug("received sequenceNumber {}, remaining streams {}", sequenceNumber, streams.keySet());
+            signalComplete = streams.isEmpty();
         }
 
         // all file sent, notify session this task is complete.
@@ -105,11 +103,11 @@ public class StreamTransferTask extends StreamTask
         timeoutTasks.clear();
 
         Throwable fail = null;
-        for (OutgoingFileMessage file : files.values())
+        for (OutgoingStreamMessage stream : streams.values())
         {
             try
             {
-                file.complete();
+                stream.complete();
             }
             catch (Throwable t)
             {
@@ -117,14 +115,14 @@ public class StreamTransferTask extends StreamTask
                 else fail.addSuppressed(t);
             }
         }
-        files.clear();
+        streams.clear();
         if (fail != null)
             Throwables.propagate(fail);
     }
 
     public synchronized int getTotalNumberOfFiles()
     {
-        return files.size();
+        return streams.size();
     }
 
     public long getTotalSize()
@@ -132,35 +130,35 @@ public class StreamTransferTask extends StreamTask
         return totalSize;
     }
 
-    public synchronized Collection<OutgoingFileMessage> getFileMessages()
+    public synchronized Collection<OutgoingStreamMessage> getFileMessages()
     {
         // We may race between queuing all those messages and the completion 
of the completion of
         // the first ones. So copy the values to avoid a 
ConcurrentModificationException
-        return new ArrayList<>(files.values());
+        return new ArrayList<>(streams.values());
     }
 
-    public synchronized OutgoingFileMessage createMessageForRetry(int 
sequenceNumber)
+    public synchronized OutgoingStreamMessage createMessageForRetry(int 
sequenceNumber)
     {
         // remove previous time out task to be rescheduled later
         ScheduledFuture future = timeoutTasks.remove(sequenceNumber);
         if (future != null)
             future.cancel(false);
-        return files.get(sequenceNumber);
+        return streams.get(sequenceNumber);
     }
 
     /**
-     * Schedule timeout task to release reference for file sent.
+     * Schedule timeout task to release reference for stream sent.
      * When not receiving ACK after sending to receiver in given time,
      * the task will release reference.
      *
-     * @param sequenceNumber sequence number of file sent.
+     * @param sequenceNumber sequence number of stream sent.
      * @param time time to timeout
      * @param unit unit of given time
      * @return scheduled future for timeout task
      */
     public synchronized ScheduledFuture scheduleTimeout(final int 
sequenceNumber, long time, TimeUnit unit)
     {
-        if (!files.containsKey(sequenceNumber))
+        if (!streams.containsKey(sequenceNumber))
             return null;
 
         ScheduledFuture future = timeoutExecutor.schedule(new Runnable()

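With the rename from files to streams, a transfer task now tracks OutgoingStreamMessage instances keyed by sequence number instead of sstable refs. The lifecycle implied by the methods above, written out as a hedged sketch (the session, stream, and constructor wiring are assumed context rather than code from this patch):

```java
// Illustrative flow only.
StreamTransferTask task = new StreamTransferTask(session, stream.getTableId());

// Wraps the stream in an OutgoingStreamMessage, assigns the next sequence
// number, and adds the stream's size to the task total.
task.addTransferStream(stream);

// After the message is sent, an ACK timeout is scheduled for that sequence
// number (see StreamSession.streamSent(...) in the previous file).
task.scheduleTimeout(0, 12, TimeUnit.HOURS);

// When the peer ACKs sequence number 0, the timeout is cancelled, the stream
// is completed/released, and the session is notified once nothing remains.
task.complete(0);
```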
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java 
b/src/java/org/apache/cassandra/streaming/StreamWriter.java
deleted file mode 100644
index 81b3d8a..0000000
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.DataIntegrityMetadata.ChecksumValidator;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
-import 
org.apache.cassandra.streaming.compress.ByteBufCompressionDataOutputStreamPlus;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * StreamWriter writes given section of the SSTable to given channel.
- */
-public class StreamWriter
-{
-    private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
-
-    private static final Logger logger = 
LoggerFactory.getLogger(StreamWriter.class);
-
-    protected final SSTableReader sstable;
-    protected final Collection<Pair<Long, Long>> sections;
-    protected final StreamRateLimiter limiter;
-    protected final StreamSession session;
-
-    public StreamWriter(SSTableReader sstable, Collection<Pair<Long, Long>> 
sections, StreamSession session)
-    {
-        this.session = session;
-        this.sstable = sstable;
-        this.sections = sections;
-        this.limiter =  StreamManager.getRateLimiter(session.peer);
-    }
-
-    /**
-     * Stream file of specified sections to given channel.
-     *
-     * StreamWriter uses LZF compression on wire to decrease size to transfer.
-     *
-     * @param output where this writes data to
-     * @throws IOException on any I/O error
-     */
-    public void write(DataOutputStreamPlus output) throws IOException
-    {
-        long totalSize = totalSize();
-        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = 
{}, totalSize = {}", session.planId(),
-                     sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
-
-        try(ChannelProxy proxy = sstable.getDataChannel().sharedCopy();
-            ChecksumValidator validator = new 
File(sstable.descriptor.filenameFor(Component.CRC)).exists()
-                                          ? 
DataIntegrityMetadata.checksumValidator(sstable.descriptor)
-                                          : null)
-        {
-            int bufferSize = validator == null ? DEFAULT_CHUNK_SIZE: 
validator.chunkSize;
-
-            // setting up data compression stream
-            long progress = 0L;
-
-            try (DataOutputStreamPlus compressedOutput = new 
ByteBufCompressionDataOutputStreamPlus(output, limiter))
-            {
-                // stream each of the required sections of the file
-                for (Pair<Long, Long> section : sections)
-                {
-                    long start = validator == null ? section.left : 
validator.chunkStart(section.left);
-                    // if the transfer does not start on the valididator's 
chunk boundary, this is the number of bytes to offset by
-                    int transferOffset = (int) (section.left - start);
-                    if (validator != null)
-                        validator.seek(start);
-
-                    // length of the section to read
-                    long length = section.right - start;
-                    // tracks write progress
-                    long bytesRead = 0;
-                    while (bytesRead < length)
-                    {
-                        int toTransfer = (int) Math.min(bufferSize, length - 
bytesRead);
-                        long lastBytesRead = write(proxy, validator, 
compressedOutput, start, transferOffset, toTransfer, bufferSize);
-                        start += lastBytesRead;
-                        bytesRead += lastBytesRead;
-                        progress += (lastBytesRead - transferOffset);
-                        
session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, totalSize);
-                        transferOffset = 0;
-                    }
-
-                    // make sure that current section is sent
-                    output.flush();
-                }
-                logger.debug("[Stream #{}] Finished streaming file {} to {}, 
bytesTransferred = {}, totalSize = {}",
-                             session.planId(), sstable.getFilename(), 
session.peer, FBUtilities.prettyPrintMemory(progress), 
FBUtilities.prettyPrintMemory(totalSize));
-            }
-        }
-    }
-
-    protected long totalSize()
-    {
-        long size = 0;
-        for (Pair<Long, Long> section : sections)
-            size += section.right - section.left;
-        return size;
-    }
-
-    /**
-     * Sequentially read bytes from the file and write them to the output 
stream
-     *
-     * @param proxy The file reader to read from
-     * @param validator validator to verify data integrity
-     * @param start The readd offset from the beginning of the {@code proxy} 
file.
-     * @param transferOffset number of bytes to skip transfer, but include for 
validation.
-     * @param toTransfer The number of bytes to be transferred.
-     *
-     * @return Number of bytes transferred.
-     *
-     * @throws java.io.IOException on any I/O error
-     */
-    protected long write(ChannelProxy proxy, ChecksumValidator validator, 
DataOutputStreamPlus output, long start, int transferOffset, int toTransfer, 
int bufferSize) throws IOException
-    {
-        // the count of bytes to read off disk
-        int minReadable = (int) Math.min(bufferSize, proxy.size() - start);
-
-        // this buffer will hold the data from disk. as it will be compressed 
on the fly by
-        // ByteBufCompressionDataOutputStreamPlus.write(ByteBuffer), we can 
release this buffer as soon as we can.
-        ByteBuffer buffer = ByteBuffer.allocateDirect(minReadable);
-        try
-        {
-            int readCount = proxy.read(buffer, start);
-            assert readCount == minReadable : String.format("could not read 
required number of bytes from file to be streamed: read %d bytes, wanted %d 
bytes", readCount, minReadable);
-            buffer.flip();
-
-            if (validator != null)
-            {
-                validator.validate(buffer);
-                buffer.flip();
-            }
-
-            buffer.position(transferOffset);
-            buffer.limit(transferOffset + (toTransfer - transferOffset));
-            output.write(buffer);
-        }
-        finally
-        {
-            FileUtils.clean(buffer);
-        }
-
-        return toTransfer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/TableStreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/TableStreamManager.java 
b/src/java/org/apache/cassandra/streaming/TableStreamManager.java
new file mode 100644
index 0000000..11512e9
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/TableStreamManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+/**
+ * The main streaming hook for a storage implementation.
+ *
+ * From here, the streaming system can get instances of {@link StreamReceiver}, {@link IncomingStream},
+ * and {@link OutgoingStream}, which expose the interfaces into the underlying storage implementation
+ * needed to make streaming work.
+ */
+public interface TableStreamManager
+{
+    /**
+     * Creates a {@link StreamReceiver} for the given session, expecting the given number of streams
+     */
+    StreamReceiver createStreamReceiver(StreamSession session, int totalStreams);
+
+    /**
+     * Creates an {@link IncomingStream} for the given header
+     */
+    IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header);
+
+    /**
+     * Returns a collection of {@link OutgoingStream}s that contains the data selected by the
+     * given ranges, pendingRepair, and preview.
+     *
+     * There aren't any requirements on how data is divided between the outgoing streams
+     */
+    Collection<OutgoingStream> createOutgoingStreams(StreamSession session,
+                                                     Collection<Range<Token>> ranges,
+                                                     UUID pendingRepair,
+                                                     PreviewKind previewKind);
+}

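For orientation, here is a bare-bones skeleton of what an implementation of this new hook could look like; the class name and the do-nothing bodies are hypothetical, and a real storage backend would return working receiver and stream objects instead:

```java
package org.apache.cassandra.streaming; // hypothetical location for the sketch

import java.util.Collection;
import java.util.Collections;
import java.util.UUID;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;

// Hypothetical, do-nothing implementation of the interface above.
public class NoOpTableStreamManager implements TableStreamManager
{
    public StreamReceiver createStreamReceiver(StreamSession session, int totalStreams)
    {
        // A real backend returns an object that accumulates the incoming streams
        // for this table/session and applies them when the session completes.
        throw new UnsupportedOperationException("no storage backend attached");
    }

    public IncomingStream prepareIncomingStream(StreamSession session, StreamMessageHeader header)
    {
        // A real backend allocates whatever will hold the bytes described by the header.
        throw new UnsupportedOperationException("no storage backend attached");
    }

    public Collection<OutgoingStream> createOutgoingStreams(StreamSession session,
                                                            Collection<Range<Token>> ranges,
                                                            UUID pendingRepair,
                                                            PreviewKind previewKind)
    {
        // A real backend selects the data covered by 'ranges', filtered by
        // pendingRepair/previewKind, and wraps it in OutgoingStream handles.
        return Collections.emptyList();
    }
}
```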
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
 
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index 20b7c87..bbc451d 100644
--- 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ 
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -55,28 +55,28 @@ import 
org.apache.cassandra.net.async.OutboundConnectionIdentifier;
 import org.apache.cassandra.streaming.StreamConnectionFactory;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamingMessageSender;
-import org.apache.cassandra.streaming.messages.IncomingFileMessage;
+import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
 import org.apache.cassandra.streaming.messages.KeepAliveMessage;
-import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
+import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Responsible for sending {@link StreamMessage}s to a given peer. We manage an array of netty {@link Channel}s
- * for sending {@link OutgoingFileMessage} instances; all other {@link StreamMessage} types are sent via
+ * for sending {@link OutgoingStreamMessage} instances; all other {@link StreamMessage} types are sent via
  * a special control channel. The reason for this is to treat those messages carefully and not let them get stuck
- * behind a file transfer.
+ * behind a stream transfer.
  *
- * One of the challenges when sending files is we might need to delay shipping the file if:
+ * One of the challenges when sending streams is we might need to delay shipping the stream if:
  *
  * - we've exceeded our network I/O use due to rate limiting (at the cassandra level)
  * - the receiver isn't keeping up, which causes the local TCP socket buffer to not empty, which causes epoll writes to not
  * move any bytes to the socket, which causes buffers to stick around in user-land (a/k/a cassandra) memory.
  *
- * When those conditions occur, it's easy enough to reschedule processing the file once the resources pick up
+ * When those conditions occur, it's easy enough to reschedule processing the stream once the resources pick up
  * (we acquire the permits from the rate limiter, or the socket drains). However, we need to ensure that
- * no other messages are submitted to the same channel while the current file is still being processed.
+ * no other messages are submitted to the same channel while the current stream is still being processed.
  */
 public class NettyStreamingMessageSender implements StreamingMessageSender
 {
@@ -97,8 +97,8 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
     private volatile boolean closed;
 
     /**
-     * A special {@link Channel} for sending non-file streaming messages, basically anything that isn't an
-     * {@link OutgoingFileMessage} (or an {@link IncomingFileMessage}, but a node doesn't send that, it's only received).
+     * A special {@link Channel} for sending non-stream streaming messages, basically anything that isn't an
+     * {@link OutgoingStreamMessage} (or an {@link IncomingStreamMessage}, but a node doesn't send that, it's only received).
      */
     private Channel controlMessageChannel;
 
@@ -113,9 +113,9 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
     private final ConcurrentMap<Thread, Channel> threadToChannelMap = new 
ConcurrentHashMap<>();
 
     /**
-     * A netty channel attribute used to indicate if a channel is currently transferring a file. This is primarily used
+     * A netty channel attribute used to indicate if a channel is currently transferring a stream. This is primarily used
      * to indicate to the {@link KeepAliveTask} if it is safe to send a {@link KeepAliveMessage}, as sending the
-     * (application level) keep-alive in the middle of streaming a file would be bad news.
+     * (application level) keep-alive in the middle of a stream would be bad news.
      */
     @VisibleForTesting
     static final AttributeKey<Boolean> TRANSFERRING_FILE_ATTR = AttributeKey.valueOf("transferringFile");
@@ -141,7 +141,6 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
                                                           
session.sessionIndex(),
                                                           session.planId(),
                                                           
session.streamOperation(),
-                                                          
session.keepSSTableLevel(),
                                                           
session.getPendingRepair(),
                                                           
session.getPreviewKind());
         sendMessage(message);
@@ -209,12 +208,12 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
         if (closed)
             throw new RuntimeException("stream has been closed, cannot send " 
+ message);
 
-        if (message instanceof OutgoingFileMessage)
+        if (message instanceof OutgoingStreamMessage)
         {
             if (isPreview)
-                throw new RuntimeException("Cannot send file messages for preview streaming sessions");
+                throw new RuntimeException("Cannot send stream data messages for preview streaming sessions");
             logger.debug("{} Sending {}", createLogTag(session, null), message);
-            fileTransferExecutor.submit(new FileStreamTask((OutgoingFileMessage)message));
+            fileTransferExecutor.submit(new FileStreamTask((OutgoingStreamMessage)message));
             return;
         }
 
@@ -271,7 +270,7 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
             return null;
 
         Channel channel = channelFuture.channel();
-        logger.error("{} failed to send a stream message/file to peer {}: msg 
= {}",
+        logger.error("{} failed to send a stream message/data to peer {}: msg 
= {}",
                      createLogTag(session, channel), connectionId, msg, 
future.cause());
 
         // StreamSession will invoke close(), but we have to mark this sender 
as closed so the session doesn't try
@@ -288,12 +287,12 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
         private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
 
         /**
-         * Even though we expect only an {@link OutgoingFileMessage} at 
runtime, the type here is {@link StreamMessage}
+         * Even though we expect only an {@link OutgoingStreamMessage} at 
runtime, the type here is {@link StreamMessage}
          * to facilitate simpler testing.
          */
         private final StreamMessage msg;
 
-        FileStreamTask(OutgoingFileMessage ofm)
+        FileStreamTask(OutgoingStreamMessage ofm)
         {
             this.msg = ofm;
         }
@@ -357,9 +356,9 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
                     if (now - timeOfLastLogging > logIntervalNanos)
                     {
                         timeOfLastLogging = now;
-                        OutgoingFileMessage ofm = (OutgoingFileMessage)msg;
-                        logger.info("{} waiting to acquire a permit to begin 
streaming file {}. This message logs every {} minutes",
-                                    createLogTag(session, null), 
ofm.getFilename(), logInterval);
+                        OutgoingStreamMessage ofm = (OutgoingStreamMessage)msg;
+                        logger.info("{} waiting to acquire a permit to begin 
streaming {}. This message logs every {} minutes",
+                                    createLogTag(session, null), 
ofm.getName(), logInterval);
                     }
                 }
                 catch (InterruptedException ie)

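The TRANSFERRING_FILE_ATTR comment above leans on netty's channel attributes to flag a channel that is mid-stream so the keep-alive task stays out of its way. A tiny, generic illustration of that pattern (plain netty usage under assumed names, not code from this patch):

```java
import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

final class TransferFlag
{
    // Same idea as TRANSFERRING_FILE_ATTR above: a per-channel boolean marker.
    static final AttributeKey<Boolean> TRANSFERRING = AttributeKey.valueOf("transferringFile");

    static void markTransferring(Channel channel, boolean transferring)
    {
        channel.attr(TRANSFERRING).set(transferring);
    }

    static boolean safeToSendKeepAlive(Channel channel)
    {
        // Only send an application-level keep-alive when no stream is in flight on this channel.
        return !Boolean.TRUE.equals(channel.attr(TRANSFERRING).get());
    }
}
```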
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java 
b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index 907572b..03f0640 100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.streaming.async;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -43,8 +42,8 @@ import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamReceiveException;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.messages.FileMessageHeader;
-import org.apache.cassandra.streaming.messages.IncomingFileMessage;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
 import org.apache.cassandra.streaming.messages.KeepAliveMessage;
 import org.apache.cassandra.streaming.messages.StreamInitMessage;
 import org.apache.cassandra.streaming.messages.StreamMessage;
@@ -53,8 +52,8 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
 import static 
org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createLogTag;
 
 /**
- * Handles the inbound side of streaming messages and sstable data. From the incoming data, we derserialize the message
- * and potentially reify partitions and rows and write those out to new sstable files. Because deserialization is a blocking affair,
+ * Handles the inbound side of streaming messages and stream data. From the incoming data, we deserialize the message,
+ * including the actual stream data itself. Because the reading and deserialization of streams is a blocking affair,
  * we can't block the netty event loop. Thus we have a background thread perform all the blocking deserialization.
  */
 public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
@@ -128,7 +127,7 @@ public class StreamingInboundHandler extends 
ChannelInboundHandlerAdapter
         if (cause instanceof IOException)
             logger.trace("connection problem while streaming", cause);
         else
-            logger.warn("exception occurred while in processing streaming file", cause);
+            logger.warn("exception occurred while processing streaming data", cause);
         close();
     }
 
@@ -224,20 +223,20 @@ public class StreamingInboundHandler extends 
ChannelInboundHandlerAdapter
         StreamSession deriveSession(StreamMessage message) throws IOException
         {
             StreamSession streamSession = null;
-            // StreamInitMessage starts a new channel, and IncomingFileMessage potentially, as well.
-            // IncomingFileMessage needs a session to be established a priori, though
+            // StreamInitMessage starts a new channel, and IncomingStreamMessage potentially, as well.
+            // IncomingStreamMessage needs a session to be established a priori, though
             if (message instanceof StreamInitMessage)
             {
                 assert session == null : "initiator of stream session received 
a StreamInitMessage";
                 StreamInitMessage init = (StreamInitMessage) message;
-                StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.keepSSTableLevel, init.pendingRepair, init.previewKind);
+                StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.pendingRepair, init.previewKind);
                 streamSession = sessionProvider.apply(new 
SessionIdentifier(init.from, init.planId, init.sessionIndex));
             }
-            else if (message instanceof IncomingFileMessage)
+            else if (message instanceof IncomingStreamMessage)
             {
-                // TODO: it'd be great to check if the session actually exists 
before slurping in the entire sstable,
+                // TODO: it'd be great to check if the session actually exists 
before slurping in the entire stream,
                 // but that's a refactoring for another day
-                FileMessageHeader header = ((IncomingFileMessage) 
message).header;
+                StreamMessageHeader header = ((IncomingStreamMessage) 
message).header;
                 streamSession = sessionProvider.apply(new 
SessionIdentifier(header.sender, header.planId, header.sessionIndex));
             }
 

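To make the session-derivation rule above easier to see at a glance, here is the same dispatch condensed into a sketch (identifiers come from the hunk; the surrounding handler state such as channel and sessionProvider is assumed):

```java
// Condensed restatement of deriveSession(...) after the rename; illustrative only.
if (message instanceof StreamInitMessage)
{
    // An init message is allowed to create the receiving side of a new session.
    StreamInitMessage init = (StreamInitMessage) message;
    StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation,
                                         init.from, channel, init.pendingRepair, init.previewKind);
    streamSession = sessionProvider.apply(new SessionIdentifier(init.from, init.planId, init.sessionIndex));
}
else if (message instanceof IncomingStreamMessage)
{
    // Stream data may only attach to a session that already exists; the
    // (sender, planId, sessionIndex) triple in the header locates it.
    StreamMessageHeader header = ((IncomingStreamMessage) message).header;
    streamSession = sessionProvider.apply(new SessionIdentifier(header.sender, header.planId, header.sessionIndex));
}
```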
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/async/package-info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/async/package-info.java 
b/src/java/org/apache/cassandra/streaming/async/package-info.java
index ecf5115..9455c7c 100644
--- a/src/java/org/apache/cassandra/streaming/async/package-info.java
+++ b/src/java/org/apache/cassandra/streaming/async/package-info.java
@@ -18,14 +18,14 @@
 
 /**
  * <h1>Non-blocking streaming with netty</h1>
- * This document describes the implementation details of streaming protocol. A listener for a streaming
+ * This document describes the implementation details of the streaming protocol. A listener for a streaming
  * session listens on the same socket as internode messaging, and participates in the same handshake protocol.
  * That protocol is described in the package-level documentation for {@link org.apache.cassandra.net.async}, and
  * thus not here.
  *
  * Streaming 2.0 was implemented as CASSANDRA-5286. Streaming 2.0 used (the equivalent of) a single thread and
  * a single socket to transfer sstables sequentially to a peer (either as part of a repair, bootstrap, and so on).
- * Part of the motivation for switching to netty and a non-blocking model as to enable file transfers to occur
+ * Part of the motivation for switching to netty and a non-blocking model was to enable stream transfers to occur
  * in parallel for a given session.
  *
  * Thus, a more detailed approach is required for stream session management.
@@ -34,38 +34,6 @@
  *
  * The full details of the session lifecycle are documented in {@link 
org.apache.cassandra.streaming.StreamSession}.
  *
- *
- * <h2>File transfer</h2>
- *
- * When tranferring whole or subsections of an sstable, only the DATA 
component is shipped. To that end,
- * there are three "modes" of an sstable transfer that need to be handled 
somewhat differently:
- *
- * 1) uncompressed sstable - data needs to be read into user space so it can 
be manipulated: checksum validation,
- * apply stream compression (see next section), and/or TLS encryption.
- *
- * 2) compressed sstable, transferred with SSL/TLS - data needs to be read 
into user space as that is where the TLS encryption
- * needs to happen. Netty does not allow the pretense of doing zero-copy 
transfers when TLS is in the pipeline;
- * data must explicitly be pulled into user-space memory for TLS encryption to 
work.
- *
- * 3) compressed sstable, transferred without SSL/TLS - data can be streamed 
via zero-copy transfer as the data does not
- * need to be manipulated (it can be sent "as-is").
- *
- * <h3>Compressing the data</h3>
- * We always want to transfer as few bytes as possible of the wire when 
streaming a file. If the
- * sstable is not already compressed via table compression options, we apply 
an on-the-fly stream compression
- * to the data. The stream compression format is documented in
- * {@link org.apache.cassandra.streaming.async.StreamCompressionSerializer}
- *
- * You may be wondering: why implement your own compression scheme? why not 
use netty's built-in compression codecs,
- * like {@link io.netty.handler.codec.compression.Lz4FrameEncoder}? That makes 
complete sense if all the sstables
- * to be streamed are non using sstable compression (and obviously you 
wouldn't use stream compression when the sstables
- * are using sstable compression). The problem is when you have a mix of 
files, some using sstable compression
- * and some not. You can either:
- *
- * - send the files of one type over one kind of socket, and the others over 
another socket
- * - send them both over the same socket, but then auto-adjust per each file 
type.
- *
- * I've opted for the latter to keep socket/channel management simpler and 
cleaner.
  */
 package org.apache.cassandra.streaming.async;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
deleted file mode 100644
index 290dd9e..0000000
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.compress;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Iterator;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.DoubleSupplier;
-
-import com.google.common.collect.Iterators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.util.concurrent.FastThreadLocalThread;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RebufferingInputStream;
-import org.apache.cassandra.streaming.StreamReader.StreamDeserializer;
-import org.apache.cassandra.utils.ChecksumType;
-import org.apache.cassandra.utils.WrappedRunnable;
-
-/**
- * InputStream which reads data from the underlying source with the given {@link CompressionInfo}.
- * Uses {@link #buffer} as a buffer for uncompressed data (which is read by stream consumers -
- * {@link StreamDeserializer} in this case).
- */
-public class CompressedInputStream extends RebufferingInputStream implements 
AutoCloseable
-{
-
-    private static final Logger logger = 
LoggerFactory.getLogger(CompressedInputStream.class);
-
-    private final CompressionInfo info;
-    // chunk buffer
-    private final BlockingQueue<ByteBuffer> dataBuffer;
-    private final DoubleSupplier crcCheckChanceSupplier;
-
-    /**
-     * The base offset of the current {@link #buffer} from the beginning of 
the stream.
-     */
-    private long bufferOffset = 0;
-
-    /**
-     * The current {@link CompressedStreamReader#sections} offset in the 
stream.
-     */
-    private long current = 0;
-
-    private final ChecksumType checksumType;
-
-    private static final int CHECKSUM_LENGTH = 4;
-
-    /**
-     * Indicates there was a problem when reading from the source stream.
-     * When this is added to the <code>dataBuffer</code> by the stream Reader,
-     * it is expected that the <code>readException</code> variable has been populated
-     * with the cause of the read error, so it can be thrown to the consumer
-     * on the next read operation.
-     */
-    private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);
-
-    private volatile IOException readException = null;
-
-    private long totalCompressedBytesRead;
-
-    /**
-     * @param source Input source to read compressed data from
-     * @param info Compression info
-     */
-    public CompressedInputStream(DataInputPlus source, CompressionInfo info, 
ChecksumType checksumType, DoubleSupplier crcCheckChanceSupplier)
-    {
-        super(ByteBuffer.allocateDirect(info.parameters.chunkLength()));
-        buffer.limit(buffer.position()); // force the buffer to appear 
"consumed" so that it triggers reBuffer on the first read
-        this.info = info;
-        this.dataBuffer = new 
ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
-        this.crcCheckChanceSupplier = crcCheckChanceSupplier;
-        this.checksumType = checksumType;
-
-        new FastThreadLocalThread(new Reader(source, info, 
dataBuffer)).start();
-    }
-
-    /**
-     * Invoked when crossing into the next stream boundary in {@link 
CompressedStreamReader#sections}.
-     */
-    public void position(long position) throws IOException
-    {
-        if (readException != null)
-            throw readException;
-
-        assert position >= current : "stream can only read forward.";
-        current = position;
-
-        if (current > bufferOffset + buffer.limit())
-            reBuffer(false);
-
-        buffer.position((int)(current - bufferOffset));
-    }
-
-    protected void reBuffer() throws IOException
-    {
-        reBuffer(true);
-    }
-
-    private void reBuffer(boolean updateCurrent) throws IOException
-    {
-        if (readException != null)
-        {
-            FileUtils.clean(buffer);
-            buffer = null;
-            throw readException;
-        }
-
-        // increment the offset into the stream based on the current buffer's 
read count
-        if (updateCurrent)
-            current += buffer.position();
-
-        try
-        {
-            ByteBuffer compressedWithCRC = dataBuffer.take();
-            if (compressedWithCRC == POISON_PILL)
-            {
-                assert readException != null;
-                throw readException;
-            }
-
-            decompress(compressedWithCRC);
-        }
-        catch (InterruptedException e)
-        {
-            throw new EOFException("No chunk available");
-        }
-    }
-
-    private void decompress(ByteBuffer compressed) throws IOException
-    {
-        int length = compressed.remaining();
-
-        // uncompress only if the chunk's payload (length minus CRC) is smaller than maxCompressedLength;
-        // otherwise the chunk was stored uncompressed and can be used as-is. see CASSANDRA-10520
-        final boolean releaseCompressedBuffer;
-        if (length - CHECKSUM_LENGTH < info.parameters.maxCompressedLength())
-        {
-            buffer.clear();
-            compressed.limit(length - CHECKSUM_LENGTH);
-            info.parameters.getSstableCompressor().uncompress(compressed, 
buffer);
-            buffer.flip();
-            releaseCompressedBuffer = true;
-        }
-        else
-        {
-            FileUtils.clean(buffer);
-            buffer = compressed;
-            buffer.limit(length - CHECKSUM_LENGTH);
-            releaseCompressedBuffer = false;
-        }
-        totalCompressedBytesRead += length;
-
-        // validate crc randomly
-        double crcCheckChance = this.crcCheckChanceSupplier.getAsDouble();
-        if (crcCheckChance >= 1d ||
-            (crcCheckChance > 0d && crcCheckChance > 
ThreadLocalRandom.current().nextDouble()))
-        {
-            ByteBuffer crcBuf = compressed.duplicate();
-            crcBuf.limit(length - CHECKSUM_LENGTH).position(0);
-            int checksum = (int) checksumType.of(crcBuf);
-
-            crcBuf.limit(length);
-            if (crcBuf.getInt() != checksum)
-                throw new IOException("CRC unmatched");
-        }
-
-        if (releaseCompressedBuffer)
-            FileUtils.clean(compressed);
-
-        // buffer offset is always aligned
-        final int compressedChunkLength = info.parameters.chunkLength();
-        bufferOffset = current & ~(compressedChunkLength - 1);
-    }
-
-    public long getTotalCompressedBytesRead()
-    {
-        return totalCompressedBytesRead;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * Releases the resources specific to this instance, but not the {@link 
DataInputPlus} that is used by the {@link Reader}.
-     */
-    @Override
-    public void close()
-    {
-        if (buffer != null)
-        {
-            FileUtils.clean(buffer);
-            buffer = null;
-        }
-    }
-
-    class Reader extends WrappedRunnable
-    {
-        private final DataInputPlus source;
-        private final Iterator<CompressionMetadata.Chunk> chunks;
-        private final BlockingQueue<ByteBuffer> dataBuffer;
-
-        Reader(DataInputPlus source, CompressionInfo info, 
BlockingQueue<ByteBuffer> dataBuffer)
-        {
-            this.source = source;
-            this.chunks = Iterators.forArray(info.chunks);
-            this.dataBuffer = dataBuffer;
-        }
-
-        protected void runMayThrow() throws Exception
-        {
-            byte[] tmp = null;
-            while (chunks.hasNext())
-            {
-                CompressionMetadata.Chunk chunk = chunks.next();
-
-                int readLength = chunk.length + 4; // read with CRC
-                ByteBuffer compressedWithCRC = null;
-                try
-                {
-                    final int r;
-                    if (source instanceof ReadableByteChannel)
-                    {
-                        compressedWithCRC = 
ByteBuffer.allocateDirect(readLength);
-                        r = 
((ReadableByteChannel)source).read(compressedWithCRC);
-                        compressedWithCRC.flip();
-                    }
-                    else
-                    {
-                        // read into an on-heap array, then copy over to an off-heap buffer. at a minimum,
-                        // snappy requires off-heap buffers for decompression, else we could have just
-                        // wrapped the plain byte array in a ByteBuffer
-                        if (tmp == null || tmp.length < 
info.parameters.chunkLength() + CHECKSUM_LENGTH)
-                            tmp = new byte[info.parameters.chunkLength() + 
CHECKSUM_LENGTH];
-                        source.readFully(tmp, 0, readLength);
-                        compressedWithCRC = 
ByteBuffer.allocateDirect(readLength);
-                        compressedWithCRC.put(tmp, 0, readLength);
-                        compressedWithCRC.position(0);
-                        r = readLength;
-                    }
-
-                    if (r < 0)
-                    {
-                        FileUtils.clean(compressedWithCRC);
-                        readException = new EOFException("No chunk available");
-                        dataBuffer.put(POISON_PILL);
-                        return; // throw exception where we consume dataBuffer
-                    }
-                }
-                catch (IOException e)
-                {
-                    if (!(e instanceof EOFException))
-                        logger.warn("Error while reading compressed input 
stream.", e);
-                    if (compressedWithCRC != null)
-                        FileUtils.clean(compressedWithCRC);
-
-                    readException = e;
-                    dataBuffer.put(POISON_PILL);
-                    return; // throw exception where we consume dataBuffer
-                }
-                dataBuffer.put(compressedWithCRC);
-            }
-        }
-    }
-}
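
The deleted CompressedInputStream above decouples reading compressed chunks (done on a background
Reader thread) from decompression (done on the consuming thread) via a bounded BlockingQueue, and uses
an empty "poison pill" buffer plus a volatile readException to propagate read failures to the consumer.
A minimal, standalone sketch of that handoff pattern (generic Java, not the Cassandra class):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Standalone illustration of the poison-pill handoff: a producer fills a bounded queue; on failure
    // it records the exception and enqueues a sentinel so the consumer re-throws on its next take().
    public final class PoisonPillSketch
    {
        private static final ByteBuffer POISON_PILL = ByteBuffer.wrap(new byte[0]);

        private final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(16);
        private volatile IOException readException;

        // producer side (runs on its own thread in the real class)
        void produce(ByteBuffer chunk, IOException failure) throws InterruptedException
        {
            if (failure != null)
            {
                readException = failure;  // record the cause first ...
                queue.put(POISON_PILL);   // ... then wake the consumer with the sentinel
                return;
            }
            queue.put(chunk);
        }

        // consumer side (called from the stream's rebuffering path)
        ByteBuffer consume() throws IOException, InterruptedException
        {
            ByteBuffer next = queue.take();
            if (next == POISON_PILL)
                throw readException;      // surface the producer's failure to the reader
            return next;
        }
    }

The bounded queue also provides natural backpressure: the producer blocks once the consumer falls
behind by the queue's capacity.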

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
deleted file mode 100644
index bd44209..0000000
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.compress;
-
-import java.io.IOException;
-
-import com.google.common.base.Throwables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.TrackedDataInputPlus;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamReader;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.messages.FileMessageHeader;
-import org.apache.cassandra.utils.ChecksumType;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-
-import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
-
-/**
- * StreamReader that reads from a streamed, compressed SSTable.
- */
-public class CompressedStreamReader extends StreamReader
-{
-    private static final Logger logger = 
LoggerFactory.getLogger(CompressedStreamReader.class);
-
-    protected final CompressionInfo compressionInfo;
-
-    public CompressedStreamReader(FileMessageHeader header, StreamSession 
session)
-    {
-        super(header, session);
-        this.compressionInfo = header.compressionInfo;
-    }
-
-    /**
-     * @return SSTable transferred
-     * @throws java.io.IOException if reading the remote sstable fails. Will 
throw an RTE if local write fails.
-     */
-    @Override
-    @SuppressWarnings("resource") // input needs to remain open, streams on 
top of it can't be closed
-    public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException
-    {
-        long totalSize = totalSize();
-
-        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
-
-        if (cfs == null)
-        {
-            // schema was dropped during streaming
-            throw new IOException("CF " + tableId + " was dropped during 
streaming");
-        }
-
-        logger.debug("[Stream #{}] Start receiving file #{} from {}, 
repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.",
-                     session.planId(), fileSeqNum, session.peer, repairedAt, 
totalSize, cfs.keyspace.getName(), pendingRepair,
-                     cfs.getTableName());
-
-        StreamDeserializer deserializer = null;
-        SSTableMultiWriter writer = null;
-        try (CompressedInputStream cis = new CompressedInputStream(inputPlus, 
compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance))
-        {
-            TrackedDataInputPlus in = new TrackedDataInputPlus(cis);
-            deserializer = new StreamDeserializer(cfs.metadata(), in, 
inputVersion, getHeader(cfs.metadata()));
-            writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, 
format);
-            String filename = writer.getFilename();
-            int sectionIdx = 0;
-            for (Pair<Long, Long> section : sections)
-            {
-                assert cis.getTotalCompressedBytesRead() <= totalSize;
-                long sectionLength = section.right - section.left;
-
-                logger.trace("[Stream #{}] Reading section {} with length {} 
from stream.", session.planId(), sectionIdx++, sectionLength);
-                // skip to beginning of section inside chunk
-                cis.position(section.left);
-                in.reset(0);
-
-                while (in.getBytesRead() < sectionLength)
-                {
-                    writePartition(deserializer, writer);
-                    // when compressed, report total bytes of compressed 
chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(filename, ProgressInfo.Direction.IN, 
cis.getTotalCompressedBytesRead(), totalSize);
-                }
-            }
-            logger.debug("[Stream #{}] Finished receiving file #{} from {} 
readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
-                         session.peer, 
FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), 
FBUtilities.prettyPrintMemory(totalSize));
-            return writer;
-        }
-        catch (Throwable e)
-        {
-            Object partitionKey = deserializer != null ? 
deserializer.partitionKey() : "";
-            logger.warn("[Stream {}] Error while reading partition {} from 
stream on ks='{}' and table='{}'.",
-                        session.planId(), partitionKey, 
cfs.keyspace.getName(), cfs.getTableName());
-            if (writer != null)
-            {
-                writer.abort(e);
-            }
-            if (extractIOExceptionCause(e).isPresent())
-                throw e;
-            throw Throwables.propagate(e);
-        }
-    }
-
-    @Override
-    protected long totalSize()
-    {
-        long size = 0;
-        // calculate the total length of the chunks being transferred
-        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-            size += chunk.length + 4; // 4 bytes for CRC
-        return size;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
deleted file mode 100644
index 0e78b7d..0000000
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.compress;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.DataOutputStreamPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
-import org.apache.cassandra.streaming.ProgressInfo;
-import org.apache.cassandra.streaming.StreamSession;
-import org.apache.cassandra.streaming.StreamWriter;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * StreamWriter for compressed SSTables.
- */
-public class CompressedStreamWriter extends StreamWriter
-{
-    private static final int CHUNK_SIZE = 1 << 16;
-
-    private static final Logger logger = 
LoggerFactory.getLogger(CompressedStreamWriter.class);
-
-    private final CompressionInfo compressionInfo;
-
-    public CompressedStreamWriter(SSTableReader sstable, Collection<Pair<Long, 
Long>> sections, CompressionInfo compressionInfo, StreamSession session)
-    {
-        super(sstable, sections, session);
-        this.compressionInfo = compressionInfo;
-    }
-
-    @Override
-    public void write(DataOutputStreamPlus out) throws IOException
-    {
-        assert out instanceof ByteBufDataOutputStreamPlus;
-        ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out;
-        long totalSize = totalSize();
-        logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = 
{}, totalSize = {}", session.planId(),
-                     sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
-        try (ChannelProxy fc = sstable.getDataChannel().sharedCopy())
-        {
-            long progress = 0L;
-            // calculate chunks to transfer. we want to send contiguous chunks together.
-            List<Pair<Long, Long>> sections = 
getTransferSections(compressionInfo.chunks);
-
-            int sectionIdx = 0;
-
-            // stream each of the required sections of the file
-            for (final Pair<Long, Long> section : sections)
-            {
-                // length of the section to stream
-                long length = section.right - section.left;
-
-                logger.trace("[Stream #{}] Writing section {} with length {} 
to stream.", session.planId(), sectionIdx++, length);
-
-                // tracks write progress
-                long bytesTransferred = 0;
-                while (bytesTransferred < length)
-                {
-                    final int toTransfer = (int) Math.min(CHUNK_SIZE, length - 
bytesTransferred);
-                    limiter.acquire(toTransfer);
-
-                    ByteBuffer outBuffer = 
ByteBuffer.allocateDirect(toTransfer);
-                    long lastWrite;
-                    try
-                    {
-                        lastWrite = fc.read(outBuffer, section.left + 
bytesTransferred);
-                        assert lastWrite == toTransfer : String.format("could 
not read required number of bytes from file to be streamed: read %d bytes, 
wanted %d bytes", lastWrite, toTransfer);
-                        outBuffer.flip();
-                        output.writeToChannel(outBuffer);
-                    }
-                    catch (IOException e)
-                    {
-                        FileUtils.clean(outBuffer);
-                        throw e;
-                    }
-
-                    bytesTransferred += lastWrite;
-                    progress += lastWrite;
-                    
session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, totalSize);
-                }
-            }
-            logger.debug("[Stream #{}] Finished streaming file {} to {}, 
bytesTransferred = {}, totalSize = {}",
-                         session.planId(), sstable.getFilename(), 
session.peer, FBUtilities.prettyPrintMemory(progress), 
FBUtilities.prettyPrintMemory(totalSize));
-        }
-    }
-
-    @Override
-    protected long totalSize()
-    {
-        long size = 0;
-        // calculate the total length of the chunks being transferred
-        for (CompressionMetadata.Chunk chunk : compressionInfo.chunks)
-            size += chunk.length + 4; // 4 bytes for CRC
-        return size;
-    }
-
-    // chunks are assumed to be sorted by offset
-    private List<Pair<Long, Long>> 
getTransferSections(CompressionMetadata.Chunk[] chunks)
-    {
-        List<Pair<Long, Long>> transferSections = new ArrayList<>();
-        Pair<Long, Long> lastSection = null;
-        for (CompressionMetadata.Chunk chunk : chunks)
-        {
-            if (lastSection != null)
-            {
-                if (chunk.offset == lastSection.right)
-                {
-                    // extend previous section to end of this chunk
-                    lastSection = Pair.create(lastSection.left, chunk.offset + 
chunk.length + 4); // 4 bytes for CRC
-                }
-                else
-                {
-                    transferSections.add(lastSection);
-                    lastSection = Pair.create(chunk.offset, chunk.offset + 
chunk.length + 4);
-                }
-            }
-            else
-            {
-                lastSection = Pair.create(chunk.offset, chunk.offset + 
chunk.length + 4);
-            }
-        }
-        if (lastSection != null)
-            transferSections.add(lastSection);
-        return transferSections;
-    }
-}
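
The deleted getTransferSections() above coalesces chunks that are adjacent on disk into larger
[start, end) sections so contiguous regions are streamed in one pass; each chunk contributes its
compressed length plus a 4-byte CRC trailer. A self-contained sketch of the same merging idea over
plain (offset, length) pairs, using hypothetical types and assuming chunks sorted by offset:

    import java.util.ArrayList;
    import java.util.List;

    // Illustration of the chunk-merging idea behind the deleted getTransferSections():
    // adjacent chunks are coalesced into [start, end) sections; each chunk spans its
    // compressed length plus a 4-byte CRC trailer.
    public final class TransferSectionsSketch
    {
        record Chunk(long offset, int length) {}
        record Section(long start, long end) {}

        static List<Section> merge(List<Chunk> chunks)
        {
            List<Section> sections = new ArrayList<>();
            Section last = null;
            for (Chunk chunk : chunks)
            {
                long end = chunk.offset() + chunk.length() + 4; // 4 bytes for CRC
                if (last != null && chunk.offset() == last.end())
                {
                    last = new Section(last.start(), end);      // extend the previous section
                }
                else
                {
                    if (last != null)
                        sections.add(last);
                    last = new Section(chunk.offset(), end);
                }
            }
            if (last != null)
                sections.add(last);
            return sections;
        }

        public static void main(String[] args)
        {
            // chunks at offsets 0 and 104 are contiguous (100 bytes + 4 CRC bytes), the third is not
            List<Section> merged = merge(List.of(new Chunk(0, 100), new Chunk(104, 100), new Chunk(500, 100)));
            System.out.println(merged); // [Section[start=0, end=208], Section[start=500, end=604]]
        }
    }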

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java 
b/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java
deleted file mode 100644
index bd0c2d5..0000000
--- a/src/java/org/apache/cassandra/streaming/compress/CompressionInfo.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.streaming.compress;
-
-import java.io.IOException;
-
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.compress.CompressionMetadata;
-import org.apache.cassandra.schema.CompressionParams;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-
-/**
- * Container that carries the compression parameters and chunks needed to decompress data from the stream.
- */
-public class CompressionInfo
-{
-    public static final IVersionedSerializer<CompressionInfo> serializer = new 
CompressionInfoSerializer();
-
-    public final CompressionMetadata.Chunk[] chunks;
-    public final CompressionParams parameters;
-
-    public CompressionInfo(CompressionMetadata.Chunk[] chunks, 
CompressionParams parameters)
-    {
-        assert chunks != null && parameters != null;
-        this.chunks = chunks;
-        this.parameters = parameters;
-    }
-
-    static class CompressionInfoSerializer implements 
IVersionedSerializer<CompressionInfo>
-    {
-        public void serialize(CompressionInfo info, DataOutputPlus out, int 
version) throws IOException
-        {
-            if (info == null)
-            {
-                out.writeInt(-1);
-                return;
-            }
-
-            int chunkCount = info.chunks.length;
-            out.writeInt(chunkCount);
-            for (int i = 0; i < chunkCount; i++)
-                CompressionMetadata.Chunk.serializer.serialize(info.chunks[i], 
out, version);
-            // compression params
-            CompressionParams.serializer.serialize(info.parameters, out, 
version);
-        }
-
-        public CompressionInfo deserialize(DataInputPlus in, int version) 
throws IOException
-        {
-            // chunks
-            int chunkCount = in.readInt();
-            if (chunkCount < 0)
-                return null;
-
-            CompressionMetadata.Chunk[] chunks = new 
CompressionMetadata.Chunk[chunkCount];
-            for (int i = 0; i < chunkCount; i++)
-                chunks[i] = 
CompressionMetadata.Chunk.serializer.deserialize(in, version);
-
-            // compression params
-            CompressionParams parameters = 
CompressionParams.serializer.deserialize(in, version);
-            return new CompressionInfo(chunks, parameters);
-        }
-
-        public long serializedSize(CompressionInfo info, int version)
-        {
-            if (info == null)
-                return TypeSizes.sizeof(-1);
-
-            // chunks
-            int chunkCount = info.chunks.length;
-            long size = TypeSizes.sizeof(chunkCount);
-            for (int i = 0; i < chunkCount; i++)
-                size += 
CompressionMetadata.Chunk.serializer.serializedSize(info.chunks[i], version);
-            // compression params
-            size += 
CompressionParams.serializer.serializedSize(info.parameters, version);
-            return size;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9714a7c8/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
 
b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index 964fe10..a1fa19f 100644
--- 
a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++ 
b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -42,7 +42,7 @@ public class ProgressInfoCompositeData
                                                             "Session peer",
                                                             "Session peer 
storage port",
                                                             "Index of session",
-                                                            "Name of the file",
+                                                            "Name of the 
stream",
                                                             "Direction('IN' or 
'OUT')",
                                                             "Current bytes 
transferred",
                                                             "Total bytes to 
transfer"};

