http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
new file mode 100644
index 0000000..7eedab7
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * SymmetricLocalSyncTask performs streaming between the local (coordinator) node and a remote replica.
+ */
+public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamEventHandler
+{
+    private final TraceState state = Tracing.instance.get();
+
+    private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class);
+
+    private final boolean remoteIsTransient;
+    private final UUID pendingRepair;
+    private final boolean pullRepair;
+
+    public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind)
+    {
+        super(desc, r1, r2, previewKind);
+        this.remoteIsTransient = remoteIsTransient;
+        this.pendingRepair = pendingRepair;
+        this.pullRepair = pullRepair;
+    }
+
+    @VisibleForTesting
+    StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences)
+    {
+        StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind)
+                          .listeners(this)
+                          .flushBeforeTransfer(pendingRepair == null)
+                          // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+                          .requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences),
+                                  RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily);  // request ranges from the remote node
+
+        if (!pullRepair && !remoteIsTransient)
+        {
+            // send ranges to the remote node if we are not performing a pull repair
+            // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here
+            plan.transferRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily);
+        }
+
+        return plan;
+    }
+
+    /**
+     * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+     * that will be called out of band once the streams complete.
+     */
+    @Override
+    protected void startSync(List<Range<Token>> differences)
+    {
+        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
+        // Either node can act as source or destination; if one of them is the local node, we make it the source to avoid an extra forwarding hop
+        InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
+
+        String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+        Tracing.traceRepair(message);
+
+        createStreamPlan(dst, differences).execute();
+    }
+
+    public void handleStreamEvent(StreamEvent event)
+    {
+        if (state == null)
+            return;
+        switch (event.eventType)
+        {
+            case STREAM_PREPARED:
+                StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
+                state.trace("Streaming session with {} prepared", spe.session.peer);
+                break;
+            case STREAM_COMPLETE:
+                StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
+                state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
+                break;
+            case FILE_PROGRESS:
+                ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
+                state.trace("{}/{} ({}%) {} idx:{}{}",
+                            new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes),
+                                           FBUtilities.prettyPrintMemory(pi.totalBytes),
+                                           pi.currentBytes * 100 / pi.totalBytes,
+                                           pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
+                                           pi.sessionIndex,
+                                           pi.peer });
+        }
+    }
+
+    public void onSuccess(StreamState result)
+    {
+        String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+        Tracing.traceRepair(message);
+        set(stat.withSummaries(result.createSummaries()));
+        finished();
+    }
+
+    public void onFailure(Throwable t)
+    {
+        setException(t);
+        finished();
+    }
+}
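
The interesting part of the class above is the streaming direction: the mismatching ranges are always requested from the remote replica, and the local node only transfers its own copy back when neither a pull repair nor a transient remote replica suppresses the outbound stream. A minimal illustration of that decision (the helper name is hypothetical; the real logic is createStreamPlan() above):

    // Illustration only: summarises the directions chosen by createStreamPlan().
    // Inbound requestRanges() always happens; outbound transferRanges() is conditional.
    static boolean shouldTransferToRemote(boolean pullRepair, boolean remoteIsTransient)
    {
        return !pullRepair && !remoteIsTransient;
    }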

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java 
b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
new file mode 100644
index 0000000..1f2740f
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.function.Predicate;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RepairException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.AsymmetricSyncRequest;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.streaming.SessionSummary;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * SymmetricRemoteSyncTask sends a {@link SyncRequest} to a remote (non-coordinator) node
+ * to repair (stream) data with another replica.
+ *
+ * The task completes when it receives SyncComplete from the remote node.
+ */
+public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements CompletableRemoteSyncTask
+{
+    private static final Logger logger = LoggerFactory.getLogger(SymmetricRemoteSyncTask.class);
+
+    public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
+    {
+        super(desc, r1, r2, previewKind);
+    }
+
+    void sendRequest(RepairMessage request, InetAddressAndPort to)
+    {
+        MessagingService.instance().sendOneWay(request.createMessage(), to);
+    }
+
+    @Override
+    protected void startSync(List<Range<Token>> differences)
+    {
+        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
+
+        SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind);
+        String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
+        logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
+        Tracing.traceRepair(message);
+        sendRequest(request, request.src);
+    }
+
+    public void syncComplete(boolean success, List<SessionSummary> summaries)
+    {
+        if (success)
+        {
+            set(stat.withSummaries(summaries));
+        }
+        else
+        {
+            setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint)));
+        }
+        finished();
+    }
+}
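
Taken together with SymmetricLocalSyncTask above, the split is: when the coordinator itself holds one of the two Merkle trees it streams directly, otherwise it forwards a SyncRequest and waits for SyncComplete. A rough sketch of that selection, for illustration only (the actual wiring lives in the repair job code, which is not part of this excerpt):

    // Sketch, not code from this commit: choosing between the two symmetric sync tasks.
    SymmetricSyncTask createSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2,
                                     boolean remoteIsTransient, UUID pendingRepair,
                                     boolean pullRepair, PreviewKind previewKind)
    {
        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
        if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
            // the coordinator holds one of the trees, so it can stream directly
            return new SymmetricLocalSyncTask(desc, r1, r2, remoteIsTransient, pendingRepair, pullRepair, previewKind);
        // neither endpoint is the coordinator: send a SyncRequest to r1, which then streams with r2
        return new SymmetricRemoteSyncTask(desc, r1, r2, previewKind);
    }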

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java 
b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
new file mode 100644
index 0000000..3da2293
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.MerkleTrees;
+
+/**
+ * SymmetricSyncTask calculates the difference between the Merkle trees of two nodes
+ * and performs the operations necessary to repair the replicas.
+ */
+public abstract class SymmetricSyncTask extends AbstractSyncTask
+{
+    private static Logger logger = LoggerFactory.getLogger(SymmetricSyncTask.class);
+
+    protected final RepairJobDesc desc;
+    protected final TreeResponse r1;
+    protected final TreeResponse r2;
+    protected final PreviewKind previewKind;
+
+    protected volatile SyncStat stat;
+    protected long startTime = Long.MIN_VALUE;
+
+    public SymmetricSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
+    {
+        this.desc = desc;
+        this.r1 = r1;
+        this.r2 = r2;
+        this.previewKind = previewKind;
+    }
+
+    /**
+     * Compares trees, and triggers repairs for any ranges that mismatch.
+     */
+    public void run()
+    {
+        startTime = System.currentTimeMillis();
+        // compare trees, and collect differences
+        List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees);
+
+        stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size());
+
+        // choose a repair method based on the significance of the difference
+        String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily);
+        if (differences.isEmpty())
+        {
+            logger.info(String.format(format, "are consistent"));
+            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily);
+            set(stat);
+            return;
+        }
+
+        // non-0 difference: perform streaming repair
+        logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
+        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily);
+        startSync(differences);
+    }
+
+    public SyncStat getCurrentStat()
+    {
+        return stat;
+    }
+
+    protected void finished()
+    {
+        if (startTime != Long.MIN_VALUE)
+            Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java 
b/src/java/org/apache/cassandra/repair/SyncTask.java
deleted file mode 100644
index f7cf5f1..0000000
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.AbstractFuture;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.MerkleTrees;
-
-/**
- * SyncTask will calculate the difference of MerkleTree between two nodes
- * and perform necessary operation to repair replica.
- */
-public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable
-{
-    private static Logger logger = LoggerFactory.getLogger(SyncTask.class);
-
-    protected final RepairJobDesc desc;
-    protected final TreeResponse r1;
-    protected final TreeResponse r2;
-    protected final PreviewKind previewKind;
-
-    protected volatile SyncStat stat;
-    protected long startTime = Long.MIN_VALUE;
-
-    public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind)
-    {
-        this.desc = desc;
-        this.r1 = r1;
-        this.r2 = r2;
-        this.previewKind = previewKind;
-    }
-
-    /**
-     * Compares trees, and triggers repairs for any ranges that mismatch.
-     */
-    public void run()
-    {
-        startTime = System.currentTimeMillis();
-        // compare trees, and collect differences
-        List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees);
-
-        stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size());
-
-        // choose a repair method based on the significance of the difference
-        String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily);
-        if (differences.isEmpty())
-        {
-            logger.info(String.format(format, "are consistent"));
-            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily);
-            set(stat);
-            return;
-        }
-
-        // non-0 difference: perform streaming repair
-        logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
-        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily);
-        startSync(differences);
-    }
-
-    public SyncStat getCurrentStat()
-    {
-        return stat;
-    }
-
-    protected void finished()
-    {
-        if (startTime != Long.MIN_VALUE)
-            Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
-    }
-
-    protected abstract void startSync(List<Range<Token>> differences);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index a85a1e5..fc09e71 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -185,13 +185,13 @@ public final class SystemDistributedKeyspace
         processSilent(fmtQuery);
     }
 
-    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddressAndPort> endpoints)
+    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, CommonRange commonRange)
     {
         InetAddressAndPort coordinator = 
FBUtilities.getBroadcastAddressAndPort();
         Set<String> participants = Sets.newHashSet();
         Set<String> participants_v2 = Sets.newHashSet();
 
-        for (InetAddressAndPort endpoint : endpoints)
+        for (InetAddressAndPort endpoint : commonRange.endpoints)
         {
             participants.add(endpoint.getHostAddress(false));
             participants_v2.add(endpoint.toString());
@@ -203,7 +203,7 @@ public final class SystemDistributedKeyspace
 
         for (String cfname : cfnames)
         {
-            for (Range<Token> range : ranges)
+            for (Range<Token> range : commonRange.ranges)
             {
                 String fmtQry = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
                                               keyspaceName,
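
CommonRange itself is not part of this excerpt; judging from the call sites here (commonRange.endpoints, commonRange.ranges) and in ActiveRepairService below, it bundles a set of token ranges with the endpoints that replicate all of them. A rough sketch of such a value type, with field names inferred from usage rather than taken from the real class:

    // Hypothetical sketch of CommonRange, inferred from its call sites in this patch.
    public class CommonRange
    {
        public final Set<InetAddressAndPort> endpoints;    // replicas shared by every range below
        public final Collection<Range<Token>> ranges;      // token ranges repaired against those endpoints

        public CommonRange(Set<InetAddressAndPort> endpoints, Collection<Range<Token>> ranges)
        {
            this.endpoints = endpoints;
            this.ranges = ranges;
        }
    }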

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java 
b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index ed25166..4089e77 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -47,11 +47,14 @@ import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.repair.KeyspaceRepairManager;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -543,9 +546,35 @@ public class LocalSessions
     }
 
     @VisibleForTesting
-    ListenableFuture prepareSession(KeyspaceRepairManager repairManager, UUID sessionID, Collection<ColumnFamilyStore> tables, Collection<Range<Token>> ranges, ExecutorService executor)
+    ListenableFuture prepareSession(KeyspaceRepairManager repairManager,
+                                    UUID sessionID,
+                                    Collection<ColumnFamilyStore> tables,
+                                    RangesAtEndpoint tokenRanges,
+                                    ExecutorService executor)
+    {
+        return repairManager.prepareIncrementalRepair(sessionID, tables, tokenRanges, executor);
+    }
+
+    RangesAtEndpoint filterLocalRanges(String keyspace, Set<Range<Token>> ranges)
     {
-        return repairManager.prepareIncrementalRepair(sessionID, tables, ranges, executor);
+        RangesAtEndpoint localRanges = StorageService.instance.getLocalReplicas(keyspace);
+        RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(localRanges.endpoint());
+        for (Range<Token> range : ranges)
+        {
+            for (Replica replica : localRanges)
+            {
+                if (replica.range().equals(range))
+                {
+                    builder.add(replica);
+                }
+                else if (replica.contains(range))
+                {
+                    builder.add(replica.decorateSubrange(range));
+                }
+            }
+
+        }
+        return builder.build();
     }
 
     /**
@@ -582,7 +611,8 @@ public class LocalSessions
         ExecutorService executor = 
Executors.newFixedThreadPool(parentSession.getColumnFamilyStores().size());
 
         KeyspaceRepairManager repairManager = 
parentSession.getKeyspace().getRepairManager();
-        ListenableFuture repairPreparation = prepareSession(repairManager, sessionID, parentSession.getColumnFamilyStores(), parentSession.getRanges(), executor);
+        RangesAtEndpoint tokenRanges = filterLocalRanges(parentSession.getKeyspace().getName(), parentSession.getRanges());
+        ListenableFuture repairPreparation = prepareSession(repairManager, sessionID, parentSession.getColumnFamilyStores(), tokenRanges, executor);
 
         Futures.addCallback(repairPreparation, new FutureCallback<Object>()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/KeyspaceParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/KeyspaceParams.java 
b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
index 68ac5e4..cc46474 100644
--- a/src/java/org/apache/cassandra/schema/KeyspaceParams.java
+++ b/src/java/org/apache/cassandra/schema/KeyspaceParams.java
@@ -74,6 +74,11 @@ public final class KeyspaceParams
         return new KeyspaceParams(true, ReplicationParams.simple(replicationFactor));
     }
 
+    public static KeyspaceParams simple(String replicationFactor)
+    {
+        return new KeyspaceParams(true, ReplicationParams.simple(replicationFactor));
+    }
+
     public static KeyspaceParams simpleTransient(int replicationFactor)
     {
         return new KeyspaceParams(false, ReplicationParams.simple(replicationFactor));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/ReplicationParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/ReplicationParams.java 
b/src/java/org/apache/cassandra/schema/ReplicationParams.java
index 21c029e..21e19d6 100644
--- a/src/java/org/apache/cassandra/schema/ReplicationParams.java
+++ b/src/java/org/apache/cassandra/schema/ReplicationParams.java
@@ -51,6 +51,11 @@ public final class ReplicationParams
         return new ReplicationParams(SimpleStrategy.class, ImmutableMap.of("replication_factor", Integer.toString(replicationFactor)));
     }
 
+    static ReplicationParams simple(String replicationFactor)
+    {
+        return new ReplicationParams(SimpleStrategy.class, ImmutableMap.of("replication_factor", replicationFactor));
+    }
+
     static ReplicationParams nts(Object... args)
     {
         assert args.length % 2 == 0;
@@ -58,9 +63,7 @@ public final class ReplicationParams
         Map<String, String> options = new HashMap<>();
         for (int i = 0; i < args.length; i += 2)
         {
-            String dc = (String) args[i];
-            Integer rf = (Integer) args[i + 1];
-            options.put(dc, rf.toString());
+            options.put((String) args[i], args[i + 1].toString());
         }
 
         return new ReplicationParams(NetworkTopologyStrategy.class, options);
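
The new String overloads pass the replication_factor value through uninterpreted, presumably so callers can use the extended syntax introduced alongside transient replication (for example "3/1" for three replicas, one of them transient) as well as a plain integer. A hedged usage sketch; the "3/1" format is assumed from the transient replication work and is not shown in this hunk:

    // Sketch: the int overload still covers the common case...
    KeyspaceParams classic = KeyspaceParams.simple(3);
    // ...while the String overload can carry a composite full/transient factor unchanged.
    KeyspaceParams withTransient = KeyspaceParams.simple("3/1");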

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java 
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index a88aebb..22a8c39 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -137,6 +137,7 @@ public final class SchemaKeyspace
               + "min_index_interval int,"
               + "read_repair_chance double," // no longer used, left for 
drivers' sake
               + "speculative_retry text,"
+              + "speculative_write_threshold text,"
               + "cdc boolean,"
               + "read_repair text,"
               + "PRIMARY KEY ((keyspace_name), table_name))");
@@ -203,6 +204,7 @@ public final class SchemaKeyspace
               + "min_index_interval int,"
               + "read_repair_chance double," // no longer used, left for 
drivers' sake
               + "speculative_retry text,"
+              + "speculative_write_threshold text,"
               + "cdc boolean,"
               + "read_repair text,"
               + "PRIMARY KEY ((keyspace_name), view_name))");
@@ -563,6 +565,7 @@ public final class SchemaKeyspace
                .add("min_index_interval", params.minIndexInterval)
                .add("read_repair_chance", 0.0) // no longer used, left for 
drivers' sake
                .add("speculative_retry", params.speculativeRetry.toString())
+               .add("speculative_write_threshold", params.speculativeWriteThreshold.toString())
                .add("crc_check_chance", params.crcCheckChance)
                .add("caching", params.caching.asMap())
                .add("compaction", params.compaction.asMap())
@@ -991,6 +994,7 @@ public final class SchemaKeyspace
                           .minIndexInterval(row.getInt("min_index_interval"))
                           .crcCheckChance(row.getDouble("crc_check_chance"))
                           
.speculativeRetry(SpeculativeRetryPolicy.fromString(row.getString("speculative_retry")))
+                          .speculativeWriteThreshold(SpeculativeRetryPolicy.fromString(row.getString("speculative_write_threshold")))
                           .cdc(row.has("cdc") && row.getBoolean("cdc"))
                           .readRepair(getReadRepairStrategy(row))
                           .build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java 
b/src/java/org/apache/cassandra/schema/TableMetadata.java
index 6466e2e..10edf4d 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -834,6 +834,12 @@ public final class TableMetadata
             return this;
         }
 
+        public Builder speculativeWriteThreshold(SpeculativeRetryPolicy val)
+        {
+            params.speculativeWriteThreshold(val);
+            return this;
+        }
+
         public Builder extensions(Map<String, ByteBuffer> val)
         {
             params.extensions(val);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java 
b/src/java/org/apache/cassandra/schema/TableParams.java
index afbf26c..0bba5e1 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -51,6 +51,7 @@ public final class TableParams
         MEMTABLE_FLUSH_PERIOD_IN_MS,
         MIN_INDEX_INTERVAL,
         SPECULATIVE_RETRY,
+        SPECULATIVE_WRITE_THRESHOLD,
         CRC_CHECK_CHANCE,
         CDC,
         READ_REPAIR;
@@ -71,6 +72,7 @@ public final class TableParams
     public final int minIndexInterval;
     public final int maxIndexInterval;
     public final SpeculativeRetryPolicy speculativeRetry;
+    public final SpeculativeRetryPolicy speculativeWriteThreshold;
     public final CachingParams caching;
     public final CompactionParams compaction;
     public final CompressionParams compression;
@@ -91,6 +93,7 @@ public final class TableParams
         minIndexInterval = builder.minIndexInterval;
         maxIndexInterval = builder.maxIndexInterval;
         speculativeRetry = builder.speculativeRetry;
+        speculativeWriteThreshold = builder.speculativeWriteThreshold;
         caching = builder.caching;
         compaction = builder.compaction;
         compression = builder.compression;
@@ -118,6 +121,7 @@ public final class TableParams
                             
.memtableFlushPeriodInMs(params.memtableFlushPeriodInMs)
                             .minIndexInterval(params.minIndexInterval)
                             .speculativeRetry(params.speculativeRetry)
+                            .speculativeWriteThreshold(params.speculativeWriteThreshold)
                             .extensions(params.extensions)
                             .cdc(params.cdc)
                             .readRepair(params.readRepair);
@@ -260,6 +264,7 @@ public final class TableParams
         private int minIndexInterval = 128;
         private int maxIndexInterval = 2048;
         private SpeculativeRetryPolicy speculativeRetry = PercentileSpeculativeRetryPolicy.NINETY_NINE_P;
+        private SpeculativeRetryPolicy speculativeWriteThreshold = PercentileSpeculativeRetryPolicy.NINETY_NINE_P;
         private CachingParams caching = CachingParams.DEFAULT;
         private CompactionParams compaction = CompactionParams.DEFAULT;
         private CompressionParams compression = CompressionParams.DEFAULT;
@@ -330,6 +335,12 @@ public final class TableParams
             return this;
         }
 
+        public Builder speculativeWriteThreshold(SpeculativeRetryPolicy val)
+        {
+            speculativeWriteThreshold = val;
+            return this;
+        }
+
         public Builder caching(CachingParams val)
         {
             caching = val;
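
speculative_write_threshold is stored and parsed exactly like speculative_retry (a SpeculativeRetryPolicy serialized as text in the schema tables); it appears to play the write-side role that speculative_retry plays for reads, deciding when a write is sent to additional replicas (see maybeTryAdditionalReplicas below). A small sketch using the builder methods added above, with the percentile chosen purely for illustration:

    // Sketch: a table whose reads and writes both speculate at the 99th percentile latency.
    TableParams params = TableParams.builder()
                                    .speculativeRetry(PercentileSpeculativeRetryPolicy.NINETY_NINE_P)
                                    .speculativeWriteThreshold(PercentileSpeculativeRetryPolicy.NINETY_NINE_P)
                                    .build();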

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 9d800a0..e817cc8 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -17,28 +17,35 @@
  */
 package org.apache.cassandra.service;
 
-import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
 
-import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.locator.ReplicaLayout;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.WriteType;
-import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
+
 public abstract class AbstractWriteResponseHandler<T> implements 
IAsyncCallbackWithFailure<T>
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(AbstractWriteResponseHandler.class);
@@ -46,11 +53,9 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     //Count down until all responses and expirations have occurred before deciding whether the ideal CL was reached.
     private AtomicInteger responsesAndExpirations;
     private final SimpleCondition condition = new SimpleCondition();
-    protected final Keyspace keyspace;
-    protected final Collection<InetAddressAndPort> naturalEndpoints;
-    public final ConsistencyLevel consistencyLevel;
+    protected final ReplicaLayout.ForToken replicaLayout;
+
     protected final Runnable callback;
-    protected final Collection<InetAddressAndPort> pendingEndpoints;
     protected final WriteType writeType;
     private static final 
AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
     = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, 
"failures");
@@ -60,7 +65,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     private volatile boolean supportsBackPressure = true;
 
     /**
-      * Delegate to another WriteReponseHandler or possibly this one to track if the ideal consistency level was reached.
+      * Delegate to another WriteResponseHandler or possibly this one to track if the ideal consistency level was reached.
       * Will be set to null if ideal CL was not configured
       * Will be set to an AWRH delegate if ideal CL was configured
       * Will be same as "this" if this AWRH is the ideal consistency level
@@ -71,18 +76,12 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
      * @param callback           A callback to be called when the write is 
successful.
      * @param queryStartNanoTime
      */
-    protected AbstractWriteResponseHandler(Keyspace keyspace,
-                                           Collection<InetAddressAndPort> naturalEndpoints,
-                                           Collection<InetAddressAndPort> pendingEndpoints,
-                                           ConsistencyLevel consistencyLevel,
+    protected AbstractWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
                                            Runnable callback,
                                            WriteType writeType,
                                            long queryStartNanoTime)
     {
-        this.keyspace = keyspace;
-        this.pendingEndpoints = pendingEndpoints;
-        this.consistencyLevel = consistencyLevel;
-        this.naturalEndpoints = naturalEndpoints;
+        this.replicaLayout = replicaLayout;
         this.callback = callback;
         this.writeType = writeType;
         this.failureReasonByEndpoint = new ConcurrentHashMap<>();
@@ -112,12 +111,12 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
             // avoid sending confusing info to the user (see CASSANDRA-6491).
             if (acks >= blockedFor)
                 acks = blockedFor - 1;
-            throw new WriteTimeoutException(writeType, consistencyLevel, acks, blockedFor);
+            throw new WriteTimeoutException(writeType, replicaLayout.consistencyLevel(), acks, blockedFor);
         }
 
         if (totalBlockFor() + failures > totalEndpoints())
         {
-            throw new WriteFailureException(consistencyLevel, ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
+            throw new WriteFailureException(replicaLayout.consistencyLevel(), ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
         }
     }
 
@@ -136,7 +135,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler)
     {
         this.idealCLDelegate = handler;
-        idealCLDelegate.responsesAndExpirations = new AtomicInteger(naturalEndpoints.size() + pendingEndpoints.size());
+        idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaLayout.selected().size());
     }
 
     /**
@@ -194,15 +193,20 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
     {
         // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
         // guarantees (see #833)
-        return consistencyLevel.blockFor(keyspace) + pendingEndpoints.size();
+        return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending());
     }
 
     /**
-     * @return the total number of endpoints the request has been sent to.
+     * @return the total number of endpoints the request can be sent to.
      */
     protected int totalEndpoints()
     {
-        return naturalEndpoints.size() + pendingEndpoints.size();
+        return replicaLayout.all().size();
+    }
+
+    public ConsistencyLevel consistencyLevel()
+    {
+        return replicaLayout.consistencyLevel();
     }
 
     /**
@@ -225,7 +229,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
 
     public void assureSufficientLiveNodes() throws UnavailableException
     {
-        consistencyLevel.assureSufficientLiveNodes(keyspace, Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), isAlive));
+        replicaLayout.consistencyLevel().assureSufficientLiveNodesForWrite(replicaLayout.keyspace(), replicaLayout.all().filter(isReplicaAlive), replicaLayout.pending());
     }
 
     protected void signal()
@@ -274,12 +278,49 @@ public abstract class AbstractWriteResponseHandler<T> 
implements IAsyncCallbackW
             //The condition being signaled is a valid proxy for the CL being 
achieved
             if (!condition.isSignaled())
             {
-                keyspace.metric.writeFailedIdealCL.inc();
+                replicaLayout.keyspace().metric.writeFailedIdealCL.inc();
             }
             else
             {
-                keyspace.metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
+                replicaLayout.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
             }
         }
     }
+
+    /**
+     * Cheap Quorum backup: if we failed to reach quorum with our initial (full) nodes, reach out to other replicas.
+     */
+    public void maybeTryAdditionalReplicas(IMutation mutation, StorageProxy.WritePerformer writePerformer, String localDC)
+    {
+        if (replicaLayout.all().size() == replicaLayout.selected().size())
+            return;
+
+        long timeout = Long.MAX_VALUE;
+        List<ColumnFamilyStore> cfs = mutation.getTableIds().stream()
+                                              .map(Schema.instance::getColumnFamilyStoreInstance)
+                                              .collect(Collectors.toList());
+        for (ColumnFamilyStore cf : cfs)
+            timeout = Math.min(timeout, cf.transientWriteLatencyNanos);
+
+        // no latency information, or we're overloaded
+        if (timeout > TimeUnit.MILLISECONDS.toNanos(mutation.getTimeout()))
+            return;
+
+        try
+        {
+            if (!condition.await(timeout, TimeUnit.NANOSECONDS))
+            {
+                for (ColumnFamilyStore cf : cfs)
+                    cf.metric.speculativeWrites.inc();
+
+                writePerformer.apply(mutation, replicaLayout.forNaturalUncontacted(),
+                                     (AbstractWriteResponseHandler<IMutation>) this,
+                                     localDC);
+            }
+        }
+        catch (InterruptedException ex)
+        {
+            throw new AssertionError(ex);
+        }
+    }
 }
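
The pattern in maybeTryAdditionalReplicas (wait briefly for the initially contacted, full replicas; if the condition is not signalled within the table's speculation threshold, send the mutation to the replicas that were skipped) is the write-side analogue of speculative reads. A self-contained sketch of the same idea with generic types, purely illustrative (names such as maybeSpeculate and sendTo are hypothetical):

    // Illustrative sketch of the speculative-write pattern; not Cassandra code.
    static <T> void maybeSpeculate(java.util.concurrent.CountDownLatch initialAcks,
                                   long thresholdNanos,
                                   java.util.List<T> remainingReplicas,
                                   java.util.function.Consumer<T> sendTo) throws InterruptedException
    {
        // Wait up to the speculation threshold for the initial replicas to acknowledge.
        if (!initialAcks.await(thresholdNanos, java.util.concurrent.TimeUnit.NANOSECONDS))
        {
            // Still short of the required acks: fan the write out to the replicas skipped initially.
            for (T replica : remainingReplicas)
                sendTo.accept(replica);
        }
    }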

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b60088c..9f37095 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -38,6 +38,8 @@ import com.google.common.util.concurrent.AbstractFuture;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.cassandra.locator.EndpointsByRange;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,11 +62,14 @@ import 
org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.CommonRange;
+import org.apache.cassandra.repair.RepairRunnable;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.RepairParallelism;
@@ -79,7 +84,8 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
-import static org.apache.cassandra.config.Config.RepairCommandPoolFullStrategy.queue;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.transform;
 
 /**
  * ActiveRepairService is the starting point for manual "active" repairs.
@@ -204,10 +210,9 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
      * @return Future for asynchronous call or null if there is no need to 
repair
      */
     public RepairSession submitRepairSession(UUID parentRepairSession,
-                                             Collection<Range<Token>> range,
+                                             CommonRange range,
                                              String keyspace,
                                              RepairParallelism 
parallelismDegree,
-                                             Set<InetAddressAndPort> endpoints,
                                              boolean isIncremental,
                                              boolean pullRepair,
                                              boolean force,
@@ -216,14 +221,15 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                                              ListeningExecutorService executor,
                                              String... cfnames)
     {
-        if (endpoints.isEmpty())
+        if (range.endpoints.isEmpty())
             return null;
 
         if (cfnames.length == 0)
             return null;
 
-
-        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames);
+        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace,
+                                                        parallelismDegree, isIncremental, pullRepair, force,
+                                                        previewKind, optimiseStreams, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners
@@ -296,12 +302,12 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
      *
      * @return neighbors with whom we share the provided range
      */
-    public static Set<InetAddressAndPort> getNeighbors(String keyspaceName, Collection<Range<Token>> keyspaceLocalRanges,
-                                                       Range<Token> toRepair, Collection<String> dataCenters,
-                                                       Collection<String> hosts)
+    public static EndpointsForRange getNeighbors(String keyspaceName, Iterable<Range<Token>> keyspaceLocalRanges,
+                                          Range<Token> toRepair, Collection<String> dataCenters,
+                                          Collection<String> hosts)
     {
         StorageService ss = StorageService.instance;
-        Map<Range<Token>, List<InetAddressAndPort>> replicaSets = ss.getRangeToAddressMap(keyspaceName);
+        EndpointsByRange replicaSets = ss.getRangeToAddressMap(keyspaceName);
         Range<Token> rangeSuperSet = null;
         for (Range<Token> range : keyspaceLocalRanges)
         {
@@ -319,23 +325,16 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
             }
         }
         if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
-            return Collections.emptySet();
+            return EndpointsForRange.empty(toRepair);
 
-        Set<InetAddressAndPort> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet));
-        neighbors.remove(FBUtilities.getBroadcastAddressAndPort());
+        EndpointsForRange neighbors = replicaSets.get(rangeSuperSet).withoutSelf();
 
         if (dataCenters != null && !dataCenters.isEmpty())
         {
             TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
-            Set<InetAddressAndPort> dcEndpoints = Sets.newHashSet();
-            Multimap<String,InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints();
-            for (String dc : dataCenters)
-            {
-                Collection<InetAddressAndPort> c = dcEndpointsMap.get(dc);
-                if (c != null)
-                   dcEndpoints.addAll(c);
-            }
-            return Sets.intersection(neighbors, dcEndpoints);
+            Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints();
+            Iterable<InetAddressAndPort> dcEndpoints = concat(transform(dataCenters, dcEndpointsMap::get));
+            return neighbors.keep(dcEndpoints);
         }
         else if (hosts != null && !hosts.isEmpty())
         {
@@ -345,7 +344,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
                 try
                 {
                     final InetAddressAndPort endpoint = InetAddressAndPort.getByName(host.trim());
-                    if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) || neighbors.contains(endpoint))
+                    if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) || neighbors.endpoints().contains(endpoint))
                         specifiedHost.add(endpoint);
                 }
                 catch (UnknownHostException e)
@@ -366,8 +365,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
             }
 
             specifiedHost.remove(FBUtilities.getBroadcastAddressAndPort());
-            return specifiedHost;
-
+            return neighbors.keep(specifiedHost);
         }
 
         return neighbors;
@@ -594,10 +592,10 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
 
         public Set<TableId> getTableIds()
         {
-            return ImmutableSet.copyOf(Iterables.transform(getColumnFamilyStores(), cfs -> cfs.metadata.id));
+            return ImmutableSet.copyOf(transform(getColumnFamilyStores(), cfs -> cfs.metadata.id));
         }
 
-        public Collection<Range<Token>> getRanges()
+        public Set<Range<Token>> getRanges()
         {
             return ImmutableSet.copyOf(ranges);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java 
b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index e373fb6..ee74df5 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -36,7 +36,7 @@ public class BatchlogResponseHandler<T> extends 
AbstractWriteResponseHandler<T>
 
     public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, 
int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime)
     {
-        super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType, queryStartNanoTime);
+        super(wrapped.replicaLayout, wrapped.callback, wrapped.writeType, queryStartNanoTime);
         this.wrapped = wrapped;
         this.requiredBeforeFinish = requiredBeforeFinish;
         this.cleanup = cleanup;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index dbd3667..d4cdcc6 100644
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -17,16 +17,15 @@
  */
 package org.apache.cassandra.service;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.IEndpointSnitch;
-import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
@@ -41,29 +40,26 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
     private final Map<String, AtomicInteger> responses = new HashMap<String, 
AtomicInteger>();
     private final AtomicInteger acks = new AtomicInteger(0);
 
-    public DatacenterSyncWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
-                                              Collection<InetAddressAndPort> pendingEndpoints,
-                                              ConsistencyLevel consistencyLevel,
-                                              Keyspace keyspace,
+    public DatacenterSyncWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
                                               Runnable callback,
                                               WriteType writeType,
                                               long queryStartNanoTime)
     {
         // Responses are managed by the map, so make it 1 for the superclass.
-        super(keyspace, naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime);
-        assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
+        super(replicaLayout, callback, writeType, queryStartNanoTime);
+        assert replicaLayout.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
 
-        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaLayout.keyspace().getReplicationStrategy();
 
         for (String dc : strategy.getDatacenters())
         {
-            int rf = strategy.getReplicationFactor(dc);
+            int rf = strategy.getReplicationFactor(dc).allReplicas;
             responses.put(dc, new AtomicInteger((rf / 2) + 1));
         }
 
         // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
         // guarantees (see #833)
-        for (InetAddressAndPort pending : pendingEndpoints)
+        for (Replica pending : replicaLayout.pending())
         {
             responses.get(snitch.getDatacenter(pending)).incrementAndGet();
         }
@@ -105,4 +101,5 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
     {
         return false;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index a8d7b28..b458a71 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -17,29 +17,23 @@
  */
 package org.apache.cassandra.service;
 
-import java.util.Collection;
-
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.WriteType;
 
 /**
  * This class blocks for a quorum of responses _in the local datacenter only_ 
(CL.LOCAL_QUORUM).
  */
 public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
 {
-    public DatacenterWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints,
-                                          Collection<InetAddressAndPort> pendingEndpoints,
-                                          ConsistencyLevel consistencyLevel,
-                                          Keyspace keyspace,
+    public DatacenterWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
                                           Runnable callback,
                                           WriteType writeType,
                                           long queryStartNanoTime)
     {
-        super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType, queryStartNanoTime);
-        assert consistencyLevel.isDatacenterLocal();
+        super(replicaLayout, callback, writeType, queryStartNanoTime);
+        assert replicaLayout.consistencyLevel().isDatacenterLocal();
     }
 
     @Override
@@ -58,16 +52,8 @@ public class DatacenterWriteResponseHandler<T> extends 
WriteResponseHandler<T>
     }
 
     @Override
-    protected int totalBlockFor()
-    {
-        // during bootstrap, include pending endpoints (only local here) in the count
-        // or we may fail the consistency level guarantees (see #833, #8058)
-        return consistencyLevel.blockFor(keyspace) + consistencyLevel.countLocalEndpoints(pendingEndpoints);
-    }
-
-    @Override
     protected boolean waitingFor(InetAddressAndPort from)
     {
-        return consistencyLevel.isLocal(from);
+        return replicaLayout.consistencyLevel().isLocal(from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java 
b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index e1c0f55..7b6bd58 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -92,7 +92,7 @@ public class PendingRangeCalculatorService
     {
         int jobs = updateJobs.incrementAndGet();
         PendingRangeCalculatorServiceDiagnostics.taskCountChanged(instance, jobs);
-        executor.submit(new PendingRangeTask(updateJobs));
+        executor.execute(new PendingRangeTask(updateJobs));
     }
 
     public void blockUntilFinished()

