Repository: cassandra
Updated Branches:
  refs/heads/trunk 0379201c7 -> 8554d6b35


LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable

patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-14735
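
Editor's note: before this patch, the candidate replicas retained for speculative retry were not restricted to the local datacenter, so a LOCAL_QUORUM read with too few live local replicas could speculate to remote nodes whose responses never count towards the local quorum, and the coordinator eventually timed out rather than reporting Unavailable. The patch filters candidates to the local DC up front and checks sufficiency against the chosen contacts before the plan is built. A rough sketch of the new read-plan flow, condensed from ReplicaPlans.forRead below (variable names simplified for illustration):

    // DC-local consistency levels keep only replicas in our datacenter
    EndpointsForToken candidates = candidatesForRead(consistencyLevel, liveSortedNaturalReplicas);
    // take blockFor contacts, plus one if the table always speculates
    EndpointsForToken contacts = contactForRead(keyspace, consistencyLevel, alwaysSpeculate, candidates);
    // throws UnavailableException early if the (now DC-local) contacts cannot satisfy the CL
    assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, contacts);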


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8554d6b3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8554d6b3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8554d6b3

Branch: refs/heads/trunk
Commit: 8554d6b35dcc5eec46ed7edc809a36c1f7fa588f
Parents: 0379201
Author: Benedict Elliott Smith <[email protected]>
Authored: Thu Sep 20 08:54:55 2018 +0100
Committer: Benedict Elliott Smith <[email protected]>
Committed: Wed Sep 26 10:55:11 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ConsistencyLevel.java   | 233 ++-----------------
 .../apache/cassandra/locator/InOurDcTester.java |  93 ++++++++
 .../apache/cassandra/locator/ReplicaPlan.java   |   3 -
 .../apache/cassandra/locator/ReplicaPlans.java  | 193 +++++++++++++--
 .../org/apache/cassandra/locator/Replicas.java  |  65 +++++-
 .../service/DatacenterWriteResponseHandler.java |   7 +-
 .../apache/cassandra/service/StorageProxy.java  |   6 +-
 .../reads/repair/BlockingPartitionRepair.java   |  27 ++-
 .../reads/repair/BlockingReadRepairTest.java    |  10 +-
 .../DiagEventsBlockingReadRepairTest.java       |  23 +-
 .../service/reads/repair/ReadRepairTest.java    |   9 +-
 12 files changed, 373 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9f7958c..9139822 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead 
of Unavailable (CASSANDRA-14735)
  * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
  * Fail incremental repair prepare phase if it encounters sstables from 
un-finalized sessions (CASSANDRA-14763)
  * Add a check for receiving digest response from transient node 
(CASSANDRA-14750)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 5a4baf7..9e884a7 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -17,26 +17,18 @@
  */
 package org.apache.cassandra.db;
 
-import java.util.HashMap;
-import java.util.Map;
 
-import com.google.common.collect.Iterables;
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaCollection;
-import org.apache.cassandra.locator.Replicas;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.transport.ProtocolException;
 
+import static org.apache.cassandra.locator.Replicas.countInOurDc;
+
 public enum ConsistencyLevel
 {
     ANY         (0),
@@ -52,8 +44,6 @@ public enum ConsistencyLevel
     LOCAL_ONE   (10, true),
     NODE_LOCAL  (11, true);
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ConsistencyLevel.class);
-
     // Used by the binary protocol
     public final int code;
     private final boolean isDCLocal;
@@ -90,18 +80,27 @@ public enum ConsistencyLevel
         return codeIdx[code];
     }
 
-    private int quorumFor(Keyspace keyspace)
+    public static int quorumFor(Keyspace keyspace)
     {
         return 
(keyspace.getReplicationStrategy().getReplicationFactor().allReplicas / 2) + 1;
     }
 
-    private int localQuorumFor(Keyspace keyspace, String dc)
+    public static int localQuorumFor(Keyspace keyspace, String dc)
     {
         return (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
              ? (((NetworkTopologyStrategy) 
keyspace.getReplicationStrategy()).getReplicationFactor(dc).allReplicas / 2) + 1
              : quorumFor(keyspace);
     }
 
+    public static ObjectIntOpenHashMap<String> eachQuorumFor(Keyspace keyspace)
+    {
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();
+        ObjectIntOpenHashMap<String> perDc = new 
ObjectIntOpenHashMap<>(strategy.getDatacenters().size());
+        for (String dc : strategy.getDatacenters())
+            perDc.put(dc, ConsistencyLevel.localQuorumFor(keyspace, dc));
+        return perDc;
+    }
+
     public int blockFor(Keyspace keyspace)
     {
         switch (this)
@@ -152,7 +151,7 @@ public enum ConsistencyLevel
                 break;
             case LOCAL_ONE: case LOCAL_QUORUM: case LOCAL_SERIAL:
                 // we will only count local replicas towards our response 
count, as these queries only care about local guarantees
-                blockFor += countDCLocalReplicas(pending).allReplicas();
+                blockFor += countInOurDc(pending).allReplicas();
                 break;
             case ONE: case TWO: case THREE:
             case QUORUM: case EACH_QUORUM:
@@ -176,208 +175,6 @@ public enum ConsistencyLevel
         return isDCLocal;
     }
 
-    public static boolean isLocal(InetAddressAndPort endpoint)
-    {
-        return 
DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
-    }
-
-    public static boolean isLocal(Replica replica)
-    {
-        return isLocal(replica.endpoint());
-    }
-
-    private static ReplicaCount countDCLocalReplicas(ReplicaCollection<?> 
liveReplicas)
-    {
-        ReplicaCount count = new ReplicaCount();
-        for (Replica replica : liveReplicas)
-            if (isLocal(replica))
-                count.increment(replica);
-        return count;
-    }
-
-    private static class ReplicaCount
-    {
-        int fullReplicas;
-        int transientReplicas;
-
-        int allReplicas()
-        {
-            return fullReplicas + transientReplicas;
-        }
-
-        void increment(Replica replica)
-        {
-            if (replica.isFull()) ++fullReplicas;
-            else ++transientReplicas;
-        }
-
-        boolean isSufficient(int allReplicas, int fullReplicas)
-        {
-            return this.fullReplicas >= fullReplicas
-                    && this.allReplicas() >= allReplicas;
-        }
-    }
-
-    private static Map<String, ReplicaCount> countPerDCEndpoints(Keyspace 
keyspace, Iterable<Replica> liveReplicas)
-    {
-        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();
-
-        Map<String, ReplicaCount> dcEndpoints = new HashMap<>();
-        for (String dc: strategy.getDatacenters())
-            dcEndpoints.put(dc, new ReplicaCount());
-
-        for (Replica replica : liveReplicas)
-        {
-            String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
-            dcEndpoints.get(dc).increment(replica);
-        }
-        return dcEndpoints;
-    }
-
-    public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E 
liveReplicas)
-    {
-        return filterForQuery(keyspace, liveReplicas, false);
-    }
-
-    public <E extends Endpoints<E>> E filterForQuery(Keyspace keyspace, E 
liveReplicas, boolean alwaysSpeculate)
-    {
-        /*
-         * If we are doing an each quorum query, we have to make sure that the 
endpoints we select
-         * provide a quorum for each data center. If we are not using a 
NetworkTopologyStrategy,
-         * we should fall through and grab a quorum in the replication 
strategy.
-         *
-         * We do not speculate for EACH_QUORUM.
-         */
-        if (this == EACH_QUORUM && keyspace.getReplicationStrategy() 
instanceof NetworkTopologyStrategy)
-            return filterForEachQuorum(keyspace, liveReplicas);
-
-        int count = blockFor(keyspace) + (alwaysSpeculate ? 1 : 0);
-        return isDCLocal
-                ? liveReplicas.filter(ConsistencyLevel::isLocal, count)
-                : liveReplicas.subList(0, Math.min(liveReplicas.size(), 
count));
-    }
-
-    private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, 
E liveReplicas)
-    {
-        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
keyspace.getReplicationStrategy();
-        Map<String, Integer> dcsReplicas = new HashMap<>();
-        for (String dc : strategy.getDatacenters())
-        {
-            // we put _up to_ dc replicas only
-            dcsReplicas.put(dc, localQuorumFor(keyspace, dc));
-        }
-
-        return liveReplicas.filter((replica) -> {
-            String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
-            int replicas = dcsReplicas.get(dc);
-            if (replicas > 0)
-            {
-                dcsReplicas.put(dc, --replicas);
-                return true;
-            }
-            return false;
-        });
-    }
-
-    public boolean isSufficientLiveReplicasForRead(Keyspace keyspace, 
Endpoints<?> liveReplicas)
-    {
-        switch (this)
-        {
-            case ANY:
-                // local hint is acceptable, and local node is always live
-                return true;
-            case LOCAL_ONE:
-                return countDCLocalReplicas(liveReplicas).isSufficient(1, 1);
-            case LOCAL_QUORUM:
-                return 
countDCLocalReplicas(liveReplicas).isSufficient(blockFor(keyspace), 1);
-            case EACH_QUORUM:
-                if (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
-                {
-                    int fullCount = 0;
-                    for (Map.Entry<String, ReplicaCount> entry : 
countPerDCEndpoints(keyspace, liveReplicas).entrySet())
-                    {
-                        ReplicaCount count = entry.getValue();
-                        if (!count.isSufficient(localQuorumFor(keyspace, 
entry.getKey()), 0))
-                            return false;
-                        fullCount += count.fullReplicas;
-                    }
-                    return fullCount > 0;
-                }
-                // Fallthough on purpose for SimpleStrategy
-            default:
-                return liveReplicas.size() >= blockFor(keyspace)
-                        && Replicas.countFull(liveReplicas) > 0;
-        }
-    }
-
-    public void assureSufficientLiveReplicasForRead(Keyspace keyspace, 
Endpoints<?> liveReplicas) throws UnavailableException
-    {
-        assureSufficientLiveReplicas(keyspace, liveReplicas, 
blockFor(keyspace), 1);
-    }
-    public void assureSufficientLiveReplicasForWrite(Keyspace keyspace, 
Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
-    {
-        assureSufficientLiveReplicas(keyspace, allLive, 
blockForWrite(keyspace, pendingWithDown), 0);
-    }
-    void assureSufficientLiveReplicas(Keyspace keyspace, Endpoints<?> allLive, 
int blockFor, int blockForFullReplicas) throws UnavailableException
-    {
-        switch (this)
-        {
-            case ANY:
-                // local hint is acceptable, and local node is always live
-                break;
-            case LOCAL_ONE:
-            {
-                ReplicaCount localLive = countDCLocalReplicas(allLive);
-                if (!localLive.isSufficient(blockFor, blockForFullReplicas))
-                    throw UnavailableException.create(this, 1, 
blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas);
-                break;
-            }
-            case LOCAL_QUORUM:
-            {
-                ReplicaCount localLive = countDCLocalReplicas(allLive);
-                if (!localLive.isSufficient(blockFor, blockForFullReplicas))
-                {
-                    if (logger.isTraceEnabled())
-                    {
-                        logger.trace(String.format("Local replicas %s are 
insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d 
full replicas in '%s'",
-                                allLive.filter(ConsistencyLevel::isLocal), 
blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter()));
-                    }
-                    throw UnavailableException.create(this, blockFor, 
blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas);
-                }
-                break;
-            }
-            case EACH_QUORUM:
-                if (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
-                {
-                    int total = 0;
-                    int totalFull = 0;
-                    for (Map.Entry<String, ReplicaCount> entry : 
countPerDCEndpoints(keyspace, allLive).entrySet())
-                    {
-                        int dcBlockFor = localQuorumFor(keyspace, 
entry.getKey());
-                        ReplicaCount dcCount = entry.getValue();
-                        if (!dcCount.isSufficient(dcBlockFor, 0))
-                            throw UnavailableException.create(this, 
entry.getKey(), dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas);
-                        totalFull += dcCount.fullReplicas;
-                        total += dcCount.allReplicas();
-                    }
-                    if (totalFull < blockForFullReplicas)
-                        throw UnavailableException.create(this, blockFor, 
total, blockForFullReplicas, totalFull);
-                    break;
-                }
-                // Fallthough on purpose for SimpleStrategy
-            default:
-                int live = allLive.size();
-                int full = Replicas.countFull(allLive);
-                if (live < blockFor || full < blockForFullReplicas)
-                {
-                    if (logger.isTraceEnabled())
-                        logger.trace("Live nodes {} do not satisfy 
ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor);
-                    throw UnavailableException.create(this, blockFor, 
blockForFullReplicas, live, full);
-                }
-                break;
-        }
-    }
-
     public void validateForRead(String keyspaceName) throws 
InvalidRequestException
     {
         switch (this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/locator/InOurDcTester.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/InOurDcTester.java 
b/src/java/org/apache/cassandra/locator/InOurDcTester.java
new file mode 100644
index 0000000..23a8c13
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/InOurDcTester.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
+import java.util.function.Predicate;
+
+public class InOurDcTester
+{
+    private static ReplicaTester replicas;
+    private static EndpointTester endpoints;
+
+    final String dc;
+    final IEndpointSnitch snitch;
+
+    private InOurDcTester(String dc, IEndpointSnitch snitch)
+    {
+        this.dc = dc;
+        this.snitch = snitch;
+    }
+
+    boolean stale()
+    {
+        return dc != DatabaseDescriptor.getLocalDataCenter()
+                || snitch != DatabaseDescriptor.getEndpointSnitch()
+                // this final clause checks if somehow the snitch/localDc have 
got out of whack;
+                // presently, this is possible but very unlikely, but this 
check will also help
+                // resolve races on these global fields as well
+                || 
!dc.equals(snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort()));
+    }
+
+    private static final class ReplicaTester extends InOurDcTester implements 
Predicate<Replica>
+    {
+        private ReplicaTester(String dc, IEndpointSnitch snitch)
+        {
+            super(dc, snitch);
+        }
+
+        @Override
+        public boolean test(Replica replica)
+        {
+            return dc.equals(snitch.getDatacenter(replica.endpoint()));
+        }
+    }
+
+    private static final class EndpointTester extends InOurDcTester implements 
Predicate<InetAddressAndPort>
+    {
+        private EndpointTester(String dc, IEndpointSnitch snitch)
+        {
+            super(dc, snitch);
+        }
+
+        @Override
+        public boolean test(InetAddressAndPort endpoint)
+        {
+            return dc.equals(snitch.getDatacenter(endpoint));
+        }
+    }
+
+    public static Predicate<Replica> replicas()
+    {
+        ReplicaTester cur = replicas;
+        if (cur == null || cur.stale())
+            replicas = cur = new 
ReplicaTester(DatabaseDescriptor.getLocalDataCenter(), 
DatabaseDescriptor.getEndpointSnitch());
+        return cur;
+    }
+
+    public static Predicate<InetAddressAndPort> endpoints()
+    {
+        EndpointTester cur = endpoints;
+        if (cur == null || cur.stale())
+            endpoints = cur = new 
EndpointTester(DatabaseDescriptor.getLocalDataCenter(), 
DatabaseDescriptor.getEndpointSnitch());
+        return cur;
+    }
+
+}
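
Editor's note: the helpers above replace ConsistencyLevel.isLocal(); the cached predicate is rebuilt whenever the snitch or local DC changes. A minimal usage sketch, assuming liveReplicas is an EndpointsForToken of live replicas and from is an InetAddressAndPort (both names illustrative only):

    // keep only replicas in the coordinator's datacenter
    EndpointsForToken localCandidates = liveReplicas.filter(InOurDcTester.replicas());

    // or test a single endpoint, e.g. when deciding whether a response should count
    boolean inLocalDc = InOurDcTester.endpoints().test(from);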

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/locator/ReplicaPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java 
b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 4d6127b..861c912 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -50,7 +50,6 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
     }
 
     public abstract int blockFor();
-    public abstract void assureSufficientReplicas();
 
     public E contacts() { return contacts; }
     public boolean contacts(Replica replica) { return 
contacts.contains(replica); }
@@ -70,7 +69,6 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
         }
 
         public int blockFor() { return consistencyLevel.blockFor(keyspace); }
-        public void assureSufficientReplicas() { 
consistencyLevel.assureSufficientLiveReplicasForRead(keyspace, candidates()); }
 
         public E candidates() { return candidates; }
 
@@ -142,7 +140,6 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
         }
 
         public int blockFor() { return 
consistencyLevel.blockForWrite(keyspace, pending()); }
-        public void assureSufficientReplicas() { 
consistencyLevel.assureSufficientLiveReplicasForWrite(keyspace, live(), 
pending()); }
 
         /** Replicas that a region of the ring is moving to; not yet ready to 
serve reads, but should receive writes */
         public E pending() { return pending; }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java 
b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 25f42c3..3d56a73 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -18,7 +18,10 @@
 
 package org.apache.cassandra.locator;
 
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
@@ -30,7 +33,8 @@ import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
-import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.function.Predicate;
@@ -38,9 +42,117 @@ import java.util.function.Predicate;
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.limit;
+import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumFor;
+import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor;
+import static org.apache.cassandra.locator.Replicas.countInOurDc;
+import static org.apache.cassandra.locator.Replicas.countPerDc;
 
 public class ReplicaPlans
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(ReplicaPlans.class);
+
+    public static boolean isSufficientLiveReplicasForRead(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas)
+    {
+        switch (consistencyLevel)
+        {
+            case ANY:
+                // local hint is acceptable, and local node is always live
+                return true;
+            case LOCAL_ONE:
+                return countInOurDc(liveReplicas).hasAtleast(1, 1);
+            case LOCAL_QUORUM:
+                return 
countInOurDc(liveReplicas).hasAtleast(consistencyLevel.blockFor(keyspace), 1);
+            case EACH_QUORUM:
+                if (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
+                {
+                    int fullCount = 0;
+                    Collection<String> dcs = ((NetworkTopologyStrategy) 
keyspace.getReplicationStrategy()).getDatacenters();
+                    for (ObjectObjectCursor<String, Replicas.ReplicaCount> 
entry : countPerDc(dcs, liveReplicas))
+                    {
+                        Replicas.ReplicaCount count = entry.value;
+                        if (!count.hasAtleast(localQuorumFor(keyspace, 
entry.key), 0))
+                            return false;
+                        fullCount += count.fullReplicas();
+                    }
+                    return fullCount > 0;
+                }
+                // Fallthrough on purpose for SimpleStrategy
+            default:
+                return liveReplicas.size() >= 
consistencyLevel.blockFor(keyspace)
+                        && Replicas.countFull(liveReplicas) > 0;
+        }
+    }
+
+    static void assureSufficientLiveReplicasForRead(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws 
UnavailableException
+    {
+        assureSufficientLiveReplicas(keyspace, consistencyLevel, liveReplicas, 
consistencyLevel.blockFor(keyspace), 1);
+    }
+    static void assureSufficientLiveReplicasForWrite(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, Endpoints<?> allLive, Endpoints<?> 
pendingWithDown) throws UnavailableException
+    {
+        assureSufficientLiveReplicas(keyspace, consistencyLevel, allLive, 
consistencyLevel.blockForWrite(keyspace, pendingWithDown), 0);
+    }
+    static void assureSufficientLiveReplicas(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, Endpoints<?> allLive, int blockFor, int 
blockForFullReplicas) throws UnavailableException
+    {
+        switch (consistencyLevel)
+        {
+            case ANY:
+                // local hint is acceptable, and local node is always live
+                break;
+            case LOCAL_ONE:
+            {
+                Replicas.ReplicaCount localLive = countInOurDc(allLive);
+                if (!localLive.hasAtleast(blockFor, blockForFullReplicas))
+                    throw UnavailableException.create(consistencyLevel, 1, 
blockForFullReplicas, localLive.allReplicas(), localLive.fullReplicas());
+                break;
+            }
+            case LOCAL_QUORUM:
+            {
+                Replicas.ReplicaCount localLive = countInOurDc(allLive);
+                if (!localLive.hasAtleast(blockFor, blockForFullReplicas))
+                {
+                    if (logger.isTraceEnabled())
+                    {
+                        logger.trace(String.format("Local replicas %s are 
insufficient to satisfy LOCAL_QUORUM requirement of %d live replicas and %d 
full replicas in '%s'",
+                                allLive.filter(InOurDcTester.replicas()), 
blockFor, blockForFullReplicas, DatabaseDescriptor.getLocalDataCenter()));
+                    }
+                    throw UnavailableException.create(consistencyLevel, 
blockFor, blockForFullReplicas, localLive.allReplicas(), 
localLive.fullReplicas());
+                }
+                break;
+            }
+            case EACH_QUORUM:
+                if (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
+                {
+                    int total = 0;
+                    int totalFull = 0;
+                    Collection<String> dcs = ((NetworkTopologyStrategy) 
keyspace.getReplicationStrategy()).getDatacenters();
+                    for (ObjectObjectCursor<String, Replicas.ReplicaCount> 
entry : countPerDc(dcs, allLive))
+                    {
+                        int dcBlockFor = 
ConsistencyLevel.localQuorumFor(keyspace, entry.key);
+                        Replicas.ReplicaCount dcCount = entry.value;
+                        if (!dcCount.hasAtleast(dcBlockFor, 0))
+                            throw 
UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, 
dcCount.allReplicas(), 0, dcCount.fullReplicas());
+                        totalFull += dcCount.fullReplicas();
+                        total += dcCount.allReplicas();
+                    }
+                    if (totalFull < blockForFullReplicas)
+                        throw UnavailableException.create(consistencyLevel, 
blockFor, total, blockForFullReplicas, totalFull);
+                    break;
+                }
+                // Fallthrough on purpose for SimpleStrategy
+            default:
+                int live = allLive.size();
+                int full = Replicas.countFull(allLive);
+                if (live < blockFor || full < blockForFullReplicas)
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("Live nodes {} do not satisfy 
ConsistencyLevel ({} required)", Iterables.toString(allLive), blockFor);
+                    throw UnavailableException.create(consistencyLevel, 
blockFor, blockForFullReplicas, live, full);
+                }
+                break;
+        }
+    }
+
 
     /**
      * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. 
This node is *assumed* to be alive.
@@ -109,9 +221,8 @@ public class ReplicaPlans
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, 
ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
     {
         EndpointsForToken contacts = selector.select(keyspace, 
consistencyLevel, liveAndDown, live);
-        ReplicaPlan.ForTokenWrite result = new 
ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), 
liveAndDown.all(), live.all(), contacts);
-        result.assureSufficientReplicas();
-        return result;
+        assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, 
live.all(), liveAndDown.pending());
+        return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, 
liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
     }
 
     public interface Selector
@@ -154,7 +265,7 @@ public class ReplicaPlans
             if (!any(liveAndDown.all(), Replica::isTransient))
                 return liveAndDown.all();
 
-            assert consistencyLevel != ConsistencyLevel.EACH_QUORUM;
+            assert consistencyLevel != EACH_QUORUM;
 
             ReplicaCollection.Mutable<E> contacts = 
liveAndDown.all().newMutable(liveAndDown.all().size());
             contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
@@ -186,11 +297,7 @@ public class ReplicaPlans
         {
             // TODO: we should cleanup our semantics here, as we're filtering 
ALL nodes to localDC which is unexpected for ReplicaPlan
             // Restrict natural and pending to node in the local DC only
-            String localDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            Predicate<Replica> isLocalDc = replica -> 
localDc.equals(snitch.getDatacenter(replica));
-
-            liveAndDown = liveAndDown.filter(isLocalDc);
+            liveAndDown = liveAndDown.filter(InOurDcTester.replicas());
         }
 
         ReplicaLayout.ForTokenWrite live = 
liveAndDown.filter(FailureDetector.isReplicaAlive);
@@ -215,6 +322,45 @@ public class ReplicaPlans
         return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, 
liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, 
requiredParticipants);
     }
 
+
+    private static <E extends Endpoints<E>> E 
candidatesForRead(ConsistencyLevel consistencyLevel, E liveNaturalReplicas)
+    {
+        return consistencyLevel.isDatacenterLocal()
+                ? liveNaturalReplicas.filter(InOurDcTester.replicas())
+                : liveNaturalReplicas;
+    }
+
+    private static <E extends Endpoints<E>> E 
contactForEachQuorumRead(Keyspace keyspace, E candidates)
+    {
+        assert keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy;
+        ObjectIntOpenHashMap<String> perDc = eachQuorumFor(keyspace);
+
+        final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        return candidates.filter(replica -> {
+            String dc = snitch.getDatacenter(replica);
+            return perDc.addTo(dc, -1) >= 0;
+        });
+    }
+
+    private static <E extends Endpoints<E>> E contactForRead(Keyspace 
keyspace, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E 
candidates)
+    {
+        /*
+         * If we are doing an each quorum query, we have to make sure that the 
endpoints we select
+         * provide a quorum for each data center. If we are not using a 
NetworkTopologyStrategy,
+         * we should fall through and grab a quorum in the replication 
strategy.
+         *
+         * We do not speculate for EACH_QUORUM.
+         *
+         * TODO: this is still very inconsistently managed between 
{LOCAL,EACH}_QUORUM and other consistency levels - should address this in a 
follow-up
+         */
+        if (consistencyLevel == EACH_QUORUM && 
keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+            return contactForEachQuorumRead(keyspace, candidates);
+
+        int count = consistencyLevel.blockFor(keyspace) + (alwaysSpeculate ? 1 
: 0);
+        return candidates.subList(0, Math.min(count, candidates.size()));
+    }
+
+
     /**
      * Construct a plan for reading from a single node - this permits no 
speculation or read-repair
      */
@@ -239,18 +385,16 @@ public class ReplicaPlans
      *   - candidates who are: alive, replicate the token, and are sorted by 
their snitch scores
      *   - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) 
candidates
      *
-     * The candidate collection can be used for speculation, although at 
present it would break
-     * LOCAL_QUORUM and EACH_QUORUM to do so without further filtering
+     * The candidate collection can be used for speculation, although at 
present
+     * it would break EACH_QUORUM to do so without further filtering
      */
     public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token 
token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
     {
-        ReplicaLayout.ForTokenRead candidates = 
ReplicaLayout.forTokenReadLiveSorted(keyspace, token);
-        EndpointsForToken contacts = consistencyLevel.filterForQuery(keyspace, 
candidates.natural(),
-                retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
+        EndpointsForToken candidates = candidatesForRead(consistencyLevel, 
ReplicaLayout.forTokenReadLiveSorted(keyspace, token).natural());
+        EndpointsForToken contacts = contactForRead(keyspace, 
consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), 
candidates);
 
-        ReplicaPlan.ForTokenRead result = new 
ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates.natural(), 
contacts);
-        result.assureSufficientReplicas(); // Throw UAE early if we don't have 
enough replicas.
-        return result;
+        assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, 
contacts);
+        return new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, 
candidates, contacts);
     }
 
     /**
@@ -262,12 +406,11 @@ public class ReplicaPlans
      */
     public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range)
     {
-        ReplicaLayout.ForRangeRead candidates = 
ReplicaLayout.forRangeReadLiveSorted(keyspace, range);
-        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, 
candidates.natural());
+        EndpointsForRange candidates = candidatesForRead(consistencyLevel, 
ReplicaLayout.forRangeReadLiveSorted(keyspace, range).natural());
+        EndpointsForRange contacts = contactForRead(keyspace, 
consistencyLevel, false, candidates);
 
-        ReplicaPlan.ForRangeRead result = new 
ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, 
candidates.natural(), contacts);
-        result.assureSufficientReplicas();
-        return result;
+        assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, 
contacts);
+        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, 
candidates, contacts);
     }
 
     /**
@@ -280,10 +423,10 @@ public class ReplicaPlans
         EndpointsForRange mergedCandidates = 
left.candidates().keep(right.candidates().endpoints());
 
         // Check if there are enough shared endpoints for the merge to be 
possible.
-        if (!consistencyLevel.isSufficientLiveReplicasForRead(keyspace, 
mergedCandidates))
+        if (!isSufficientLiveReplicasForRead(keyspace, consistencyLevel, 
mergedCandidates))
             return null;
 
-        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, 
mergedCandidates);
+        EndpointsForRange contacts = contactForRead(keyspace, 
consistencyLevel, false, mergedCandidates);
 
         // Estimate whether merging will be a win or not
         if 
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, 
left.contacts(), right.contacts()))
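
Editor's note: a hedged example of how a coordinator-side caller exercises the new entry point; keyspace, token and retry are placeholders, not taken from the patch:

    ReplicaPlan.ForTokenRead plan =
        ReplicaPlans.forRead(keyspace, token, ConsistencyLevel.LOCAL_QUORUM, retry);
    // With this change, an under-replicated local DC fails here with UnavailableException,
    // instead of timing out later after speculating to remote replicas.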

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/locator/Replicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replicas.java 
b/src/java/org/apache/cassandra/locator/Replicas.java
index 299e6ec..6c80134 100644
--- a/src/java/org/apache/cassandra/locator/Replicas.java
+++ b/src/java/org/apache/cassandra/locator/Replicas.java
@@ -19,24 +19,85 @@
 package org.apache.cassandra.locator;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.function.Predicate;
 
+import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 import static com.google.common.collect.Iterables.all;
 
 public class Replicas
 {
 
-    public static int countFull(ReplicaCollection<?> liveReplicas)
+    public static int countFull(ReplicaCollection<?> replicas)
     {
         int count = 0;
-        for (Replica replica : liveReplicas)
+        for (Replica replica : replicas)
             if (replica.isFull())
                 ++count;
         return count;
     }
 
+    public static class ReplicaCount
+    {
+        int fullReplicas;
+        int transientReplicas;
+
+        public int allReplicas()
+        {
+            return fullReplicas + transientReplicas;
+        }
+
+        public int fullReplicas()
+        {
+            return fullReplicas;
+        }
+
+        public int transientReplicas()
+        {
+            return transientReplicas;
+        }
+
+        public void increment(Replica replica)
+        {
+            if (replica.isFull()) ++fullReplicas;
+            else ++transientReplicas;
+        }
+
+        public boolean hasAtleast(int allReplicas, int fullReplicas)
+        {
+            return this.fullReplicas >= fullReplicas
+                    && this.allReplicas() >= allReplicas;
+        }
+    }
+
+    public static ReplicaCount countInOurDc(ReplicaCollection<?> replicas)
+    {
+        ReplicaCount count = new ReplicaCount();
+        Predicate<Replica> inOurDc = InOurDcTester.replicas();
+        for (Replica replica : replicas)
+            if (inOurDc.test(replica))
+                count.increment(replica);
+        return count;
+    }
+
+    public static ObjectObjectOpenHashMap<String, ReplicaCount> 
countPerDc(Collection<String> dataCenters, Iterable<Replica> liveReplicas)
+    {
+        ObjectObjectOpenHashMap<String, ReplicaCount> perDc = new 
ObjectObjectOpenHashMap<>(dataCenters.size());
+        for (String dc: dataCenters)
+            perDc.put(dc, new ReplicaCount());
+
+        for (Replica replica : liveReplicas)
+        {
+            String dc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+            perDc.get(dc).increment(replica);
+        }
+        return perDc;
+    }
+
     /**
      * A placeholder for areas of the code that cannot yet handle transient 
replicas, but should do so in future
      */
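
Editor's note: the LOCAL_QUORUM availability test now reduces to the counters above. A minimal sketch, assuming liveReplicas holds the live endpoints and blockFor comes from ConsistencyLevel.blockFor(keyspace):

    Replicas.ReplicaCount localLive = Replicas.countInOurDc(liveReplicas);
    // need blockFor replicas in our DC overall, at least one of them a full replica
    boolean sufficient = localLive.hasAtleast(blockFor, 1);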

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index a3ef76f..f30b452 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -18,15 +18,20 @@
 package org.apache.cassandra.service;
 
 import org.apache.cassandra.db.WriteType;
+import org.apache.cassandra.locator.InOurDcTester;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 
+import java.util.function.Predicate;
+
 /**
  * This class blocks for a quorum of responses _in the local datacenter only_ 
(CL.LOCAL_QUORUM).
  */
 public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
 {
+    private final Predicate<InetAddressAndPort> waitingFor = 
InOurDcTester.endpoints();
+
     public DatacenterWriteResponseHandler(ReplicaPlan.ForTokenWrite 
replicaPlan,
                                           Runnable callback,
                                           WriteType writeType,
@@ -54,6 +59,6 @@ public class DatacenterWriteResponseHandler<T> extends 
WriteResponseHandler<T>
     @Override
     protected boolean waitingFor(InetAddressAndPort from)
     {
-        return replicaPlan.consistencyLevel().isLocal(from);
+        return waitingFor.test(from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0d52afa..c6315ff 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1435,8 +1435,8 @@ public class StorageProxy implements StorageProxyMBean
             Keyspace keyspace = Keyspace.open(keyspaceName);
             Token tk = cm.key().getToken();
 
-            ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, 
ReplicaPlans.writeAll)
-                    .assureSufficientReplicas();
+            // we build this ONLY to perform the sufficiency check that 
happens on construction
+            ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, 
ReplicaPlans.writeAll);
 
             // Forward the actual update to the chosen leader replica
             AbstractWriteResponseHandler<IMutation> responseHandler = new 
WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, 
replica),
@@ -2088,8 +2088,6 @@ public class StorageProxy implements StorageProxyMBean
             ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
                     = new ReadCallback<>(resolver, rangeCommand, 
sharedReplicaPlan, queryStartNanoTime);
 
-            replicaPlan.assureSufficientReplicas();
-
             // If enabled, request repaired data tracking info from full 
replicas but
             // only if there are multiple full replicas to compare results from
             if 
(DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index f536ea8..624c78f 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -23,9 +23,11 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
@@ -41,6 +43,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.Replicas;
+import org.apache.cassandra.locator.InOurDcTester;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
@@ -56,14 +59,21 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, P extends ReplicaPl
     private final P replicaPlan;
     private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
+    private final Predicate<InetAddressAndPort> shouldBlockOn;
 
     private volatile long mutationsSentTime;
 
     public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> 
repairs, int maxBlockFor, P replicaPlan)
     {
+        this(key, repairs, maxBlockFor, replicaPlan,
+                replicaPlan.consistencyLevel().isDatacenterLocal() ? 
InOurDcTester.endpoints() : Predicates.alwaysTrue());
+    }
+    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> 
repairs, int maxBlockFor, P replicaPlan, Predicate<InetAddressAndPort> 
shouldBlockOn)
+    {
         this.key = key;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
         this.replicaPlan = replicaPlan;
+        this.shouldBlockOn = shouldBlockOn;
 
         // here we remove empty repair mutations from the block for total, 
since
         // we're not sending them mutations
@@ -72,7 +82,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, 
P extends ReplicaPl
         {
             // remote dcs can sometimes get involved in dc-local reads. We 
want to repair
             // them if they do, but they shouldn't interfere with blocking the 
client read.
-            if (!repairs.containsKey(participant) && 
shouldBlockOn(participant.endpoint()))
+            if (!repairs.containsKey(participant) && 
shouldBlockOn.test(participant.endpoint()))
                 blockFor--;
         }
 
@@ -91,20 +101,9 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, P extends ReplicaPl
     }
 
     @VisibleForTesting
-    boolean isLocal(InetAddressAndPort endpoint)
-    {
-        return ConsistencyLevel.isLocal(endpoint);
-    }
-
-    private boolean shouldBlockOn(InetAddressAndPort endpoint)
-    {
-        return !replicaPlan.consistencyLevel().isDatacenterLocal() || 
isLocal(endpoint);
-    }
-
-    @VisibleForTesting
     void ack(InetAddressAndPort from)
     {
-        if (shouldBlockOn(from))
+        if (shouldBlockOn.test(from))
         {
             pendingRepairs.remove(replicaPlan.getReplicaFor(from));
             latch.countDown();
@@ -161,7 +160,7 @@ public class BlockingPartitionRepair<E extends 
Endpoints<E>, P extends ReplicaPl
             sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR), 
destination.endpoint());
             ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark();
 
-            if (!shouldBlockOn(destination.endpoint()))
+            if (!shouldBlockOn.test(destination.endpoint()))
                 pendingRepairs.remove(destination);
             ReadRepairDiagnostics.sendInitialRepair(this, 
destination.endpoint(), mutation);
         }
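
Editor's note: with the blocking predicate injected through the constructor, tests (and any special-case callers) can decide which acks unblock the client read without subclassing. A sketch along the lines of the test changes below, where targets is an illustrative endpoint collection:

    new BlockingPartitionRepair<>(key, repairs, maxBlockFor, replicaPlan,
                                  e -> targets.contains(e));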

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 6bb1b7a..34bbf32 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -38,7 +38,6 @@ import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.service.reads.ReadCallback;
@@ -50,7 +49,8 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     {
         public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, 
int maxBlockFor, P replicaPlan)
         {
-            super(Util.dk("not a real usable value"), repairs, maxBlockFor, 
replicaPlan);
+            super(Util.dk("not a real usable value"), repairs, maxBlockFor, 
replicaPlan,
+                    e -> targets.contains(e));
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -59,12 +59,6 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
         {
             mutationsSent.put(endpoint, message.payload);
         }
-
-        @Override
-        protected boolean isLocal(InetAddressAndPort endpoint)
-        {
-            return targets.contains(endpoint);
-        }
     }
 
     @BeforeClass

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index c64a73b..2471ffd 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import com.google.common.collect.Lists;
 import org.apache.cassandra.locator.ReplicaPlan;
@@ -43,7 +44,6 @@ import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.service.reads.ReadCallback;
 import 
org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
@@ -169,9 +169,15 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
     {
         private final Map<InetAddressAndPort, String> updatesByEp = new 
HashMap<>();
 
+        private static Predicate<InetAddressAndPort> isLocal()
+        {
+            List<InetAddressAndPort> candidates = targets;
+            return e -> candidates.contains(e);
+        }
+
         DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, 
Mutation> repairs, int maxBlockFor, P replicaPlan)
         {
-            super(key, repairs, maxBlockFor, replicaPlan);
+            super(key, repairs, maxBlockFor, replicaPlan, isLocal());
             
DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, 
this::onRepairEvent);
         }
 
@@ -184,18 +190,5 @@ public class DiagEventsBlockingReadRepairTest extends 
AbstractReadRepairTest
         protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort 
endpoint)
         {
         }
-
-        List<InetAddressAndPort> candidates = targets;
-
-        protected List<InetAddressAndPort> getCandidateEndpoints()
-        {
-            return candidates;
-        }
-
-        @Override
-        protected boolean isLocal(InetAddressAndPort endpoint)
-        {
-            return targets.contains(endpoint);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8554d6b3/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index b678b4d..c3f05c0 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -76,7 +76,8 @@ public class ReadRepairTest
     {
         public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, 
int maxBlockFor, P replicaPlan)
         {
-            super(Util.dk("not a valid key"), repairs, maxBlockFor, 
replicaPlan);
+            super(Util.dk("not a valid key"), repairs, maxBlockFor, 
replicaPlan,
+                    e -> replicaPlan.consistencyLevel().isDatacenterLocal() && 
targets.endpoints().contains(e));
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -85,12 +86,6 @@ public class ReadRepairTest
         {
             mutationsSent.put(endpoint, message.payload);
         }
-
-        @Override
-        protected boolean isLocal(InetAddressAndPort endpoint)
-        {
-            return targets.endpoints().contains(endpoint);
-        }
     }
 
     static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());

