Repository: cassandra
Updated Branches:
  refs/heads/trunk 5b645de13 -> f7431b432


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java 
b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
new file mode 100644
index 0000000..8119400
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.DecoratedKey;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.SimpleBuilders;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.reads.repair.TestableReadRepair;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.apache.cassandra.locator.ReplicaUtils.trans;
+
+/**
+ * Tests DataResolver's handling of transient replicas
+ */
+public class DataResolverTransientTest extends AbstractReadResponseTest
+{
+    private static DecoratedKey key;
+
+    @Before
+    public void setUp()
+    {
+        key = Util.dk("key1");
+    }
+
+    private static PartitionUpdate.Builder update(TableMetadata metadata, 
String key, Row... rows)
+    {
+        PartitionUpdate.Builder builder = new 
PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), 
rows.length, false);
+        for (Row row: rows)
+        {
+            builder.add(row);
+        }
+        return builder;
+    }
+
+    private static PartitionUpdate.Builder update(Row... rows)
+    {
+        return update(cfm, "key1", rows);
+    }
+
+    private static Row.SimpleBuilder rowBuilder(int clustering)
+    {
+        return new SimpleBuilders.RowBuilder(cfm, 
Integer.toString(clustering));
+    }
+
+    private static Row row(long timestamp, int clustering, int value)
+    {
+        return rowBuilder(clustering).timestamp(timestamp).add("c1", 
Integer.toString(value)).build();
+    }
+
+    private static DeletionTime deletion(long timeMillis)
+    {
+        TimeUnit MILLIS = TimeUnit.MILLISECONDS;
+        return new DeletionTime(MILLIS.toMicros(timeMillis), 
Ints.checkedCast(MILLIS.toSeconds(timeMillis)));
+    }
+
+    /**
+     * Tests that the given update doesn't cause data resolver to attempt to 
repair a transient replica
+     */
+    private void assertNoTransientRepairs(PartitionUpdate update)
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key);
+        EndpointsForToken targetReplicas = 
EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair repair = new TestableReadRepair(command, QUORUM);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, 
ConsistencyLevel.QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP1, iter(update), false));
+        resolver.preprocess(response(command, EP2, iter(update), false));
+        resolver.preprocess(response(command, EP3, 
EmptyIterators.unfilteredPartition(update.metadata()), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+        assertPartitionsEqual(filter(iter(update)), resolver.resolve());
+        Assert.assertTrue(repair.dataWasConsumed());
+        Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty());
+    }
+
+    @Test
+    public void emptyRowRepair()
+    {
+        assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 
5)).build());
+    }
+
+    @Test
+    public void emptyPartitionDeletionRepairs()
+    {
+        PartitionUpdate.Builder builder = update();
+        builder.addPartitionDeletion(deletion(1999));
+        assertNoTransientRepairs(builder.build());
+    }
+
+    /**
+     * Partition level deletion responses shouldn't send data to a transient 
replica
+     */
+    @Test
+    public void emptyRowDeletionRepairs()
+    {
+        PartitionUpdate.Builder builder = update();
+        builder.add(rowBuilder(1).timestamp(1999).delete().build());
+        assertNoTransientRepairs(builder.build());
+    }
+
+    @Test
+    public void emptyComplexDeletionRepair()
+    {
+
+        long[] ts = {1000, 2000};
+
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(Clustering.EMPTY);
+        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+        assertNoTransientRepairs(update(cfm2, "key", builder.build()).build());
+
+    }
+
+    @Test
+    public void emptyRangeTombstoneRepairs()
+    {
+        Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), 
Clustering.make(ByteBufferUtil.bytes("b")));
+        PartitionUpdate.Builder builder = update();
+        builder.add(new RangeTombstone(slice, deletion(2000)));
+        assertNoTransientRepairs(builder.build());
+    }
+
+    /**
+     * If the full replicas need to repair each other, repairs shouldn't be 
sent to transient replicas
+     */
+    @Test
+    public void fullRepairsIgnoreTransientReplicas()
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
+        EndpointsForToken targetReplicas = 
EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair repair = new TestableReadRepair(command, QUORUM);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, 
QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 
5)).build()), false));
+        resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 
4)).build()), false));
+        resolver.preprocess(response(command, EP3, 
EmptyIterators.unfilteredPartition(cfm), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+
+        consume(resolver.resolve());
+
+        Assert.assertTrue(repair.dataWasConsumed());
+
+        Assert.assertTrue(repair.sent.containsKey(EP1));
+        Assert.assertTrue(repair.sent.containsKey(EP2));
+        Assert.assertFalse(repair.sent.containsKey(EP3));
+    }
+
+    /**
+     * If the transient replica has new data, the full replicas should be 
repaired, the transient one should not
+     */
+    @Test
+    public void transientMismatchesRepairFullReplicas()
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
+        EndpointsForToken targetReplicas = 
EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair<?, ?> repair = new TestableReadRepair(command, 
QUORUM);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, 
QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        PartitionUpdate transData = update(row(1000, 5, 5)).build();
+        resolver.preprocess(response(command, EP1, 
EmptyIterators.unfilteredPartition(cfm), false));
+        resolver.preprocess(response(command, EP2, 
EmptyIterators.unfilteredPartition(cfm), false));
+        resolver.preprocess(response(command, EP3, iter(transData), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+
+        assertPartitionsEqual(filter(iter(transData)), resolver.resolve());
+
+        Assert.assertTrue(repair.dataWasConsumed());
+
+        assertPartitionsEqual(filter(iter(transData)), 
filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm))));
+        assertPartitionsEqual(filter(iter(transData)), 
filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm))));
+        Assert.assertFalse(repair.sent.containsKey(EP3));
+
+    }
+
+    private ReplicaLayout.ForToken plan(EndpointsForToken replicas, 
ConsistencyLevel consistencyLevel)
+    {
+        return new ReplicaLayout.ForToken(ks, consistencyLevel, 
replicas.token(), replicas, null, replicas);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java 
b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
new file mode 100644
index 0000000..5306a74
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.SimpleBuilders;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.reads.repair.NoopReadRepair;
+import org.apache.cassandra.service.reads.repair.TestableReadRepair;
+
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.apache.cassandra.locator.ReplicaUtils.trans;
+
+public class DigestResolverTest extends AbstractReadResponseTest
+{
+    private static PartitionUpdate.Builder update(TableMetadata metadata, 
String key, Row... rows)
+    {
+        PartitionUpdate.Builder builder = new 
PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), 
rows.length, false);
+        for (Row row: rows)
+        {
+            builder.add(row);
+        }
+        return builder;
+    }
+
+    private static PartitionUpdate.Builder update(Row... rows)
+    {
+        return update(cfm, "key1", rows);
+    }
+
+    private static Row row(long timestamp, int clustering, int value)
+    {
+        SimpleBuilders.RowBuilder builder = new SimpleBuilders.RowBuilder(cfm, 
Integer.toString(clustering));
+        builder.timestamp(timestamp).add("c1", Integer.toString(value));
+        return builder.build();
+    }
+
+    @Test
+    public void noRepairNeeded()
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), 
full(EP1), full(EP2));
+        TestableReadRepair readRepair = new TestableReadRepair(command, 
ConsistencyLevel.QUORUM);
+        DigestResolver resolver = new DigestResolver(command, 
plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0);
+
+        PartitionUpdate response = update(row(1000, 4, 4), row(1000, 5, 
5)).build();
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP2, iter(response), true));
+        resolver.preprocess(response(command, EP1, iter(response), false));
+        Assert.assertTrue(resolver.isDataPresent());
+        Assert.assertTrue(resolver.responsesMatch());
+
+        assertPartitionsEqual(filter(iter(response)), resolver.getData());
+    }
+
+    @Test
+    public void digestMismatch()
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), 
full(EP1), full(EP2));
+        DigestResolver resolver = new DigestResolver(command, 
plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance,0);
+
+        PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 
5)).build();
+        PartitionUpdate response2 = update(row(2000, 4, 5)).build();
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP2, iter(response1), true));
+        resolver.preprocess(response(command, EP1, iter(response2), false));
+        Assert.assertTrue(resolver.isDataPresent());
+        Assert.assertFalse(resolver.responsesMatch());
+        Assert.assertFalse(resolver.hasTransientResponse());
+    }
+
+    /**
+     * A full response and a transient response, with the transient response 
being a subset of the full one
+     */
+    @Test
+    public void agreeingTransient()
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), 
full(EP1), trans(EP2));
+        TestableReadRepair readRepair = new TestableReadRepair(command, 
ConsistencyLevel.QUORUM);
+        DigestResolver resolver = new DigestResolver(command, 
plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0);
+
+        PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 
5)).build();
+        PartitionUpdate response2 = update(row(1000, 5, 5)).build();
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP1, iter(response1), false));
+        resolver.preprocess(response(command, EP2, iter(response2), false));
+        Assert.assertTrue(resolver.isDataPresent());
+        Assert.assertTrue(resolver.responsesMatch());
+        Assert.assertTrue(resolver.hasTransientResponse());
+        Assert.assertTrue(readRepair.sent.isEmpty());
+    }
+
+    /**
+     * Transient responses shouldn't be classified as the single dataResponse
+     */
+    @Test
+    public void transientResponse()
+    {
+        SinglePartitionReadCommand command = 
SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk);
+        EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), 
full(EP1), trans(EP2));
+        DigestResolver resolver = new DigestResolver(command, 
plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance, 0);
+
+        PartitionUpdate response2 = update(row(1000, 5, 5)).build();
+        Assert.assertFalse(resolver.isDataPresent());
+        Assert.assertFalse(resolver.hasTransientResponse());
+        resolver.preprocess(response(command, EP2, iter(response2), false));
+        Assert.assertFalse(resolver.isDataPresent());
+        Assert.assertTrue(resolver.hasTransientResponse());
+    }
+
+    private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, 
EndpointsForToken replicas)
+    {
+        return new ReplicaLayout.ForToken(ks, consistencyLevel, 
replicas.token(), replicas, null, replicas);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java 
b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index de7b2e4..3b102f2 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.cassandra.service.reads;
 
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.ImmutableList;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -35,12 +37,15 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.reads.AbstractReadExecutor;
 
+import static org.apache.cassandra.locator.ReplicaUtils.full;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -48,7 +53,8 @@ public class ReadExecutorTest
 {
     static Keyspace ks;
     static ColumnFamilyStore cfs;
-    static List<InetAddressAndPort> targets;
+    static EndpointsForToken targets;
+    static Token dummy;
 
     @BeforeClass
     public static void setUpClass() throws Throwable
@@ -57,8 +63,13 @@ public class ReadExecutorTest
         SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), 
SchemaLoader.standardCFMD("Foo", "Bar"));
         ks = Keyspace.open("Foo");
         cfs = ks.getColumnFamilyStore("Bar");
-        targets = 
ImmutableList.of(InetAddressAndPort.getByName("127.0.0.255"), 
InetAddressAndPort.getByName("127.0.0.254"), 
InetAddressAndPort.getByName("127.0.0.253"));
-        cfs.sampleLatencyNanos = 0;
+        dummy = Murmur3Partitioner.instance.getMinimumToken();
+        targets = EndpointsForToken.of(dummy,
+                full(InetAddressAndPort.getByName("127.0.0.255")),
+                full(InetAddressAndPort.getByName("127.0.0.254")),
+                full(InetAddressAndPort.getByName("127.0.0.253"))
+        );
+        cfs.sampleReadLatencyNanos = 0;
     }
 
     @Before
@@ -78,7 +89,7 @@ public class ReadExecutorTest
     {
         assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount());
         assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount());
-        AbstractReadExecutor executor = new 
AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new 
MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, 
System.nanoTime(), true);
+        AbstractReadExecutor executor = new 
AbstractReadExecutor.NeverSpeculatingReadExecutor(cfs, new 
MockSinglePartitionReadCommand(), plan(targets, ConsistencyLevel.LOCAL_QUORUM), 
System.nanoTime(), true);
         executor.maybeTryAdditionalReplicas();
         try
         {
@@ -93,7 +104,7 @@ public class ReadExecutorTest
         assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
 
         //Shouldn't increment
-        executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, 
cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, 
targets, System.nanoTime(), false);
+        executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(cfs, 
new MockSinglePartitionReadCommand(), plan(targets, 
ConsistencyLevel.LOCAL_QUORUM), System.nanoTime(), false);
         executor.maybeTryAdditionalReplicas();
         try
         {
@@ -119,7 +130,7 @@ public class ReadExecutorTest
         assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
         assertEquals(0, ks.metric.speculativeRetries.getCount());
         assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
-        AbstractReadExecutor executor = new 
AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new 
MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), 
ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
+        AbstractReadExecutor executor = new 
AbstractReadExecutor.SpeculatingReadExecutor(cfs, new 
MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), 
plan(ConsistencyLevel.LOCAL_QUORUM, targets, targets.subList(0, 2)), 
System.nanoTime());
         executor.maybeTryAdditionalReplicas();
         new Thread()
         {
@@ -127,8 +138,8 @@ public class ReadExecutorTest
             public void run()
             {
                //Failures end the read promptly but don't require mock data 
to be supplied
-                executor.handler.onFailure(targets.get(0), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
-                executor.handler.onFailure(targets.get(1), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+                executor.handler.onFailure(targets.get(0).endpoint(), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+                executor.handler.onFailure(targets.get(1).endpoint(), 
RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
                 executor.handler.condition.signalAll();
             }
         }.start();
@@ -160,7 +171,7 @@ public class ReadExecutorTest
         assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
         assertEquals(0, ks.metric.speculativeRetries.getCount());
         assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
-        AbstractReadExecutor executor = new 
AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new 
MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, 
System.nanoTime());
+        AbstractReadExecutor executor = new 
AbstractReadExecutor.SpeculatingReadExecutor(cfs, new 
MockSinglePartitionReadCommand(), plan(ConsistencyLevel.LOCAL_QUORUM, targets, 
targets.subList(0, 2)), System.nanoTime());
         executor.maybeTryAdditionalReplicas();
         try
         {
@@ -188,7 +199,7 @@ public class ReadExecutorTest
 
         MockSinglePartitionReadCommand(long timeout)
         {
-            super(false, 0, cfs.metadata(), 0, null, null, null, 
Util.dk("ry@n_luvs_teh_y@nk33z"), null, null);
+            super(false, 0, false, cfs.metadata(), 0, null, null, null, 
Util.dk("ry@n_luvs_teh_y@nk33z"), null, null);
             this.timeout = timeout;
         }
 
@@ -213,4 +224,13 @@ public class ReadExecutorTest
 
     }
 
+    private ReplicaLayout.ForToken plan(EndpointsForToken targets, 
ConsistencyLevel consistencyLevel)
+    {
+        return plan(consistencyLevel, targets, targets);
+    }
+
+    private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, 
EndpointsForToken natural, EndpointsForToken selected)
+    {
+        return new ReplicaLayout.ForToken(ks, consistencyLevel, 
natural.token(), natural, null, selected);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 9717c4e..7e6ee29 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -8,7 +8,6 @@ import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import org.junit.Assert;
@@ -39,7 +38,11 @@ import org.apache.cassandra.db.rows.BufferCell;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -49,6 +52,9 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE;
+
 @Ignore
 public abstract  class AbstractReadRepairTest
 {
@@ -60,6 +66,12 @@ public abstract  class AbstractReadRepairTest
     static InetAddressAndPort target3;
     static List<InetAddressAndPort> targets;
 
+    static Replica replica1;
+    static Replica replica2;
+    static Replica replica3;
+    static EndpointsForRange replicas;
+    static ReplicaLayout<?, ?> replicaLayout;
+
     static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
     static DecoratedKey key;
     static Cell cell1;
@@ -191,7 +203,8 @@ public abstract  class AbstractReadRepairTest
         ks = Keyspace.open(ksName);
         cfs = ks.getColumnFamilyStore("tbl");
 
-        cfs.sampleLatencyNanos = 0;
+        cfs.sampleReadLatencyNanos = 0;
+        cfs.transientWriteLatencyNanos = 0;
 
         target1 = InetAddressAndPort.getByName("127.0.0.255");
         target2 = InetAddressAndPort.getByName("127.0.0.254");
@@ -199,6 +212,13 @@ public abstract  class AbstractReadRepairTest
 
         targets = ImmutableList.of(target1, target2, target3);
 
+        replica1 = fullReplica(target1, FULL_RANGE);
+        replica2 = fullReplica(target2, FULL_RANGE);
+        replica3 = fullReplica(target3, FULL_RANGE);
+        replicas = EndpointsForRange.of(replica1, replica2, replica3);
+
+        replicaLayout = replicaLayout(ConsistencyLevel.QUORUM, replicas);
+
         // default test values
         key  = dk(5);
         cell1 = cell("v", "val1", now);
@@ -220,14 +240,26 @@ public abstract  class AbstractReadRepairTest
     public void setUp()
     {
         assert configured : "configureClass must be called in a @BeforeClass 
method";
-        cfs.sampleLatencyNanos = 0;
+
+        cfs.sampleReadLatencyNanos = 0;
+        cfs.transientWriteLatencyNanos = 0;
+    }
+
+    static ReplicaLayout.ForRange replicaLayout(EndpointsForRange replicas, 
EndpointsForRange targets)
+    {
+        return new ReplicaLayout.ForRange(ks, ConsistencyLevel.QUORUM, 
ReplicaUtils.FULL_BOUNDS, replicas, targets);
+    }
+
+    static ReplicaLayout.ForRange replicaLayout(ConsistencyLevel 
consistencyLevel, EndpointsForRange replicas)
+    {
+        return new ReplicaLayout.ForRange(ks, consistencyLevel, 
ReplicaUtils.FULL_BOUNDS, replicas, replicas);
     }
 
-    public abstract InstrumentedReadRepair 
createInstrumentedReadRepair(ReadCommand command, long queryStartNanoTime, 
ConsistencyLevel consistency);
+    public abstract InstrumentedReadRepair 
createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> 
replicaLayout, long queryStartNanoTime);
 
-    public InstrumentedReadRepair createInstrumentedReadRepair()
+    public InstrumentedReadRepair 
createInstrumentedReadRepair(ReplicaLayout<?, ?> replicaLayout)
     {
-        return createInstrumentedReadRepair(command, System.nanoTime(), 
ConsistencyLevel.QUORUM);
+        return createInstrumentedReadRepair(command, replicaLayout, 
System.nanoTime());
 
     }
 
@@ -238,12 +270,11 @@ public abstract  class AbstractReadRepairTest
     @Test
     public void readSpeculationCycle()
     {
-        InstrumentedReadRepair repair = createInstrumentedReadRepair();
+        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaLayout(replicas, 
EndpointsForRange.of(replica1, replica2)));
         ResultConsumer consumer = new ResultConsumer();
 
-
         Assert.assertEquals(epSet(), repair.getReadRecipients());
-        repair.startRepair(null, targets, Lists.newArrayList(target1, 
target2), consumer);
+        repair.startRepair(null, consumer);
 
         Assert.assertEquals(epSet(target1, target2), 
repair.getReadRecipients());
         repair.maybeSendAdditionalReads();
@@ -258,11 +289,11 @@ public abstract  class AbstractReadRepairTest
     @Test
     public void noSpeculationRequired()
     {
-        InstrumentedReadRepair repair = createInstrumentedReadRepair();
+        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaLayout(replicas, 
EndpointsForRange.of(replica1, replica2)));
         ResultConsumer consumer = new ResultConsumer();
 
         Assert.assertEquals(epSet(), repair.getReadRecipients());
-        repair.startRepair(null, targets, Lists.newArrayList(target1, 
target2), consumer);
+        repair.startRepair(null, consumer);
 
         Assert.assertEquals(epSet(target1, target2), 
repair.getReadRecipients());
         repair.getReadCallback().response(msg(target1, cell1));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index b06e88a..a574d02 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.cassandra.service.reads.repair;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -31,24 +29,26 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.Util;
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.service.reads.ReadCallback;
 
 public class BlockingReadRepairTest extends AbstractReadRepairTest
 {
-
-    private static class InstrumentedReadRepairHandler extends 
BlockingPartitionRepair
+    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, 
L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
     {
-        public InstrumentedReadRepairHandler(Keyspace keyspace, DecoratedKey 
key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, 
int maxBlockFor, InetAddressAndPort[] participants)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, 
int maxBlockFor, L replicaLayout)
         {
-            super(keyspace, key, consistency, repairs, maxBlockFor, 
participants);
+            super(Util.dk("not a real usable value"), repairs, maxBlockFor, 
replicaLayout);
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -58,13 +58,6 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
             mutationsSent.put(endpoint, message.payload);
         }
 
-        List<InetAddressAndPort> candidates = targets;
-
-        protected List<InetAddressAndPort> getCandidateEndpoints()
-        {
-            return candidates;
-        }
-
         @Override
         protected boolean isLocal(InetAddressAndPort endpoint)
         {
@@ -78,23 +71,22 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         configureClass(ReadRepairStrategy.BLOCKING);
     }
 
-    private static InstrumentedReadRepairHandler 
createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, 
Collection<InetAddressAndPort> participants)
+    private static InstrumentedReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, 
ReplicaLayout<?, ?> replicaLayout)
     {
-        InetAddressAndPort[] participantArray = new 
InetAddressAndPort[participants.size()];
-        participants.toArray(participantArray);
-        return new InstrumentedReadRepairHandler(ks, key, 
ConsistencyLevel.LOCAL_QUORUM, repairs, maxBlockFor, participantArray);
+        return new InstrumentedReadRepairHandler(repairs, maxBlockFor, 
replicaLayout);
     }
 
-    private static InstrumentedReadRepairHandler 
createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor)
+    private static InstrumentedReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
     {
-        return createRepairHandler(repairs, maxBlockFor, repairs.keySet());
+        EndpointsForRange replicas = 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
+        return createRepairHandler(repairs, maxBlockFor, 
replicaLayout(replicas, replicas));
     }
 
-    private static class InstrumentedBlockingReadRepair extends 
BlockingReadRepair implements InstrumentedReadRepair
+    private static class InstrumentedBlockingReadRepair<E extends 
Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingReadRepair<E, L> 
implements InstrumentedReadRepair<E, L>
     {
-        public InstrumentedBlockingReadRepair(ReadCommand command, long 
queryStartNanoTime, ConsistencyLevel consistency)
+        public InstrumentedBlockingReadRepair(ReadCommand command, L 
replicaLayout, long queryStartNanoTime)
         {
-            super(command, queryStartNanoTime, consistency);
+            super(command, replicaLayout, queryStartNanoTime);
         }
 
         Set<InetAddressAndPort> readCommandRecipients = new HashSet<>();
@@ -109,12 +101,6 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         }
 
         @Override
-        Iterable<InetAddressAndPort> getCandidatesForToken(Token token)
-        {
-            return targets;
-        }
-
-        @Override
         public Set<InetAddressAndPort> getReadRecipients()
         {
             return readCommandRecipients;
@@ -128,9 +114,9 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     }
 
     @Override
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, long queryStartNanoTime, ConsistencyLevel consistency)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
     {
-        return new InstrumentedBlockingReadRepair(command, queryStartNanoTime, 
consistency);
+        return new InstrumentedBlockingReadRepair(command, replicaLayout, 
queryStartNanoTime);
     }
 
     @Test
@@ -152,12 +138,12 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         Mutation repair2 = mutation(cell1);
 
         // check that the correct repairs are calculated
-        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
-        repairs.put(target1, repair1);
-        repairs.put(target2, repair2);
-
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(replica1, repair1);
+        repairs.put(replica2, repair2);
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2);
+        ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        InstrumentedReadRepairHandler<?, ?> handler = 
createRepairHandler(repairs, 2, replicaLayout);
 
         Assert.assertTrue(handler.mutationsSent.isEmpty());
 
@@ -188,9 +174,9 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     @Test
     public void noAdditionalMutationRequired() throws Exception
     {
-        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
-        repairs.put(target1, mutation(cell2));
-        repairs.put(target2, mutation(cell1));
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(replica1, mutation(cell2));
+        repairs.put(replica2, mutation(cell1));
 
         InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2);
         handler.sendInitialRepairs();
@@ -209,15 +195,14 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     @Test
     public void noAdditionalMutationPossible() throws Exception
     {
-        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
-        repairs.put(target1, mutation(cell2));
-        repairs.put(target2, mutation(cell1));
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(replica1, mutation(cell2));
+        repairs.put(replica2, mutation(cell1));
 
         InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2);
         handler.sendInitialRepairs();
 
         // we've already sent mutations to all candidates, so we shouldn't 
send any more
-        handler.candidates = Lists.newArrayList(target1, target2);
         handler.mutationsSent.clear();
         handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
         Assert.assertTrue(handler.mutationsSent.isEmpty());
@@ -232,12 +217,11 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     {
         Mutation repair1 = mutation(cell2);
 
-        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
-        repairs.put(target1, repair1);
-        Collection<InetAddressAndPort> participants = 
Lists.newArrayList(target1, target2);
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(replica1, repair1);
 
         // check that the correct initial mutations are sent out
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2, participants);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2, replicaLayout(replicas, EndpointsForRange.of(replica1, replica2)));
         handler.sendInitialRepairs();
         Assert.assertEquals(1, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1));
@@ -252,10 +236,10 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     @Test
     public void onlyBlockOnQuorum()
     {
-        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
-        repairs.put(target1, mutation(cell1));
-        repairs.put(target2, mutation(cell2));
-        repairs.put(target3, mutation(cell3));
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(replica1, mutation(cell1));
+        repairs.put(replica2, mutation(cell2));
+        repairs.put(replica3, mutation(cell3));
         Assert.assertEquals(3, repairs.size());
 
         InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2);
@@ -277,30 +261,29 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     @Test
     public void remoteDCTest() throws Exception
     {
-        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
-        repairs.put(target1, mutation(cell1));
-
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(replica1, mutation(cell1));
 
-        InetAddressAndPort remote1 = InetAddressAndPort.getByName("10.0.0.1");
-        InetAddressAndPort remote2 = InetAddressAndPort.getByName("10.0.0.2");
+        Replica remote1 = 
ReplicaUtils.full(InetAddressAndPort.getByName("10.0.0.1"));
+        Replica remote2 = 
ReplicaUtils.full(InetAddressAndPort.getByName("10.0.0.2"));
         repairs.put(remote1, mutation(cell1));
 
-        Collection<InetAddressAndPort> participants = 
Lists.newArrayList(target1, target2, remote1, remote2);
-
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2, participants);
+        EndpointsForRange participants = EndpointsForRange.of(replica1, 
replica2, remote1, remote2);
+        ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, 
ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, participants, 
participants);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 
2, replicaLayout);
         handler.sendInitialRepairs();
         Assert.assertEquals(2, handler.mutationsSent.size());
-        Assert.assertTrue(handler.mutationsSent.containsKey(target1));
-        Assert.assertTrue(handler.mutationsSent.containsKey(remote1));
+        
Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));
+        
Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
 
         Assert.assertEquals(1, handler.waitingOn());
         Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
 
-        handler.ack(remote1);
+        handler.ack(remote1.endpoint());
         Assert.assertEquals(1, handler.waitingOn());
         Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
 
-        handler.ack(target1);
+        handler.ack(replica1.endpoint());
         Assert.assertEquals(0, handler.waitingOn());
         Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index 1f07c2b..c345ee6 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -26,20 +26,23 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.OverrideConfigurationLoader;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.diag.DiagnosticEventService;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.service.reads.ReadCallback;
 import 
org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
@@ -72,12 +75,13 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         Mutation repair2 = mutation(cell1);
 
         // check that the correct repairs are calculated
-        Map<InetAddressAndPort, Mutation> repairs = new HashMap<>();
-        repairs.put(target1, repair1);
-        repairs.put(target2, repair2);
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(replica1, repair1);
+        repairs.put(replica2, repair2);
 
 
-        DiagnosticPartitionReadRepairHandler handler = 
createRepairHandler(repairs, 2);
+        ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        DiagnosticPartitionReadRepairHandler handler = 
createRepairHandler(repairs, 2, replicaLayout);
 
         Assert.assertTrue(handler.updatesByEp.isEmpty());
 
@@ -102,17 +106,20 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
     }
 
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, long queryStartNanoTime, ConsistencyLevel consistency)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
     {
-        return new DiagnosticBlockingRepairHandler(command, 
queryStartNanoTime, consistency);
+        return new DiagnosticBlockingRepairHandler(command, replicaLayout, 
queryStartNanoTime);
     }
 
-    private static DiagnosticPartitionReadRepairHandler 
createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor)
+    private static DiagnosticPartitionReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, 
ReplicaLayout<?, ?> replicaLayout)
     {
-        Set<InetAddressAndPort> participants = repairs.keySet();
-        InetAddressAndPort[] participantArray = new 
InetAddressAndPort[participants.size()];
-        participants.toArray(participantArray);
-        return new DiagnosticPartitionReadRepairHandler(ks, key, 
ConsistencyLevel.LOCAL_QUORUM, repairs, maxBlockFor, participantArray);
+        return new DiagnosticPartitionReadRepairHandler(key, repairs, 
maxBlockFor, replicaLayout);
+    }
+
+    private static DiagnosticPartitionReadRepairHandler 
createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
+    {
+        EndpointsForRange replicas = 
EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
+        return createRepairHandler(repairs, maxBlockFor, 
replicaLayout(replicas, replicas));
     }
 
     private static class DiagnosticBlockingRepairHandler extends 
BlockingReadRepair implements InstrumentedReadRepair
@@ -120,9 +127,9 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         private Set<InetAddressAndPort> recipients = Collections.emptySet();
         private ReadCallback readCallback = null;
 
-        DiagnosticBlockingRepairHandler(ReadCommand command, long 
queryStartNanoTime, ConsistencyLevel consistency)
+        DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaLayout<?, 
?> replicaLayout, long queryStartNanoTime)
         {
-            super(command, queryStartNanoTime, consistency);
+            super(command, replicaLayout, queryStartNanoTime);
             DiagnosticEventService.instance().subscribe(ReadRepairEvent.class, 
this::onRepairEvent);
         }
 
@@ -130,7 +137,7 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         {
             if (e.getType() == ReadRepairEventType.START_REPAIR) recipients = 
new HashSet<>(e.destinations);
             else if (e.getType() == ReadRepairEventType.SPECULATED_READ) 
recipients.addAll(e.destinations);
-            Assert.assertEquals(targets, e.allEndpoints);
+            Assert.assertEquals(new HashSet<>(targets), new 
HashSet<>(e.allEndpoints));
             Assert.assertNotNull(e.toMap());
         }
 
@@ -156,13 +163,13 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         }
     }
 
-    private static class DiagnosticPartitionReadRepairHandler extends 
BlockingPartitionRepair
+    private static class DiagnosticPartitionReadRepairHandler<E extends 
Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, 
L>
     {
         private final Map<InetAddressAndPort, String> updatesByEp = new 
HashMap<>();
 
-        DiagnosticPartitionReadRepairHandler(Keyspace keyspace, DecoratedKey 
key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, 
int maxBlockFor, InetAddressAndPort[] participants)
+        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, 
Mutation> repairs, int maxBlockFor, L replicaLayout)
         {
-            super(keyspace, key, consistency, repairs, maxBlockFor, 
participants);
+            super(key, repairs, maxBlockFor, replicaLayout);
             
DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, 
this::onRepairEvent);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
index 2fb8ffc..f3d2866 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.service.reads.repair;
 
 import java.util.Set;
 
+import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.service.reads.ReadCallback;
 
-public interface InstrumentedReadRepair extends ReadRepair
+public interface InstrumentedReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>> extends ReadRepair<E, L>
 {
     Set<InetAddressAndPort> getReadRecipients();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index efce59a..9bb7eed 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -26,20 +26,20 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.service.reads.ReadCallback;
 
 public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
 {
-    private static class InstrumentedReadOnlyReadRepair extends 
ReadOnlyReadRepair implements InstrumentedReadRepair
+    private static class InstrumentedReadOnlyReadRepair<E extends 
Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadOnlyReadRepair 
implements InstrumentedReadRepair
     {
-        public InstrumentedReadOnlyReadRepair(ReadCommand command, long 
queryStartNanoTime, ConsistencyLevel consistency)
+        public InstrumentedReadOnlyReadRepair(ReadCommand command, L 
replicaLayout, long queryStartNanoTime)
         {
-            super(command, queryStartNanoTime, consistency);
+            super(command, replicaLayout, queryStartNanoTime);
         }
 
         Set<InetAddressAndPort> readCommandRecipients = new HashSet<>();
@@ -54,12 +54,6 @@ public class ReadOnlyReadRepairTest extends 
AbstractReadRepairTest
         }
 
         @Override
-        Iterable<InetAddressAndPort> getCandidatesForToken(Token token)
-        {
-            return targets;
-        }
-
-        @Override
         public Set<InetAddressAndPort> getReadRecipients()
         {
             return readCommandRecipients;
@@ -79,22 +73,24 @@ public class ReadOnlyReadRepairTest extends 
AbstractReadRepairTest
     }
 
     @Override
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, long queryStartNanoTime, ConsistencyLevel consistency)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand 
command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
     {
-        return new InstrumentedReadOnlyReadRepair(command, queryStartNanoTime, 
consistency);
+        return new InstrumentedReadOnlyReadRepair(command, replicaLayout, 
queryStartNanoTime);
     }
 
     @Test
     public void getMergeListener()
     {
-        InstrumentedReadRepair repair = createInstrumentedReadRepair();
-        Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, 
repair.getMergeListener(new InetAddressAndPort[]{}));
+        ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas);
+        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaLayout);
+        Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, 
repair.getMergeListener(replicaLayout));
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void repairPartitionFailure()
     {
-        InstrumentedReadRepair repair = createInstrumentedReadRepair();
-        repair.repairPartition(dk(1), Collections.emptyMap(), new 
InetAddressAndPort[]{});
+        ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas);
+        InstrumentedReadRepair repair = 
createInstrumentedReadRepair(replicaLayout);
+        repair.repairPartition(null, Collections.emptyMap(), replicaLayout);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
new file mode 100644
index 0000000..e4ba25d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+
+/**
+ * Unit tests for blocking partition read repair: verifies which repair mutations are
+ * sent initially, when speculative additional writes are sent, and when repairs stop
+ * blocking (quorum of acks, DC-local semantics).
+ */
+public class ReadRepairTest
+{
+    static Keyspace ks;
+    static ColumnFamilyStore cfs;
+    static TableMetadata cfm;
+    static Replica target1;
+    static Replica target2;
+    static Replica target3;
+    static EndpointsForRange targets;
+
+    /**
+     * BlockingPartitionRepair subclass that records outgoing repair mutations in a map
+     * instead of sending them over the network, so tests can assert exactly which
+     * endpoints were messaged and with what payload.
+     */
+    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+    {
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+        {
+            super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaLayout);
+        }
+
+        Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
+
+        protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort endpoint)
+        {
+            mutationsSent.put(endpoint, message.payload);
+        }
+
+        @Override
+        protected boolean isLocal(InetAddressAndPort endpoint)
+        {
+            // treat the three 127.0.0.x targets as DC-local; the 10.x remote replicas are not
+            return targets.endpoints().contains(endpoint);
+        }
+    }
+
+    static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
+    static DecoratedKey key;
+    static Cell cell1;
+    static Cell cell2;
+    static Cell cell3;
+    static Mutation resolved;
+
+    private static void assertRowsEqual(Row expected, Row actual)
+    {
+        try
+        {
+            Assert.assertEquals(expected == null, actual == null);
+            if (expected == null)
+                return;
+            Assert.assertEquals(expected.clustering(), actual.clustering());
+            Assert.assertEquals(expected.deletion(), actual.deletion());
+            // compare expected cells against the ACTUAL row's cells (previously this
+            // compared expected against itself, making the check a tautology)
+            Assert.assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(actual.cells(), Cell.class));
+        } catch (Throwable t)
+        {
+            throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t);
+        }
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        String ksName = "ks";
+
+        cfm = CreateTableStatement.parse("CREATE TABLE tbl (k int primary key, v text)", ksName).build();
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.simple(3), Tables.of(cfm));
+        MigrationManager.announceNewKeyspace(ksm, false);
+
+        ks = Keyspace.open(ksName);
+        cfs = ks.getColumnFamilyStore("tbl");
+
+        // zero latency so speculative writes fire immediately in maybeSendAdditionalWrites
+        cfs.sampleReadLatencyNanos = 0;
+
+        target1 = full(InetAddressAndPort.getByName("127.0.0.255"));
+        target2 = full(InetAddressAndPort.getByName("127.0.0.254"));
+        target3 = full(InetAddressAndPort.getByName("127.0.0.253"));
+
+        targets = EndpointsForRange.of(target1, target2, target3);
+
+        // default test values
+        key  = dk(5);
+        cell1 = cell("v", "val1", now);
+        cell2 = cell("v", "val2", now);
+        cell3 = cell("v", "val3", now);
+        resolved = mutation(cell1, cell2);
+    }
+
+    private static DecoratedKey dk(int v)
+    {
+        return DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(v));
+    }
+
+    private static Cell cell(String name, String value, long timestamp)
+    {
+        return BufferCell.live(cfm.getColumn(ColumnIdentifier.getInterned(name, false)), timestamp, ByteBufferUtil.bytes(value));
+    }
+
+    private static Mutation mutation(Cell... cells)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(Clustering.EMPTY);
+        for (Cell cell: cells)
+        {
+            builder.addCell(cell);
+        }
+        return new Mutation(PartitionUpdate.singleRowUpdate(cfm, key, builder.build()));
+    }
+
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets)
+    {
+        ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, all, targets);
+        return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout);
+    }
+
+    @Test
+    public void consistencyLevelTest() throws Exception
+    {
+        Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, ks));
+    }
+
+    private static void assertMutationEqual(Mutation expected, Mutation actual)
+    {
+        Assert.assertEquals(expected.getKeyspaceName(), actual.getKeyspaceName());
+        Assert.assertEquals(expected.key(), actual.key());
+        PartitionUpdate expectedUpdate = Iterables.getOnlyElement(expected.getPartitionUpdates());
+        PartitionUpdate actualUpdate = Iterables.getOnlyElement(actual.getPartitionUpdates());
+        assertRowsEqual(Iterables.getOnlyElement(expectedUpdate), Iterables.getOnlyElement(actualUpdate));
+    }
+
+    @Test
+    public void additionalMutationRequired() throws Exception
+    {
+        Mutation repair1 = mutation(cell2);
+        Mutation repair2 = mutation(cell1);
+
+        // check that the correct repairs are calculated
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, repair1);
+        repairs.put(target2, repair2);
+
+        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2,
+                                                                          targets, EndpointsForRange.of(target1, target2));
+
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+
+        // check that the correct mutations are sent
+        handler.sendInitialRepairs();
+        Assert.assertEquals(2, handler.mutationsSent.size());
+        assertMutationEqual(repair1, handler.mutationsSent.get(target1.endpoint()));
+        assertMutationEqual(repair2, handler.mutationsSent.get(target2.endpoint()));
+
+        // check that a combined mutation is speculatively sent to the 3rd target
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        assertMutationEqual(resolved, handler.mutationsSent.get(target3.endpoint()));
+
+        // check repairs stop blocking after receiving 2 acks
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target1.endpoint());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target3.endpoint());
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+
+    /**
+     * If we've received enough acks, we shouldn't send any additional mutations
+     */
+    @Test
+    public void noAdditionalMutationRequired() throws Exception
+    {
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell2));
+        repairs.put(target2, mutation(cell1));
+
+        EndpointsForRange replicas = EndpointsForRange.of(target1, target2);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, targets);
+        handler.sendInitialRepairs();
+        handler.ack(target1.endpoint());
+        handler.ack(target2.endpoint());
+
+        // both replicas have acked, we shouldn't send anything else out
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+    }
+
+    /**
+     * If there are no additional nodes we can send mutations to, we... shouldn't
+     */
+    @Test
+    public void noAdditionalMutationPossible() throws Exception
+    {
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell2));
+        repairs.put(target2, mutation(cell1));
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, EndpointsForRange.of(target1, target2),
+                                                                    EndpointsForRange.of(target1, target2));
+        handler.sendInitialRepairs();
+
+        // we've already sent mutations to all candidates, so we shouldn't send any more
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+    }
+
+    /**
+     * If we didn't send a repair to a replica because there wasn't a diff with the
+     * resolved column family, we shouldn't send it a speculative mutation
+     */
+    @Test
+    public void mutationsArentSentToInSyncNodes() throws Exception
+    {
+        Mutation repair1 = mutation(cell2);
+
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, repair1);
+
+        // check that the correct initial mutations are sent out
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, targets, EndpointsForRange.of(target1, target2));
+        handler.sendInitialRepairs();
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
+
+        // check that speculative mutations aren't sent to target2
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target3.endpoint()));
+    }
+
+    @Test
+    public void onlyBlockOnQuorum()
+    {
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell1));
+        repairs.put(target2, mutation(cell2));
+        repairs.put(target3, mutation(cell3));
+        Assert.assertEquals(3, repairs.size());
+
+        EndpointsForRange replicas = EndpointsForRange.of(target1, target2, target3);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas);
+        handler.sendInitialRepairs();
+
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target1.endpoint());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        // here we should stop blocking, even though we've sent 3 repairs
+        handler.ack(target2.endpoint());
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+
+    /**
+     * For dc local consistency levels, noop mutations and responses from remote dcs should not affect effective blockFor
+     */
+    @Test
+    public void remoteDCTest() throws Exception
+    {
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell1));
+
+        Replica remote1 = full(InetAddressAndPort.getByName("10.0.0.1"));
+        Replica remote2 = full(InetAddressAndPort.getByName("10.0.0.2"));
+        repairs.put(remote1, mutation(cell1));
+
+        EndpointsForRange participants = EndpointsForRange.of(target1, target2, remote1, remote2);
+        EndpointsForRange targets = EndpointsForRange.of(target1, target2);
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants, targets);
+        handler.sendInitialRepairs();
+        Assert.assertEquals(2, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
+        Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
+
+        // only the local repair counts toward blockFor; the remote ack must not unblock us
+        Assert.assertEquals(1, handler.waitingOn());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        handler.ack(remote1.endpoint());
+        Assert.assertEquals(1, handler.waitingOn());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        handler.ack(target1.endpoint());
+        Assert.assertEquals(0, handler.waitingOn());
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java 
b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index f97980b..2a2dec2 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -29,17 +29,25 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.service.reads.DigestResolver;
 
-public class TestableReadRepair implements ReadRepair
+public class TestableReadRepair<E extends Endpoints<E>, L extends 
ReplicaLayout<E, L>> implements ReadRepair<E, L>
 {
     public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>();
 
     private final ReadCommand command;
     private final ConsistencyLevel consistency;
 
+    private boolean partitionListenerClosed = false;
+    private boolean rowListenerClosed = true;
+
     public TestableReadRepair(ReadCommand command, ConsistencyLevel 
consistency)
     {
         this.command = command;
@@ -47,13 +55,35 @@ public class TestableReadRepair implements ReadRepair
     }
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener 
getMergeListener(InetAddressAndPort[] endpoints)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(L 
endpoints)
     {
-        return new PartitionIteratorMergeListener(endpoints, command, 
consistency, this);
+        return new PartitionIteratorMergeListener(endpoints, command, 
consistency, this) {
+            @Override
+            public void close()
+            {
+                super.close();
+                partitionListenerClosed = true;
+            }
+
+            @Override
+            public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
+            {
+                assert rowListenerClosed;
+                rowListenerClosed = false;
+                return new RowIteratorMergeListener(partitionKey, 
columns(versions), isReversed(versions), endpoints, command, consistency, 
TestableReadRepair.this) {
+                    @Override
+                    public void close()
+                    {
+                        super.close();
+                        rowListenerClosed = true;
+                    }
+                };
+            }
+        };
     }
 
     @Override
-    public void startRepair(DigestResolver digestResolver, 
List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> 
contactedEndpoints, Consumer<PartitionIterator> resultConsumer)
+    public void startRepair(DigestResolver<E, L> digestResolver, 
Consumer<PartitionIterator> resultConsumer)
     {
 
     }
@@ -83,13 +113,19 @@ public class TestableReadRepair implements ReadRepair
     }
 
     @Override
-    public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, 
Mutation> mutations, InetAddressAndPort[] destinations)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, 
Mutation> mutations, L replicaLayout)
     {
-        sent.putAll(mutations);
+        for (Map.Entry<Replica, Mutation> entry: mutations.entrySet())
+            sent.put(entry.getKey().endpoint(), entry.getValue());
     }
 
     public Mutation getForEndpoint(InetAddressAndPort endpoint)
     {
         return sent.get(endpoint);
     }
-}
+
+    /**
+     * Returns true once both the partition-level and row-level merge listeners created by
+     * {@link #getMergeListener} have been closed, i.e. the merged data was fully consumed.
+     */
+    public boolean dataWasConsumed()
+    {
+        return partitionListenerClosed && rowListenerClosed;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java 
b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index bc501be..909e221 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -144,7 +145,7 @@ public class StreamingTransferTest
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), 
p.getMinimumToken()));
 
         StreamResultFuture futureResult = new StreamPlan(StreamOperation.OTHER)
-                                                  .requestRanges(LOCAL, 
KEYSPACE2, ranges)
+                                                  .requestRanges(LOCAL, 
KEYSPACE2, RangesAtEndpoint.toDummyList(ranges), 
RangesAtEndpoint.toDummyList(Collections.emptyList()))
                                                   .execute();
 
         UUID planId = futureResult.planId;
@@ -238,13 +239,13 @@ public class StreamingTransferTest
         List<Range<Token>> ranges = new ArrayList<>();
         // wrapped range
         ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), 
p.getToken(ByteBufferUtil.bytes("key0"))));
-        StreamPlan streamPlan = new 
StreamPlan(StreamOperation.OTHER).transferRanges(LOCAL, cfs.keyspace.getName(), 
ranges, cfs.getTableName());
+        StreamPlan streamPlan = new 
StreamPlan(StreamOperation.OTHER).transferRanges(LOCAL, cfs.keyspace.getName(), 
RangesAtEndpoint.toDummyList(ranges), cfs.getTableName());
         streamPlan.execute().get();
 
         //cannot add ranges after stream session is finished
         try
         {
-            streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, 
cfs.getTableName());
+            streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), 
RangesAtEndpoint.toDummyList(ranges), cfs.getTableName());
             fail("Should have thrown exception");
         }
         catch (RuntimeException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java 
b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
index 2842374..f8f6b12 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
@@ -95,7 +95,7 @@ public class AccumulatorTest
 
         assertEquals("0", accu.get(3));
 
-        Iterator<String> iter = accu.iterator();
+        Iterator<String> iter = accu.snapshot().iterator();
 
         assertEquals("3", iter.next());
         assertEquals("2", iter.next());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to