Repository: cassandra
Updated Branches:
  refs/heads/trunk 16bf51211 -> a5b90f15c


Only stream from unrepaired sstables during incremental repair

Patch by marcuse; reviewed by yukim for CASSANDRA-8267


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5b90f15
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5b90f15
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5b90f15

Branch: refs/heads/trunk
Commit: a5b90f15c53e256bff4ad382745e34a739a5983a
Parents: 16bf512
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Dec 8 15:17:51 2014 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon May 11 09:29:09 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 11 +++++++++--
 .../org/apache/cassandra/io/sstable/SSTableLoader.java  |  2 +-
 .../cassandra/net/IncomingStreamingConnection.java      |  2 +-
 src/java/org/apache/cassandra/repair/LocalSyncTask.java |  9 ++++++++-
 .../apache/cassandra/repair/StreamingRepairTask.java    |  9 ++++++++-
 .../apache/cassandra/streaming/ConnectionHandler.java   |  3 ++-
 .../apache/cassandra/streaming/StreamCoordinator.java   |  8 +++++---
 src/java/org/apache/cassandra/streaming/StreamPlan.java |  8 ++++----
 .../apache/cassandra/streaming/StreamResultFuture.java  |  9 +++++----
 .../org/apache/cassandra/streaming/StreamSession.java   | 12 ++++++++++--
 .../cassandra/streaming/messages/StreamInitMessage.java |  9 +++++++--
 .../org/apache/cassandra/dht/StreamStateStoreTest.java  |  4 ++--
 .../cassandra/streaming/StreamTransferTaskTest.java     |  2 +-
 14 files changed, 64 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f2f12c4..bff5970 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Only stream from unrepaired sstables with incremental repair 
(CASSANDRA-8267)
  * Aggregate UDFs allow SFUNC return type to differ from STYPE if FFUNC 
specified (CASSANDRA-9321)
  * Failure detector detects and ignores local pauses (CASSANDRA-9183)
  * Remove Thrift dependencies in bundled tools (CASSANDRA-8358)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 26a430a..fec3afc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1816,7 +1816,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      * @return a ViewFragment containing the sstables and memtables that may 
need to be merged
      * for rows for all of @param rowBoundsCollection, inclusive, according to 
the interval tree.
      */
-    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final 
Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final 
Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean 
includeRepaired)
     {
         return new Function<DataTracker.View, List<SSTableReader>>()
         {
@@ -1824,8 +1824,15 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
             {
                 Set<SSTableReader> sstables = Sets.newHashSet();
                 for (AbstractBounds<RowPosition> rowBounds : 
rowBoundsCollection)
-                    sstables.addAll(view.sstablesInBounds(rowBounds));
+                {
+                    for (SSTableReader sstable : 
view.sstablesInBounds(rowBounds))
+                    {
+                        if (includeRepaired || !sstable.isRepaired())
+                            sstables.add(sstable);
+                    }
+                }
 
+                logger.debug("ViewFilter for {}/{} sstables", sstables.size(), 
getSSTables().size());
                 return ImmutableList.copyOf(sstables);
             }
         };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 6991958..910cdcc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -156,7 +156,7 @@ public class SSTableLoader implements StreamEventHandler
         client.init(keyspace);
         outputHandler.output("Established connection to initial hosts");
 
-        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, 
false).connectionFactory(client.getConnectionFactory());
+        StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, 
false, false).connectionFactory(client.getConnectionFactory());
 
         Map<InetAddress, Collection<Range<Token>>> endpointToRanges = 
client.getEndpointToRangesMap();
         openSSTables(endpointToRanges);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java 
b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index d00e4b8..274e47b 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -66,7 +66,7 @@ public class IncomingStreamingConnection extends Thread 
implements Closeable
             // The receiving side distinguish two connections by looking at 
StreamInitMessage#isForOutgoing.
             // Note: we cannot use the same socket for incoming and outgoing 
streams because we want to
             // parallelize said streams and the socket is blocking, so we 
might deadlock.
-            StreamResultFuture.initReceivingSide(init.sessionIndex, 
init.planId, init.description, init.from, socket, init.isForOutgoing, version, 
init.keepSSTableLevel);
+            StreamResultFuture.initReceivingSide(init.sessionIndex, 
init.planId, init.description, init.from, socket, init.isForOutgoing, version, 
init.keepSSTableLevel, init.isIncremental);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index 010c959..daace01 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.ProgressInfo;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
@@ -65,8 +66,14 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
 
         String message = String.format("Performing streaming repair of %d 
ranges with %s", differences.size(), dst);
         logger.info("[repair #{}] {}", desc.sessionId, message);
+        boolean isIncremental = false;
+        if (desc.parentSessionId != null)
+        {
+            ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+            isIncremental = prs.isIncremental;
+        }
         Tracing.traceRepair(message);
-        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
+        new StreamPlan("Repair", repairedAt, 1, false, 
isIncremental).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote 
node
                                             .requestRanges(dst, preferred, 
desc.keyspace, differences, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java 
b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index cbf0580..25ef06e 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamEventHandler;
 import org.apache.cassandra.streaming.StreamPlan;
@@ -55,7 +56,13 @@ public class StreamingRepairTask implements Runnable, 
StreamEventHandler
         InetAddress dest = request.dst;
         InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
         logger.info(String.format("[streaming task #%s] Performing streaming 
repair of %d ranges with %s", desc.sessionId, request.ranges.size(), 
request.dst));
-        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
+        boolean isIncremental = false;
+        if (desc.parentSessionId != null)
+        {
+            ActiveRepairService.ParentRepairSession prs = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
+            isIncremental = prs.isIncremental;
+        }
+        new StreamPlan("Repair", repairedAt, 1, false, 
isIncremental).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote 
node
                                             .requestRanges(dest, preferred, 
desc.keyspace, request.ranges, desc.columnFamily)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java 
b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 780018c..bb27972 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -183,7 +183,8 @@ public class ConnectionHandler
                     session.planId(),
                     session.description(),
                     isForOutgoing,
-                    session.keepSSTableLevel());
+                    session.keepSSTableLevel(),
+                    session.isIncremental());
             ByteBuffer messageBuf = message.createMessage(false, 
protocolVersion);
             DataOutputStreamPlus out = getWriteChannel(socket);
             out.write(messageBuf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java 
b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 8d0cdce..603366d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -46,12 +46,14 @@ public class StreamCoordinator
     private final int connectionsPerHost;
     private StreamConnectionFactory factory;
     private final boolean keepSSTableLevel;
+    private final boolean isIncremental;
 
-    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, 
StreamConnectionFactory factory)
+    public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, 
boolean isIncremental, StreamConnectionFactory factory)
     {
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
         this.keepSSTableLevel = keepSSTableLevel;
+        this.isIncremental = isIncremental;
     }
 
     public void setConnectionFactory(StreamConnectionFactory factory)
@@ -235,7 +237,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(peer, connecting, 
factory, streamSessions.size(), keepSSTableLevel);
+                StreamSession session = new StreamSession(peer, connecting, 
factory, streamSessions.size(), keepSSTableLevel, isIncremental);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -267,7 +269,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(peer, connecting, factory, id, 
keepSSTableLevel);
+                session = new StreamSession(peer, connecting, factory, id, 
keepSSTableLevel, isIncremental);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java 
b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 0e5cc6f..0d963ed 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -47,19 +47,19 @@ public class StreamPlan
      */
     public StreamPlan(String description)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, 
false);
     }
 
     public StreamPlan(String description, boolean keepSSTableLevels)
     {
-        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, 
keepSSTableLevels);
+        this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, 
keepSSTableLevels, false);
     }
 
-    public StreamPlan(String description, long repairedAt, int 
connectionsPerHost, boolean keepSSTableLevels)
+    public StreamPlan(String description, long repairedAt, int 
connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental)
     {
         this.description = description;
         this.repairedAt = repairedAt;
-        this.coordinator = new StreamCoordinator(connectionsPerHost, 
keepSSTableLevels, new DefaultConnectionFactory());
+        this.coordinator = new StreamCoordinator(connectionsPerHost, 
keepSSTableLevels, isIncremental, new DefaultConnectionFactory());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java 
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index ce9518a..99adab0 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -69,9 +69,9 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, String description, boolean 
keepSSTableLevels)
+    private StreamResultFuture(UUID planId, String description, boolean 
keepSSTableLevels, boolean isIncremental)
     {
-        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, 
new DefaultConnectionFactory()));
+        this(planId, description, new StreamCoordinator(0, keepSSTableLevels, 
isIncremental, new DefaultConnectionFactory()));
     }
 
     static StreamResultFuture init(UUID planId, String description, 
Collection<StreamEventHandler> listeners, StreamCoordinator coordinator)
@@ -102,7 +102,8 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
                                                                     Socket 
socket,
                                                                     boolean 
isForOutgoing,
                                                                     int 
version,
-                                                                    boolean 
keepSSTableLevel) throws IOException
+                                                                    boolean 
keepSSTableLevel,
+                                                                    boolean 
isIncremental) throws IOException
     {
         StreamResultFuture future = 
StreamManager.instance.getReceivingStream(planId);
         if (future == null)
@@ -110,7 +111,7 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
             logger.info("[Stream #{} ID#{}] Creating new streaming plan for 
{}", planId, sessionIndex, description);
 
             // The main reason we create a StreamResultFuture on the receiving 
side is for JMX exposure.
-            future = new StreamResultFuture(planId, description, 
keepSSTableLevel);
+            future = new StreamResultFuture(planId, description, 
keepSSTableLevel, isIncremental);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachSocket(from, sessionIndex, socket, isForOutgoing, 
version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java 
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5a056c4..09ee3e4 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -145,6 +145,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
 
     private AtomicBoolean isAborted = new AtomicBoolean(false);
     private final boolean keepSSTableLevel;
+    private final boolean isIncremental;
 
     public static enum State
     {
@@ -166,7 +167,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
      * @param connecting Actual connecting address
      * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer, InetAddress connecting, 
StreamConnectionFactory factory, int index, boolean keepSSTableLevel)
+    public StreamSession(InetAddress peer, InetAddress connecting, 
StreamConnectionFactory factory, int index, boolean keepSSTableLevel, boolean 
isIncremental)
     {
         this.peer = peer;
         this.connecting = connecting;
@@ -175,6 +176,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         this.handler = new ConnectionHandler(this);
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;
+        this.isIncremental = isIncremental;
     }
 
     public UUID planId()
@@ -197,6 +199,12 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
         return keepSSTableLevel;
     }
 
+    public boolean isIncremental()
+    {
+        return isIncremental;
+    }
+
+
     /**
      * Bind this session to report to specific {@link StreamResultFuture} and
      * perform pre-streaming initialization.
@@ -306,7 +314,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber
                 List<AbstractBounds<RowPosition>> rowBoundsList = new 
ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
                     rowBoundsList.add(Range.makeRowRange(range));
-                
refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList)).refs);
+                
refs.addAll(cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList, 
!isIncremental)).refs);
             }
 
             List<SSTableStreamingSections> sections = new 
ArrayList<>(refs.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java 
b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 03ac944..4928039 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -48,8 +48,9 @@ public class StreamInitMessage
     // true if this init message is to connect for outgoing message on 
receiving side
     public final boolean isForOutgoing;
     public final boolean keepSSTableLevel;
+    public final boolean isIncremental;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, 
String description, boolean isForOutgoing, boolean keepSSTableLevel)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, 
String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean 
isIncremental)
     {
         this.from = from;
         this.sessionIndex = sessionIndex;
@@ -57,6 +58,7 @@ public class StreamInitMessage
         this.description = description;
         this.isForOutgoing = isForOutgoing;
         this.keepSSTableLevel = keepSSTableLevel;
+        this.isIncremental = isIncremental;
     }
 
     /**
@@ -109,6 +111,7 @@ public class StreamInitMessage
             out.writeUTF(message.description);
             out.writeBoolean(message.isForOutgoing);
             out.writeBoolean(message.keepSSTableLevel);
+            out.writeBoolean(message.isIncremental);
         }
 
         public StreamInitMessage deserialize(DataInput in, int version) throws 
IOException
@@ -119,7 +122,8 @@ public class StreamInitMessage
             String description = in.readUTF();
             boolean sentByInitiator = in.readBoolean();
             boolean keepSSTableLevel = in.readBoolean();
-            return new StreamInitMessage(from, sessionIndex, planId, 
description, sentByInitiator, keepSSTableLevel);
+            boolean isIncremental = in.readBoolean();
+            return new StreamInitMessage(from, sessionIndex, planId, 
description, sentByInitiator, keepSSTableLevel, isIncremental);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -130,6 +134,7 @@ public class StreamInitMessage
             size += TypeSizes.NATIVE.sizeof(message.description);
             size += TypeSizes.NATIVE.sizeof(message.isForOutgoing);
             size += TypeSizes.NATIVE.sizeof(message.keepSSTableLevel);
+            size += TypeSizes.NATIVE.sizeof(message.isIncremental);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java 
b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index c8b9f05..86781d9 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -42,7 +42,7 @@ public class StreamStateStoreTest
         Range<Token> range = new Range<>(factory.fromString("0"), 
factory.fromString("100"));
 
         InetAddress local = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true);
+        StreamSession session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, false);
         session.addStreamRequest("keyspace1", Collections.singleton(range), 
Collections.singleton("cf"), 0);
 
         StreamStateStore store = new StreamStateStore();
@@ -63,7 +63,7 @@ public class StreamStateStoreTest
 
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), 
factory.fromString("200"));
-        session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true);
+        session = new StreamSession(local, local, new 
DefaultConnectionFactory(), 0, true, false);
         session.addStreamRequest("keyspace1", Collections.singleton(range2), 
Collections.singleton("cf"), 0);
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5b90f15/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 87b9c38..a0ac870 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -64,7 +64,7 @@ public class StreamTransferTaskTest
         String cf = "Standard1";
 
         InetAddress peer = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(peer, peer, null, 0, true);
+        StreamSession session = new StreamSession(peer, peer, null, 0, true, 
false);
         ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 
         // create two sstables

Reply via email to