This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b915688  Fix false unavailable for queries due to cluster topology changes
b915688 is described below

commit b915688ea878aaa284f5cedeb799c5f797c4d824
Author: Yifan Cai <[email protected]>
AuthorDate: Wed Apr 14 13:05:55 2021 -0700

    Fix false unavailable for queries due to cluster topology changes
    
    patch by Yifan Cai; reviewed by Aleksey Yeschenko, Andres de la Peña for CASSANDRA-16545
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/batchlog/BatchlogManager.java |   4 +-
 .../cql3/statements/ModificationStatement.java     |   4 +-
 .../cassandra/cql3/statements/SelectStatement.java |   2 +-
 .../org/apache/cassandra/db/ConsistencyLevel.java  |  71 +++--
 .../org/apache/cassandra/db/CounterMutation.java   |   6 +-
 .../org/apache/cassandra/db/view/ViewUtils.java    |   4 +-
 .../apache/cassandra/locator/ReplicaLayout.java    |  56 ++--
 .../org/apache/cassandra/locator/ReplicaPlan.java  |  43 +--
 .../org/apache/cassandra/locator/ReplicaPlans.java | 112 ++++----
 .../apache/cassandra/locator/TokenMetadata.java    |   3 +-
 .../DatacenterSyncWriteResponseHandler.java        |   6 +-
 .../org/apache/cassandra/service/StorageProxy.java |  51 ++--
 .../service/reads/ReplicaFilteringProtection.java  |   6 +-
 .../service/reads/repair/AbstractReadRepair.java   |   2 +-
 .../service/reads/repair/BlockingReadRepairs.java  |   2 +-
 .../apache/cassandra/db/view/ViewUtilsTest.java    |   6 +-
 .../locator/AssureSufficientLiveNodesTest.java     | 303 +++++++++++++++++++++
 .../service/WriteResponseHandlerTransientTest.java |   8 +-
 .../cassandra/service/reads/DataResolverTest.java  |   4 +-
 .../service/reads/DigestResolverTest.java          |   2 +-
 .../cassandra/service/reads/ReadExecutorTest.java  |   2 +-
 .../reads/repair/AbstractReadRepairTest.java       |  12 +-
 .../reads/repair/BlockingReadRepairTest.java       |  12 +-
 .../service/reads/repair/ReadRepairTest.java       |  12 +-
 25 files changed, 537 insertions(+), 197 deletions(-)

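The fix in one sentence: all consistency-level math moves off Keyspace (from which the replication strategy was re-fetched at each call) and onto a single AbstractReplicationStrategy snapshot captured when the query's ReplicaLayout/ReplicaPlan is built. The pre-patch race can be sketched as follows (an abbreviated, hypothetical fragment; 'token' and 'consistencyLevel' stand in for the query's values, and this is not code from the patch):

    // 1. Candidate replicas are gathered under the current strategy (say RF = 3).
    EndpointsForToken live = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);

    // 2. Concurrently: ALTER KEYSPACE ks WITH replication = {..., 'replication_factor': 5};

    // 3. blockFor is recomputed from a fresh strategy lookup: a QUORUM of 5 needs 3 acks,
    //    which can exceed the replicas counted in step 1 (a QUORUM of 3 needs 2), so the
    //    coordinator throws UnavailableException even though no node is down.
    int blockFor = consistencyLevel.blockFor(keyspace); // pre-patch signature

Each file below replaces the Keyspace parameter with the snapshot so that steps 1 and 3 always agree.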
diff --git a/CHANGES.txt b/CHANGES.txt
index 8551edf..efd6c60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-rc1
+ * Fix false unavailable for queries due to cluster topology changes (CASSANDRA-16545)
  * Fixed a race condition issue in nodetool repair where we poll for the error before seeing the error notification, leading to a less meaningful message (CASSANDRA-16585)
  * Fix mixed cluster GROUP BY queries (CASSANDRA-16582)
  * Upgrade jflex to 1.8.2 (CASSANDRA-16576)
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 9a009dc..65ed71e 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -489,8 +489,8 @@ public class BatchlogManager implements BatchlogManagerMBean
                         Hint.create(mutation, writtenAt));
             }
 
-            ReplicaPlan.ForTokenWrite replicaPlan = new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE,
-                    liveRemoteOnly.pending(), liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all());
+            ReplicaPlan.ForTokenWrite replicaPlan = new ReplicaPlan.ForTokenWrite(keyspace, liveAndDown.replicationStrategy(),
+                    ConsistencyLevel.ONE, liveRemoteOnly.pending(), liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all());
             ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replicaPlan, System.nanoTime());
             Message<Mutation> message = Message.outWithFlag(MUTATION_REQ, mutation, MessageFlag.CALL_BACK_ON_FAILURE);
             for (Replica replica : liveRemoteOnly.all())
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 0ba105c..785e6bd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -383,7 +383,7 @@ public abstract class ModificationStatement implements CQLStatement
 
         try
         {
-            cl.validateForRead(keyspace());
+            cl.validateForRead();
         }
         catch (InvalidRequestException e)
         {
@@ -463,7 +463,7 @@ public abstract class ModificationStatement implements CQLStatement
         if (isCounter())
             cl.validateCounterForWrite(metadata());
         else
-            cl.validateForWrite(metadata.keyspace);
+            cl.validateForWrite();
 
         List<? extends IMutation> mutations =
             getMutations(options,
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 8e0df45..63f33b1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -232,7 +232,7 @@ public class SelectStatement implements CQLStatement
         ConsistencyLevel cl = options.getConsistency();
         checkNotNull(cl, "Invalid empty consistency level");
 
-        cl.validateForRead(keyspace());
+        cl.validateForRead();
 
         int nowInSec = options.getNowInSeconds(state);
         int userLimit = getLimit(options);
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index e3da5b3..fbaf3fd 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -81,50 +81,49 @@ public enum ConsistencyLevel
         return codeIdx[code];
     }
 
-    public static int quorumFor(Keyspace keyspace)
+    public static int quorumFor(AbstractReplicationStrategy replicationStrategy)
     {
-        return (keyspace.getReplicationStrategy().getReplicationFactor().allReplicas / 2) + 1;
+        return (replicationStrategy.getReplicationFactor().allReplicas / 2) + 1;
     }
 
-    public static int localQuorumFor(Keyspace keyspace, String dc)
+    public static int localQuorumFor(AbstractReplicationStrategy replicationStrategy, String dc)
     {
-        return (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
-             ? (((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getReplicationFactor(dc).allReplicas / 2) + 1
-             : quorumFor(keyspace);
+        return (replicationStrategy instanceof NetworkTopologyStrategy)
+             ? (((NetworkTopologyStrategy) replicationStrategy).getReplicationFactor(dc).allReplicas / 2) + 1
+             : quorumFor(replicationStrategy);
     }
 
-    public static int localQuorumForOurDc(Keyspace keyspace)
+    public static int localQuorumForOurDc(AbstractReplicationStrategy replicationStrategy)
     {
-        return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
+        return localQuorumFor(replicationStrategy, DatabaseDescriptor.getLocalDataCenter());
     }
 
-    public static ObjectIntHashMap<String> eachQuorumForRead(Keyspace keyspace)
+    public static ObjectIntHashMap<String> eachQuorumForRead(AbstractReplicationStrategy replicationStrategy)
     {
-        AbstractReplicationStrategy strategy = keyspace.getReplicationStrategy();
-        if (strategy instanceof NetworkTopologyStrategy)
+        if (replicationStrategy instanceof NetworkTopologyStrategy)
         {
-            NetworkTopologyStrategy npStrategy = (NetworkTopologyStrategy) strategy;
+            NetworkTopologyStrategy npStrategy = (NetworkTopologyStrategy) replicationStrategy;
             ObjectIntHashMap<String> perDc = new ObjectIntHashMap<>(((npStrategy.getDatacenters().size() + 1) * 4) / 3);
             for (String dc : npStrategy.getDatacenters())
-                perDc.put(dc, ConsistencyLevel.localQuorumFor(keyspace, dc));
+                perDc.put(dc, ConsistencyLevel.localQuorumFor(replicationStrategy, dc));
             return perDc;
         }
         else
         {
             ObjectIntHashMap<String> perDc = new ObjectIntHashMap<>(1);
-            perDc.put(DatabaseDescriptor.getLocalDataCenter(), quorumFor(keyspace));
+            perDc.put(DatabaseDescriptor.getLocalDataCenter(), quorumFor(replicationStrategy));
             return perDc;
         }
     }
 
-    public static ObjectIntHashMap<String> eachQuorumForWrite(Keyspace keyspace, Endpoints<?> pendingWithDown)
+    public static ObjectIntHashMap<String> eachQuorumForWrite(AbstractReplicationStrategy replicationStrategy, Endpoints<?> pendingWithDown)
     {
-        ObjectIntHashMap<String> perDc = eachQuorumForRead(keyspace);
+        ObjectIntHashMap<String> perDc = eachQuorumForRead(replicationStrategy);
         addToCountPerDc(perDc, pendingWithDown, 1);
         return perDc;
     }
 
-    public int blockFor(Keyspace keyspace)
+    public int blockFor(AbstractReplicationStrategy replicationStrategy)
     {
         switch (this)
         {
@@ -139,35 +138,35 @@ public enum ConsistencyLevel
                 return 3;
             case QUORUM:
             case SERIAL:
-                return quorumFor(keyspace);
+                return quorumFor(replicationStrategy);
             case ALL:
-                return keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
+                return replicationStrategy.getReplicationFactor().allReplicas;
             case LOCAL_QUORUM:
             case LOCAL_SERIAL:
-                return localQuorumForOurDc(keyspace);
+                return localQuorumForOurDc(replicationStrategy);
             case EACH_QUORUM:
-                if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+                if (replicationStrategy instanceof NetworkTopologyStrategy)
                 {
-                    NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
+                    NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicationStrategy;
                     int n = 0;
                     for (String dc : strategy.getDatacenters())
-                        n += localQuorumFor(keyspace, dc);
+                        n += localQuorumFor(replicationStrategy, dc);
                     return n;
                 }
                 else
                 {
-                    return quorumFor(keyspace);
+                    return quorumFor(replicationStrategy);
                 }
             default:
                 throw new UnsupportedOperationException("Invalid consistency level: " + toString());
         }
     }
 
-    public int blockForWrite(Keyspace keyspace, Endpoints<?> pending)
+    public int blockForWrite(AbstractReplicationStrategy replicationStrategy, Endpoints<?> pending)
     {
         assert pending != null;
 
-        int blockFor = blockFor(keyspace);
+        int blockFor = blockFor(replicationStrategy);
         switch (this)
         {
             case ANY:
@@ -189,9 +188,9 @@ public enum ConsistencyLevel
      * Determine if this consistency level meets or exceeds the consistency requirements of the given cl for the given keyspace
      * WARNING: this is not locality aware; you cannot safely use this with mixed locality consistency levels (e.g. LOCAL_QUORUM and QUORUM)
      */
-    public boolean satisfies(ConsistencyLevel other, Keyspace keyspace)
+    public boolean satisfies(ConsistencyLevel other, AbstractReplicationStrategy replicationStrategy)
     {
-        return blockFor(keyspace) >= other.blockFor(keyspace);
+        return blockFor(replicationStrategy) >= other.blockFor(replicationStrategy);
     }
 
     public boolean isDatacenterLocal()
@@ -199,7 +198,7 @@ public enum ConsistencyLevel
         return isDCLocal;
     }
 
-    public void validateForRead(String keyspaceName) throws InvalidRequestException
+    public void validateForRead() throws InvalidRequestException
     {
         switch (this)
         {
@@ -208,7 +207,7 @@ public enum ConsistencyLevel
         }
     }
 
-    public void validateForWrite(String keyspaceName) throws InvalidRequestException
+    public void validateForWrite() throws InvalidRequestException
     {
         switch (this)
         {
@@ -219,12 +218,12 @@ public enum ConsistencyLevel
     }
 
     // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL
-    public void validateForCasCommit(String keyspaceName) throws InvalidRequestException
+    public void validateForCasCommit(AbstractReplicationStrategy replicationStrategy) throws InvalidRequestException
     {
         switch (this)
         {
             case EACH_QUORUM:
-                requireNetworkTopologyStrategy(keyspaceName);
+                requireNetworkTopologyStrategy(replicationStrategy);
                 break;
             case SERIAL:
             case LOCAL_SERIAL:
@@ -252,10 +251,10 @@ public enum ConsistencyLevel
             throw new InvalidRequestException("Counter operations are inherently non-serializable");
     }
 
-    private void requireNetworkTopologyStrategy(String keyspaceName) throws InvalidRequestException
+    private void requireNetworkTopologyStrategy(AbstractReplicationStrategy replicationStrategy) throws InvalidRequestException
     {
-        AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
-        if (!(strategy instanceof NetworkTopologyStrategy))
-            throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)", this, strategy.getClass().getName()));
+        if (!(replicationStrategy instanceof NetworkTopologyStrategy))
+            throw new InvalidRequestException(String.format("consistency level %s not compatible with replication strategy (%s)",
+                                                            this, replicationStrategy.getClass().getName()));
     }
 }
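The quorum arithmetic itself is unchanged; only the parameter type moves from Keyspace to AbstractReplicationStrategy. A worked sketch against the new signatures (the strategy would normally be the snapshot carried by a ReplicaPlan rather than the fresh lookup shown here):

    AbstractReplicationStrategy rs = Keyspace.open("ks").getReplicationStrategy();

    // SimpleStrategy, RF = 3: quorumFor(rs) = (3 / 2) + 1 = 2
    int quorum = ConsistencyLevel.quorumFor(rs);

    // NetworkTopologyStrategy, {'dc1': 3, 'dc2': 3}:
    //   localQuorumFor(rs, "dc1")                 = (3 / 2) + 1 = 2
    //   ConsistencyLevel.QUORUM.blockFor(rs)      = (6 / 2) + 1 = 4
    //   ConsistencyLevel.EACH_QUORUM.blockFor(rs) = 2 + 2       = 4
    int localQuorum = ConsistencyLevel.localQuorumFor(rs, "dc1");
    int blockFor = ConsistencyLevel.QUORUM.blockFor(rs);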
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index bc0cd85..fe1e46e 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.tracing.Tracing;
@@ -151,18 +152,19 @@ public class CounterMutation implements IMutation
     {
         long startTime = System.nanoTime();
 
+        AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
         for (Lock lock : LOCKS.bulkGet(getCounterLockKeys()))
         {
             long timeout = getTimeout(NANOSECONDS) - (System.nanoTime() - startTime);
             try
             {
                 if (!lock.tryLock(timeout, NANOSECONDS))
-                    throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+                    throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(replicationStrategy));
                 locks.add(lock);
             }
             catch (InterruptedException e)
             {
-                throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(keyspace));
+                throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(replicationStrategy));
             }
         }
     }
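This is the snapshot-once idiom in miniature: the strategy is read a single time before the lock loop, so both timeout paths report a blockFor derived from the same strategy even if the keyspace is ALTERed while the coordinator waits. Condensed (with hypothetical loop and helper names, not patch code):

    AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); // read once, up front
    for (Lock lock : locksToAcquire)                                    // hypothetical collection
    {
        if (!tryAcquire(lock, timeout))                                 // hypothetical helper
            // every failure reports the same, consistent blockFor
            throw new WriteTimeoutException(WriteType.COUNTER, consistency(), 0, consistency().blockFor(rs));
    }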
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index e824732..b5aa063 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -59,10 +59,8 @@ public final class ViewUtils
      *
      * @return Optional.empty() if this method is called using a base token which does not belong to this replica
      */
-    public static Optional<Replica> getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
+    public static Optional<Replica> getViewNaturalEndpoint(AbstractReplicationStrategy replicationStrategy, Token baseToken, Token viewToken)
     {
-        AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
-
         String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
         EndpointsForToken naturalBaseReplicas = replicationStrategy.getNaturalReplicasForToken(baseToken);
         EndpointsForToken naturalViewReplicas = replicationStrategy.getNaturalReplicasForToken(viewToken);
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index d44fdd7..ff81732 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -39,9 +39,12 @@ import java.util.function.Predicate;
 public abstract class ReplicaLayout<E extends Endpoints<E>>
 {
     private final E natural;
+    // the snapshot of the replication strategy that corresponds to the replica layout
+    private final AbstractReplicationStrategy replicationStrategy;
 
-    ReplicaLayout(E natural)
+    ReplicaLayout(AbstractReplicationStrategy replicationStrategy, E natural)
     {
+        this.replicationStrategy = replicationStrategy;
         this.natural = natural;
     }
 
@@ -55,6 +58,11 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
         return natural;
     }
 
+    public final AbstractReplicationStrategy replicationStrategy()
+    {
+        return replicationStrategy;
+    }
+
     /**
     * All relevant owners of the ring position(s) for this operation, as implied by the current ring layout.
     * For writes, this will include pending owners, and for reads it will be equivalent to natural()
@@ -71,9 +79,9 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
 
     public static class ForTokenRead extends ReplicaLayout<EndpointsForToken> implements ForToken
     {
-        public ForTokenRead(EndpointsForToken natural)
+        public ForTokenRead(AbstractReplicationStrategy replicationStrategy, EndpointsForToken natural)
         {
-            super(natural);
+            super(replicationStrategy, natural);
         }
 
         @Override
@@ -87,7 +95,7 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
             EndpointsForToken filtered = natural().filter(filter);
             // AbstractReplicaCollection.filter returns itself if all elements match the filter
             if (filtered == natural()) return this;
-            return new ReplicaLayout.ForTokenRead(filtered);
+            return new ReplicaLayout.ForTokenRead(replicationStrategy(), filtered);
         }
     }
 
@@ -95,9 +103,9 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
     {
         final AbstractBounds<PartitionPosition> range;
 
-        public ForRangeRead(AbstractBounds<PartitionPosition> range, EndpointsForRange natural)
+        public ForRangeRead(AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range, EndpointsForRange natural)
         {
-            super(natural);
+            super(replicationStrategy, natural);
             this.range = range;
         }
 
@@ -112,7 +120,7 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
             EndpointsForRange filtered = natural().filter(filter);
             // AbstractReplicaCollection.filter returns itself if all elements match the filter
             if (filtered == natural()) return this;
-            return new ReplicaLayout.ForRangeRead(range(), filtered);
+            return new ReplicaLayout.ForRangeRead(replicationStrategy(), range(), filtered);
         }
     }
 
@@ -121,9 +129,9 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
         final E all;
         final E pending;
 
-        ForWrite(E natural, E pending, E all)
+        ForWrite(AbstractReplicationStrategy replicationStrategy, E natural, E pending, E all)
         {
-            super(natural);
+            super(replicationStrategy, natural);
             assert pending != null && !haveWriteConflicts(natural, pending);
             if (all == null)
                 all = Endpoints.concat(natural, pending);
@@ -149,13 +157,13 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
 
     public static class ForTokenWrite extends ForWrite<EndpointsForToken> implements ForToken
     {
-        public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
+        public ForTokenWrite(AbstractReplicationStrategy replicationStrategy, EndpointsForToken natural, EndpointsForToken pending)
         {
-            this(natural, pending, null);
+            this(replicationStrategy, natural, pending, null);
         }
-        public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken all)
+        public ForTokenWrite(AbstractReplicationStrategy replicationStrategy, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken all)
         {
-            super(natural, pending, all);
+            super(replicationStrategy, natural, pending, all);
         }
 
         @Override
@@ -168,6 +176,7 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
             if (filtered == all()) return this;
             // unique by endpoint, so can for efficiency filter only on endpoint
             return new ReplicaLayout.ForTokenWrite(
+                    replicationStrategy(),
                     natural().keep(filtered.endpoints()),
                     pending().keep(filtered.endpoints()),
                     filtered
@@ -196,19 +205,20 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
     {
         // TODO: these should be cached, not the natural replicas
         // TODO: race condition to fetch these. implications??
-        EndpointsForToken natural = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
+        AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
+        EndpointsForToken natural = replicationStrategy.getNaturalReplicasForToken(token);
         EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName());
-        return forTokenWrite(natural, pending);
+        return forTokenWrite(replicationStrategy, natural, pending);
     }
 
-    public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
+    public static ReplicaLayout.ForTokenWrite forTokenWrite(AbstractReplicationStrategy replicationStrategy, EndpointsForToken natural, EndpointsForToken pending)
     {
         if (haveWriteConflicts(natural, pending))
         {
             natural = resolveWriteConflictsInNatural(natural, pending);
             pending = resolveWriteConflictsInPending(natural, pending);
         }
-        return new ReplicaLayout.ForTokenWrite(natural, pending);
+        return new ReplicaLayout.ForTokenWrite(replicationStrategy, natural, pending);
     }
 
     /**
@@ -315,12 +325,12 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
      * @return the read layout for a token - this includes only live natural replicas, i.e. those that are not pending
      * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
      */
-    static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(Keyspace keyspace, Token token)
+    static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(AbstractReplicationStrategy replicationStrategy, Token token)
     {
-        EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
+        EndpointsForToken replicas = replicationStrategy.getNaturalReplicasForToken(token);
         replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
         replicas = replicas.filter(FailureDetector.isReplicaAlive);
-        return new ReplicaLayout.ForTokenRead(replicas);
+        return new ReplicaLayout.ForTokenRead(replicationStrategy, replicas);
     }
 
     /**
@@ -328,12 +338,12 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
      * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
      * and not marked down by the failure detector. these are reverse sorted by the badness score of the configured snitch
      */
-    static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(Keyspace keyspace, AbstractBounds<PartitionPosition> range)
+    static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(AbstractReplicationStrategy replicationStrategy, AbstractBounds<PartitionPosition> range)
     {
-        EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(range.right);
+        EndpointsForRange replicas = replicationStrategy.getNaturalReplicas(range.right);
         replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
         replicas = replicas.filter(FailureDetector.isReplicaAlive);
-        return new ReplicaLayout.ForRangeRead(range, replicas);
+        return new ReplicaLayout.ForRangeRead(replicationStrategy, range, replicas);
     }
 
 }
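Since the snapshot is final and copied into every derived layout, filtering for liveness cannot silently switch strategies mid-query. A brief usage sketch (hypothetical 'natural' and 'pending' endpoint sets):

    AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
    ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(rs, natural, pending);
    ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive);
    // the filtered layout carries the identical snapshot
    assert live.replicationStrategy() == liveAndDown.replicationStrategy();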
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 407db5b..51cab13 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -30,6 +30,10 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
 {
     protected final Keyspace keyspace;
     protected final ConsistencyLevel consistencyLevel;
+    // The snapshot of the replication strategy when instantiating.
+    // It could be different than the one fetched from Keyspace later, e.g. RS altered during the query.
+    // Use the snapshot to calculate {@code blockFor} in order to have a consistent view of RS for the query.
+    protected final AbstractReplicationStrategy replicationStrategy;
 
     // all nodes we will contact via any mechanism, including hints
     // i.e., for:
@@ -41,10 +45,11 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
     //      ==> live.all()  (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal))
     private final E contacts;
 
-    ReplicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, E contacts)
+    ReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts)
     {
         assert contacts != null;
         this.keyspace = keyspace;
+        this.replicationStrategy = replicationStrategy;
         this.consistencyLevel = consistencyLevel;
         this.contacts = contacts;
     }
@@ -56,6 +61,7 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
     // TODO: should this semantically return true if we contain the endpoint, not the exact replica?
     public boolean contacts(Replica replica) { return contacts.contains(replica); }
     public Keyspace keyspace() { return keyspace; }
+    public AbstractReplicationStrategy replicationStrategy() { return replicationStrategy; }
     public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
 
     public static abstract class ForRead<E extends Endpoints<E>> extends ReplicaPlan<E>
@@ -64,13 +70,13 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
         // we will consult this collection to find uncontacted nodes we might contact if we doubt we will meet consistency level
         private final E candidates;
 
-        ForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, E candidates, E contact)
+        ForRead(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E candidates, E contacts)
         {
-            super(keyspace, consistencyLevel, contact);
+            super(keyspace, replicationStrategy, consistencyLevel, contacts);
             this.candidates = candidates;
         }

-        public int blockFor() { return consistencyLevel.blockFor(keyspace); }
+        public int blockFor() { return consistencyLevel.blockFor(replicationStrategy); }
 
         public E candidates() { return candidates; }
 
@@ -92,14 +98,18 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
 
     public static class ForTokenRead extends ForRead<EndpointsForToken>
     {
-        public ForTokenRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken candidates, EndpointsForToken contact)
+        public ForTokenRead(Keyspace keyspace,
+                            AbstractReplicationStrategy replicationStrategy,
+                            ConsistencyLevel consistencyLevel,
+                            EndpointsForToken candidates,
+                            EndpointsForToken contacts)
         {
-            super(keyspace, consistencyLevel, candidates, contact);
+            super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts);
         }
 
         ForTokenRead withContact(EndpointsForToken newContact)
         {
-            return new ForTokenRead(keyspace, consistencyLevel, candidates(), newContact);
+            return new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates(), newContact);
         }
     }
 
@@ -109,13 +119,14 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
         final int vnodeCount;
 
         public ForRangeRead(Keyspace keyspace,
+                            AbstractReplicationStrategy replicationStrategy,
                             ConsistencyLevel consistencyLevel,
                             AbstractBounds<PartitionPosition> range,
                             EndpointsForRange candidates,
                             EndpointsForRange contact,
                             int vnodeCount)
         {
-            super(keyspace, consistencyLevel, candidates, contact);
+            super(keyspace, replicationStrategy, consistencyLevel, candidates, contact);
             this.range = range;
             this.vnodeCount = vnodeCount;
         }
@@ -129,7 +140,7 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
 
         ForRangeRead withContact(EndpointsForRange newContact)
         {
-            return new ForRangeRead(keyspace, consistencyLevel, range, candidates(), newContact, vnodeCount);
+            return new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, candidates(), newContact, vnodeCount);
         }
     }
 
@@ -140,15 +151,15 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
         final E liveAndDown;
         final E live;
 
-        ForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, E pending, E liveAndDown, E live, E contact)
+        ForWrite(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E pending, E liveAndDown, E live, E contact)
         {
-            super(keyspace, consistencyLevel, contact);
+            super(keyspace, replicationStrategy, consistencyLevel, contact);
             this.pending = pending;
             this.liveAndDown = liveAndDown;
             this.live = live;
         }
 
-        public int blockFor() { return consistencyLevel.blockForWrite(keyspace, pending()); }
+        public int blockFor() { return consistencyLevel.blockForWrite(replicationStrategy, pending()); }
 
         /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */
         public E pending() { return pending; }
@@ -173,14 +184,14 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
 
     public static class ForTokenWrite extends ForWrite<EndpointsForToken>
     {
-        public ForTokenWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact)
+        public ForTokenWrite(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact)
         {
-            super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
+            super(keyspace, replicationStrategy, consistencyLevel, pending, liveAndDown, live, contact);
         }
 
         private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken newContact)
         {
-            return new ReplicaPlan.ForTokenWrite(keyspace, newConsistencyLevel, pending(), liveAndDown(), live(), newContact);
+            return new ReplicaPlan.ForTokenWrite(keyspace, replicationStrategy, newConsistencyLevel, pending(), liveAndDown(), live(), newContact);
         }
 
         ForTokenWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { return copy(newConsistencylevel, contacts()); }
@@ -193,7 +204,7 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
 
         ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact, int requiredParticipants)
         {
-            super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
+            super(keyspace, keyspace.getReplicationStrategy(), consistencyLevel, pending, liveAndDown, live, contact);
             this.requiredParticipants = requiredParticipants;
         }
 
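The practical effect on ReplicaPlan: blockFor() is now a pure function of the snapshot taken when the plan was built, so the availability check, the response handler, and any speculative retries all wait on the same target. A sketch (hypothetical 'liveAndDown' and 'live' layouts built from one snapshot):

    ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(keyspace, ConsistencyLevel.QUORUM, liveAndDown, live, ReplicaPlans.writeAll);
    // equivalent to consistencyLevel.blockForWrite(plan.replicationStrategy(), plan.pending());
    // a concurrent ALTER KEYSPACE can no longer change this value mid-query
    int blockFor = plan.blockFor();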
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 083da7a..67b89e5 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -72,7 +72,7 @@ public class ReplicaPlans
 {
     private static final Logger logger = LoggerFactory.getLogger(ReplicaPlans.class);
 
-    public static boolean isSufficientLiveReplicasForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas)
+    public static boolean isSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas)
     {
         switch (consistencyLevel)
         {
@@ -82,16 +82,16 @@ public class ReplicaPlans
             case LOCAL_ONE:
                 return countInOurDc(liveReplicas).hasAtleast(1, 1);
             case LOCAL_QUORUM:
-                return countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(keyspace), 1);
+                return countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(replicationStrategy), 1);
             case EACH_QUORUM:
-                if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+                if (replicationStrategy instanceof NetworkTopologyStrategy)
                 {
                     int fullCount = 0;
-                    Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters();
+                    Collection<String> dcs = ((NetworkTopologyStrategy) replicationStrategy).getDatacenters();
                     for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, liveReplicas))
                     {
                         Replicas.ReplicaCount count = entry.value;
-                        if (!count.hasAtleast(localQuorumFor(keyspace, entry.key), 0))
+                        if (!count.hasAtleast(localQuorumFor(replicationStrategy, entry.key), 0))
                             return false;
                         fullCount += count.fullReplicas();
                     }
@@ -99,20 +99,20 @@ public class ReplicaPlans
                 }
                 // Fallthough on purpose for SimpleStrategy
             default:
-                return liveReplicas.size() >= consistencyLevel.blockFor(keyspace)
+                return liveReplicas.size() >= consistencyLevel.blockFor(replicationStrategy)
                         && Replicas.countFull(liveReplicas) > 0;
         }
     }
 
-    static void assureSufficientLiveReplicasForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException
+    static void assureSufficientLiveReplicasForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws UnavailableException
     {
-        assureSufficientLiveReplicas(keyspace, consistencyLevel, liveReplicas, consistencyLevel.blockFor(keyspace), 1);
+        assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, liveReplicas, consistencyLevel.blockFor(replicationStrategy), 1);
     }
-    static void assureSufficientLiveReplicasForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
+    static void assureSufficientLiveReplicasForWrite(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
     {
-        assureSufficientLiveReplicas(keyspace, consistencyLevel, allLive, consistencyLevel.blockForWrite(keyspace, pendingWithDown), 0);
+        assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, allLive, consistencyLevel.blockForWrite(replicationStrategy, pendingWithDown), 0);
     }
-    static void assureSufficientLiveReplicas(Keyspace keyspace, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
+    static void assureSufficientLiveReplicas(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
     {
         switch (consistencyLevel)
         {
@@ -141,14 +141,14 @@ public class ReplicaPlans
                 break;
             }
             case EACH_QUORUM:
-                if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+                if (replicationStrategy instanceof NetworkTopologyStrategy)
                 {
                     int total = 0;
                     int totalFull = 0;
-                    Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters();
+                    Collection<String> dcs = ((NetworkTopologyStrategy) replicationStrategy).getDatacenters();
                     for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry : countPerDc(dcs, allLive))
                     {
-                        int dcBlockFor = localQuorumFor(keyspace, entry.key);
+                        int dcBlockFor = localQuorumFor(replicationStrategy, entry.key);
                         Replicas.ReplicaCount dcCount = entry.value;
                         if (!dcCount.hasAtleast(dcBlockFor, 0))
                            throw UnavailableException.create(consistencyLevel, entry.key, dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas());
@@ -180,7 +180,7 @@ public class ReplicaPlans
     {
         EndpointsForToken one = EndpointsForToken.of(token, replica);
         EndpointsForToken empty = EndpointsForToken.empty(token);
-        return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, empty, one, one, one);
+        return new ReplicaPlan.ForTokenWrite(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, empty, one, one, one);
     }
 
     /**
@@ -199,10 +199,10 @@ public class ReplicaPlans
         Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
 
         ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                systemKeypsace.getReplicationStrategy(),
                 EndpointsForToken.of(token, localSystemReplica),
                 EndpointsForToken.empty(token)
         );
-
         return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll);
     }
 
@@ -231,16 +231,14 @@ public class ReplicaPlans
         if (chosenEndpoints.isEmpty() && isAny)
             chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
 
+        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
         ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                systemKeypsace.getReplicationStrategy(),
                 SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
                 EndpointsForToken.empty(token)
         );
-
         // Batchlog is hosted by either one node or two nodes from different racks.
         ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
-
-        Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-
         // assume that we have already been given live endpoints, and skip applying the failure detector
         return forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
     }
@@ -333,7 +331,7 @@ public class ReplicaPlans
     @VisibleForTesting
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
     {
-        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
+        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(keyspace.getReplicationStrategy(), natural, pending), isAlive, selector);
     }
 
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException
@@ -341,8 +339,7 @@ public class ReplicaPlans
         return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector);
     }
 
-    @VisibleForTesting
-    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+    private static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
     {
         ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
         return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector);
@@ -350,15 +347,21 @@ public class ReplicaPlans
 
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
     {
-        EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
-        assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending());
-        return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
+        assert liveAndDown.replicationStrategy() == live.replicationStrategy()
+               : "ReplicaLayout liveAndDown and live should be derived from the same replication strategy.";
+        AbstractReplicationStrategy replicationStrategy = liveAndDown.replicationStrategy();
+        EndpointsForToken contacts = selector.select(consistencyLevel, liveAndDown, live);
+        assureSufficientLiveReplicasForWrite(replicationStrategy, consistencyLevel, live.all(), liveAndDown.pending());
+        return new ReplicaPlan.ForTokenWrite(keyspace, replicationStrategy, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
     }
 
     public interface Selector
     {
+        /**
+         * Select the {@code Endpoints} from {@param liveAndDown} and {@param live} to contact according to the consistency level.
+         */
         <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
+        E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live);
     }
 
     /**
@@ -371,7 +374,7 @@ public class ReplicaPlans
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+        E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
         {
             return liveAndDown.all();
         }
@@ -390,7 +393,7 @@ public class ReplicaPlans
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+        E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
         {
             if (!any(liveAndDown.all(), Replica::isTransient))
                 return liveAndDown.all();
@@ -406,7 +409,7 @@ public class ReplicaPlans
             * soft-ensure that we reach QUORUM in all DCs we are able to, by writing to every node;
             * even if we don't wait for ACK, we have in both cases sent sufficient messages.
              */
-            ObjectIntHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending());
+            ObjectIntHashMap<String> requiredPerDc = eachQuorumForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending());
             addToCountPerDc(requiredPerDc, live.natural().filter(Replica::isFull), -1);
             addToCountPerDc(requiredPerDc, live.pending(), -1);
 
@@ -438,7 +441,7 @@ public class ReplicaPlans
         {
             @Override
             public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-            E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+            E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
             {
                 assert !any(liveAndDown.all(), Replica::isTransient);
 
@@ -449,7 +452,7 @@ public class ReplicaPlans
                 // finally, add sufficient nodes to achieve our consistency level
                 if (consistencyLevel != EACH_QUORUM)
                 {
-                    int add = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - contacts.size();
+                    int add = consistencyLevel.blockForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending()) - contacts.size();
                     if (add > 0)
                     {
                         for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
@@ -462,7 +465,7 @@ public class ReplicaPlans
                 }
                 else
                 {
-                    ObjectIntHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending());
+                    ObjectIntHashMap<String> requiredPerDc = eachQuorumForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending());
                     addToCountPerDc(requiredPerDc, contacts.snapshot(), -1);
                     IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
                     for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
@@ -486,6 +489,7 @@ public class ReplicaPlans
     public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
     {
         Token tk = key.getToken();
+
         ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
 
         Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547
@@ -527,10 +531,9 @@ public class ReplicaPlans
                 : liveNaturalReplicas;
     }
 
-    private static <E extends Endpoints<E>> E contactForEachQuorumRead(Keyspace keyspace, E candidates)
+    private static <E extends Endpoints<E>> E contactForEachQuorumRead(NetworkTopologyStrategy replicationStrategy, E candidates)
     {
-        assert keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy;
-        ObjectIntHashMap<String> perDc = eachQuorumForRead(keyspace);
+        ObjectIntHashMap<String> perDc = eachQuorumForRead(replicationStrategy);
 
         final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
         return candidates.filter(replica -> {
@@ -539,7 +542,7 @@ public class ReplicaPlans
         });
     }
 
-    private static <E extends Endpoints<E>> E contactForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
+    private static <E extends Endpoints<E>> E contactForRead(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
     {
         /*
          * If we are doing an each quorum query, we have to make sure that the endpoints we select
@@ -550,10 +553,10 @@ public class ReplicaPlans
          *
          * TODO: this is still very inconistently managed between {LOCAL,EACH}_QUORUM and other consistency levels - should address this in a follow-up
          */
-        if (consistencyLevel == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
-            return contactForEachQuorumRead(keyspace, candidates);
+        if (consistencyLevel == EACH_QUORUM && replicationStrategy instanceof NetworkTopologyStrategy)
+            return contactForEachQuorumRead((NetworkTopologyStrategy) replicationStrategy, candidates);

-        int count = consistencyLevel.blockFor(keyspace) + (alwaysSpeculate ? 1 : 0);
+        int count = consistencyLevel.blockFor(replicationStrategy) + (alwaysSpeculate ? 1 : 0);
         return candidates.subList(0, Math.min(count, candidates.size()));
     }
 
@@ -564,7 +567,7 @@ public class ReplicaPlans
     public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica)
     {
         EndpointsForToken one = EndpointsForToken.of(token, replica);
-        return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE, one, one);
+        return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one);
     }
 
     /**
@@ -574,7 +577,7 @@ public class ReplicaPlans
     {
         // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
         EndpointsForRange one = EndpointsForRange.of(replica);
-        return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, range, one, one, vnodeCount);
+        return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, vnodeCount);
     }
 
     /**
@@ -587,11 +590,12 @@ public class ReplicaPlans
      */
     public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
     {
-        EndpointsForToken candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forTokenReadLiveSorted(keyspace, token).natural());
-        EndpointsForToken contacts = contactForRead(keyspace, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates);
+        AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
+        EndpointsForToken candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forTokenReadLiveSorted(replicationStrategy, token).natural());
+        EndpointsForToken contacts = contactForRead(replicationStrategy, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates);

-        assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, contacts);
-        return new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates, contacts);
+        assureSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, contacts);
+        return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts);
     }
 
     /**
@@ -603,11 +607,12 @@ public class ReplicaPlans
      */
     public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, int vnodeCount)
     {
-        EndpointsForRange candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forRangeReadLiveSorted(keyspace, range).natural());
-        EndpointsForRange contacts = contactForRead(keyspace, consistencyLevel, false, candidates);
+        AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
+        EndpointsForRange candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forRangeReadLiveSorted(replicationStrategy, range).natural());
+        EndpointsForRange contacts = contactForRead(replicationStrategy, consistencyLevel, false, candidates);

-        assureSufficientLiveReplicasForRead(keyspace, consistencyLevel, contacts);
-        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates, contacts, vnodeCount);
+        assureSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, contacts);
+        return new ReplicaPlan.ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, candidates, contacts, vnodeCount);
     }
 
     /**
@@ -618,18 +623,19 @@ public class ReplicaPlans
         // TODO: should we be asserting that the ranges are adjacent?
         AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right);
         EndpointsForRange mergedCandidates = left.candidates().keep(right.candidates().endpoints());
+        AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy();
 
         // Check if there are enough shared endpoints for the merge to be possible.
-        if (!isSufficientLiveReplicasForRead(keyspace, consistencyLevel, mergedCandidates))
+        if (!isSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, mergedCandidates))
             return null;
 
-        EndpointsForRange contacts = contactForRead(keyspace, consistencyLevel, false, mergedCandidates);
+        EndpointsForRange contacts = contactForRead(replicationStrategy, consistencyLevel, false, mergedCandidates);
 
         // Estimate whether merging will be a win or not
         if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
             return null;
 
         // If we get there, merge this range and the next one
-        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts, left.vnodeCount() + right.vnodeCount());
+        return new ReplicaPlan.ForRangeRead(keyspace, replicationStrategy, consistencyLevel, newRange, mergedCandidates, contacts, left.vnodeCount() + right.vnodeCount());
     }
 }
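Taken together, ReplicaPlans now pins one strategy snapshot per query: forRead, forRangeRead and forWrite each fetch the strategy once, derive candidates and contacts from it, run the availability assertion against it, and hand the same object to the resulting plan. A hypothetical walk-through:

    // shape of ReplicaPlans.forRead after the patch:
    // AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();   <- single fetch
    // candidates = candidatesForRead(cl, forTokenReadLiveSorted(rs, token).natural());
    // contacts   = contactForRead(rs, cl, speculate, candidates);
    // assureSufficientLiveReplicasForRead(rs, cl, contacts);                <- same rs throughout
    ReplicaPlan.ForTokenRead plan = ReplicaPlans.forRead(keyspace, token, ConsistencyLevel.QUORUM, retry);
    assert plan.replicationStrategy() != null; // the snapshot rides along in the plan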
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 589a259..f2bbb9f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -1294,7 +1295,7 @@ public class TokenMetadata
     public EndpointsForToken getWriteEndpoints(Token token, String 
keyspaceName, EndpointsForToken natural)
     {
         EndpointsForToken pending = pendingEndpointsForToken(token, 
keyspaceName);
-        return ReplicaLayout.forTokenWrite(natural, pending).all();
+        return 
ReplicaLayout.forTokenWrite(Keyspace.open(keyspaceName).getReplicationStrategy(),
 natural, pending).all();
     }
 
     /** @return an endpoint to token multimap representation of 
tokenToEndpointMap (a copy) */
diff --git 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 389dcd5..65cf3cc 100644
--- 
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ 
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -49,9 +49,9 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
         super(replicaPlan, callback, writeType, queryStartNanoTime);
         assert replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
 
-        if (replicaPlan.keyspace().getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
+        if (replicaPlan.replicationStrategy() instanceof 
NetworkTopologyStrategy)
         {
-            NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
replicaPlan.keyspace().getReplicationStrategy();
+            NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) 
replicaPlan.replicationStrategy();
             for (String dc : strategy.getDatacenters())
             {
                 int rf = strategy.getReplicationFactor(dc).allReplicas;
@@ -60,7 +60,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends 
AbstractWriteResponse
         }
         else
         {
-            responses.put(DatabaseDescriptor.getLocalDataCenter(), new 
AtomicInteger(ConsistencyLevel.quorumFor(replicaPlan.keyspace())));
+            responses.put(DatabaseDescriptor.getLocalDataCenter(), new 
AtomicInteger(ConsistencyLevel.quorumFor(replicaPlan.replicationStrategy())));
         }
 
         // During bootstrap, we have to include the pending endpoints or we 
may fail the consistency level
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index b7b9f2c..72801a9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -431,17 +431,19 @@ public class StorageProxy implements StorageProxyMBean
     {
         int contentions = 0;
         Keyspace keyspace = Keyspace.open(metadata.keyspace);
+        AbstractReplicationStrategy latestRs = 
keyspace.getReplicationStrategy();
         try
         {
             consistencyForPaxos.validateForCas();
-            
consistencyForReplayCommits.validateForCasCommit(metadata.keyspace);
-            consistencyForCommit.validateForCasCommit(metadata.keyspace);
+            consistencyForReplayCommits.validateForCasCommit(latestRs);
+            consistencyForCommit.validateForCasCommit(latestRs);
 
             long timeoutNanos = 
DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
             while (System.nanoTime() - queryStartNanoTime < timeoutNanos)
             {
                 // for simplicity, we'll do a single liveness check at the 
start of each attempt
                 ReplicaPlan.ForPaxosWrite replicaPlan = 
ReplicaPlans.forPaxos(keyspace, key, consistencyForPaxos);
+                latestRs = replicaPlan.replicationStrategy();
                 PaxosBallotAndContention pair = 
beginAndRepairPaxos(queryStartNanoTime,
                                                                     key,
                                                                     metadata,
@@ -501,7 +503,7 @@ public class StorageProxy implements StorageProxyMBean
             recordCasContention(metadata, key, casMetrics, contentions);
         }
 
-        throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
0, consistencyForPaxos.blockFor(keyspace), contentions);
+        throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
0, consistencyForPaxos.blockFor(latestRs), contentions);
     }
 
     /**
@@ -617,7 +619,7 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
-        throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
0, consistencyForPaxos.blockFor(Keyspace.open(metadata.keyspace)), contentions);
+        throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
0, consistencyForPaxos.blockFor(paxosPlan.replicationStrategy()), contentions);
     }
 
     /**
@@ -713,7 +715,7 @@ public class StorageProxy implements StorageProxyMBean
         ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll);
         if (shouldBlock)
         {
-            AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
+            AbstractReplicationStrategy rs = replicaPlan.replicationStrategy();
             responseHandler = rs.getWriteResponseHandler(replicaPlan, null, 
WriteType.SIMPLE, queryStartNanoTime);
         }
 
@@ -967,7 +969,8 @@ public class StorageProxy implements StorageProxyMBean
                 {
                     String keyspaceName = mutation.getKeyspaceName();
                     Token tk = mutation.key().getToken();
-                    Optional<Replica> pairedEndpoint = 
ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+                    AbstractReplicationStrategy replicationStrategy = 
Keyspace.open(keyspaceName).getReplicationStrategy();
+                    Optional<Replica> pairedEndpoint = 
ViewUtils.getViewNaturalEndpoint(replicationStrategy, baseToken, tk);
                     EndpointsForToken pendingReplicas = 
StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, 
keyspaceName);
 
                     // if there are no paired endpoints there are probably 
range movements going on, so we write to the local batchlog to replay later
@@ -1002,11 +1005,13 @@ public class StorageProxy implements StorageProxyMBean
                     }
                     else
                     {
+                        ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(replicationStrategy,
+                                                                               
               EndpointsForToken.of(tk, pairedEndpoint.get()),
+                                                                               
               pendingReplicas);
                         wrappers.add(wrapViewBatchResponseHandler(mutation,
                                                                   
consistencyLevel,
                                                                   
consistencyLevel,
-                                                                  
EndpointsForToken.of(tk, pairedEndpoint.get()),
-                                                                  
pendingReplicas,
+                                                                  liveAndDown,
                                                                   baseComplete,
                                                                   
WriteType.BATCH,
                                                                   cleanup,
@@ -1269,11 +1274,10 @@ public class StorageProxy implements StorageProxyMBean
     {
         String keyspaceName = mutation.getKeyspaceName();
         Keyspace keyspace = Keyspace.open(keyspaceName);
-        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-
         Token tk = mutation.key().getToken();
 
         ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+        AbstractReplicationStrategy rs = replicaPlan.replicationStrategy();
         AbstractWriteResponseHandler<IMutation> responseHandler = 
rs.getWriteResponseHandler(replicaPlan, callback, writeType, 
queryStartNanoTime);
 
         performer.apply(mutation, replicaPlan, responseHandler, 
localDataCenter);
@@ -1289,12 +1293,12 @@ public class StorageProxy implements StorageProxyMBean
                                                                         long 
queryStartNanoTime)
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         Token tk = mutation.key().getToken();
 
         ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+        AbstractReplicationStrategy rs = replicaPlan.replicationStrategy();
         AbstractWriteResponseHandler<IMutation> writeHandler = 
rs.getWriteResponseHandler(replicaPlan, null, writeType, queryStartNanoTime);
-        BatchlogResponseHandler<IMutation> batchHandler = new 
BatchlogResponseHandler<>(writeHandler, 
batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
+        BatchlogResponseHandler<IMutation> batchHandler = new 
BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(rs), 
cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
 
@@ -1305,24 +1309,20 @@ public class StorageProxy implements StorageProxyMBean
     private static WriteResponseHandlerWrapper 
wrapViewBatchResponseHandler(Mutation mutation,
                                                                             
ConsistencyLevel consistencyLevel,
                                                                             
ConsistencyLevel batchConsistencyLevel,
-                                                                            
EndpointsForToken naturalEndpoints,
-                                                                            
EndpointsForToken pendingEndpoints,
+                                                                            
ReplicaLayout.ForTokenWrite liveAndDown,
                                                                             
AtomicLong baseComplete,
                                                                             
WriteType writeType,
                                                                             
BatchlogResponseHandler.BatchlogCleanup cleanup,
                                                                             
long queryStartNanoTime)
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-
-        ReplicaLayout.ForTokenWrite liveAndDown = 
ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints);
         ReplicaPlan.ForTokenWrite replicaPlan = 
ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown, 
ReplicaPlans.writeAll);
-
-        AbstractWriteResponseHandler<IMutation> writeHandler = 
rs.getWriteResponseHandler(replicaPlan, () -> {
+        AbstractReplicationStrategy replicationStrategy = 
replicaPlan.replicationStrategy();
+        AbstractWriteResponseHandler<IMutation> writeHandler = 
replicationStrategy.getWriteResponseHandler(replicaPlan, () -> {
             long delay = Math.max(0, System.currentTimeMillis() - 
baseComplete.get());
             viewWriteMetrics.viewWriteLatency.update(delay, MILLISECONDS);
         }, writeType, queryStartNanoTime);
-        BatchlogResponseHandler<IMutation> batchHandler = new 
ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), 
cleanup, queryStartNanoTime);
+        BatchlogResponseHandler<IMutation> batchHandler = new 
ViewWriteMetricsWrapped(writeHandler, 
batchConsistencyLevel.blockFor(replicationStrategy), cleanup, 
queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
 
@@ -1617,14 +1617,15 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(keyspaceName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        EndpointsForToken replicas = 
keyspace.getReplicationStrategy().getNaturalReplicasForToken(key);
+        AbstractReplicationStrategy replicationStrategy = 
keyspace.getReplicationStrategy();
+        EndpointsForToken replicas = 
replicationStrategy.getNaturalReplicasForToken(key);
 
         // CASSANDRA-13043: filter out those endpoints not accepting clients 
yet, maybe because still bootstrapping
         replicas = replicas.filter(replica -> 
StorageService.instance.isRpcReady(replica.endpoint()));
 
         // TODO have a way to compute the consistency level
         if (replicas.isEmpty())
-            throw UnavailableException.create(cl, cl.blockFor(keyspace), 0);
+            throw UnavailableException.create(cl, 
cl.blockFor(replicationStrategy), 0);
 
         List<Replica> localReplicas = new ArrayList<>(replicas.size());
 
@@ -1636,7 +1637,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             // If the consistency required is local then we should not involve 
other DCs
             if (cl.isDatacenterLocal())
-                throw UnavailableException.create(cl, cl.blockFor(keyspace), 
0);
+                throw UnavailableException.create(cl, 
cl.blockFor(replicationStrategy), 0);
 
             // No endpoint in local DC, pick the closest endpoint according to 
the snitch
             replicas = 
snitch.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
@@ -1739,6 +1740,8 @@ public class StorageProxy implements StorageProxyMBean
         SinglePartitionReadCommand command = group.queries.get(0);
         TableMetadata metadata = command.metadata();
         DecoratedKey key = command.partitionKey();
+        // calculate the blockFor before repairing any paxos round, so that an 
RS altered in between cannot change it.
+        int blockForRead = 
consistencyLevel.blockFor(Keyspace.open(metadata.keyspace).getReplicationStrategy());
 
         PartitionIterator result = null;
         try
@@ -1771,7 +1774,7 @@ public class StorageProxy implements StorageProxyMBean
             }
             catch (WriteTimeoutException e)
             {
-                throw new ReadTimeoutException(consistencyLevel, 0, 
consistencyLevel.blockFor(Keyspace.open(metadata.keyspace)), false);
+                throw new ReadTimeoutException(consistencyLevel, 0, 
blockForRead, false);
             }
             catch (WriteFailureException e)
             {
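
The readWithPaxos hunk above applies the same principle to error reporting:
blockForRead is computed once, before beginAndRepairPaxos runs, so a
WriteTimeoutException raised during the paxos repair is reported against the
strategy the read started with rather than one swapped in by a concurrent ALTER
KEYSPACE. A generic capture-then-report sketch; the names below are illustrative,
not Cassandra APIs:

    // Illustrative sketch: 'current' stands in for Keyspace.getReplicationStrategy().
    final class CaptureThenReport
    {
        interface Strategy { int blockFor(); }

        static volatile Strategy current = () -> 2;  // may be swapped concurrently

        static void read() throws Exception
        {
            // Capture up front, before the long-running step during which
            // the strategy could be replaced.
            int blockForRead = current.blockFor();
            try
            {
                repairPaxosRounds();  // 'current' may change while this runs
            }
            catch (RuntimeException writeTimeout)
            {
                // Report against the captured value, not a fresh (possibly
                // different) one.
                throw new Exception("timed out waiting for " + blockForRead + " replicas");
            }
        }

        static void repairPaxosRounds() {}
    }
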
diff --git 
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java 
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index e17fe09..889fa79 100644
--- 
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++ 
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@ -537,14 +537,14 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
             }
             catch (ReadTimeoutException e)
             {
-                int blockFor = consistency.blockFor(keyspace);
+                int blockFor = 
consistency.blockFor(replicaPlan.replicationStrategy());
                 throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
             }
             catch (UnavailableException e)
             {
-                int blockFor = consistency.blockFor(keyspace);
+                int blockFor = 
consistency.blockFor(replicaPlan.replicationStrategy());
                 throw UnavailableException.create(consistency, blockFor, 
blockFor - 1);
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 1b08877..ca47612 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -155,7 +155,7 @@ public abstract class AbstractReadRepair<E extends 
Endpoints<E>, P extends Repli
         ConsistencyLevel consistency = replicaPlan().consistencyLevel();
         ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? 
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
         return  consistency != ConsistencyLevel.EACH_QUORUM
-                && consistency.satisfies(speculativeCL, cfs.keyspace)
+                && consistency.satisfies(speculativeCL, 
replicaPlan.get().replicationStrategy())
                 && cfs.sampleReadLatencyNanos <= 
command.getTimeout(NANOSECONDS);
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
index 68d1b4c..7a4882b 100644
--- 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
+++ 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
@@ -85,7 +85,7 @@ public class BlockingReadRepairs
 
                 if (!suppressException)
                 {
-                    int blockFor = consistency.blockFor(keyspace);
+                    int blockFor = 
consistency.blockFor(keyspace.getReplicationStrategy());
                     Tracing.trace("Timed out while read-repairing after 
receiving all {} data and digest responses", blockFor);
                     throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
                 }
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java 
b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index 7eebef7..7855150 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -77,7 +77,7 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<Replica> naturalEndpoint = 
ViewUtils.getViewNaturalEndpoint("Keyspace1",
+        Optional<Replica> naturalEndpoint = 
ViewUtils.getViewNaturalEndpoint(Keyspace.open("Keyspace1").getReplicationStrategy(),
                                                                              
new StringToken("CA"),
                                                                              
new StringToken("BB"));
 
@@ -110,7 +110,7 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<Replica> naturalEndpoint = 
ViewUtils.getViewNaturalEndpoint("Keyspace1",
+        Optional<Replica> naturalEndpoint = 
ViewUtils.getViewNaturalEndpoint(Keyspace.open("Keyspace1").getReplicationStrategy(),
                                                                              
new StringToken("CA"),
                                                                              
new StringToken("BB"));
 
@@ -142,7 +142,7 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", 
KeyspaceParams.create(false, replicationMap));
         Schema.instance.load(meta);
 
-        Optional<Replica> naturalEndpoint = 
ViewUtils.getViewNaturalEndpoint("Keyspace1",
+        Optional<Replica> naturalEndpoint = 
ViewUtils.getViewNaturalEndpoint(Keyspace.open("Keyspace1").getReplicationStrategy(),
                                                                              
new StringToken("AB"),
                                                                              
new StringToken("BB"));
 
diff --git 
a/test/unit/org/apache/cassandra/locator/AssureSufficientLiveNodesTest.java 
b/test/unit/org/apache/cassandra/locator/AssureSufficientLiveNodesTest.java
new file mode 100644
index 0000000..d5f62d7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/AssureSufficientLiveNodesTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.reads.NeverSpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * The test cases check that no false UnavailableException is thrown due to
+ * concurrent modification of the ReplicationStrategy, e.g. by an ALTER KEYSPACE.
+ *
+ * See https://issues.apache.org/jira/browse/CASSANDRA-16545 for details.
+ */
+@RunWith(BMUnitRunner.class)
+@BMRule(name = "FailureDecector sees all nodes as live", // applies to all 
test cases in the class
+        targetClass = "FailureDetector",
+        targetMethod = "isAlive",
+        action = "return true;")
+public class AssureSufficientLiveNodesTest
+{
+    private static final AtomicInteger testIdGen = new AtomicInteger(0);
+    private static final Supplier<String> keyspaceNameGen = () -> "race_" + 
testIdGen.getAndIncrement();
+    private static final String DC1 = "datacenter1";
+    private static final String DC2 = "datacenter2";
+    private static final String DC3 = "datacenter3";
+    private static final int RACE_TEST_LOOPS = 100;
+    private static final Token tk = new Murmur3Partitioner.LongToken(0);
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        // Register peers with expected DC for NetworkTopologyStrategy.
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        DatabaseDescriptor.setEndpointSnitch(new 
AbstractNetworkTopologySnitch()
+        {
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                byte[] address = endpoint.addressBytes;
+                return "rake" + address[1];
+            }
+
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                byte[] address = endpoint.addressBytes;
+                return "datacenter" + address[1];
+            }
+        });
+
+        List<InetAddressAndPort> instances = ImmutableList.of(
+            // datacenter 1
+            InetAddressAndPort.getByName("127.1.0.255"), 
InetAddressAndPort.getByName("127.1.0.254"), 
InetAddressAndPort.getByName("127.1.0.253"),
+            // datacenter 2
+            InetAddressAndPort.getByName("127.2.0.255"), 
InetAddressAndPort.getByName("127.2.0.254"), 
InetAddressAndPort.getByName("127.2.0.253"),
+            // datacenter 3
+            InetAddressAndPort.getByName("127.3.0.255"), 
InetAddressAndPort.getByName("127.3.0.254"), 
InetAddressAndPort.getByName("127.3.0.253"));
+
+        for (int i = 0; i < instances.size(); i++)
+        {
+            InetAddressAndPort ip = instances.get(i);
+            metadata.updateHostId(UUID.randomUUID(), ip);
+            metadata.updateNormalToken(new Murmur3Partitioner.LongToken(i), 
ip);
+        }
+    }
+
+    @Test
+    public void insufficientLiveNodesTest()
+    {
+        final KeyspaceParams largeRF = KeyspaceParams.nts(DC1, 6);
+        // Not actually a race; it just verifies that UnavailableException is 
thrown correctly.
+        assertThatThrownBy(() ->
+           raceOfReplicationStrategyTest(largeRF, largeRF, 1,
+                                         keyspace -> 
ReplicaPlans.forWrite(keyspace, QUORUM, tk, ReplicaPlans.writeNormal))
+        ).as("Unavailable should be thrown given 3 live nodes is less than a 
quorum of 6")
+         .isInstanceOf(UnavailableException.class)
+         .hasMessageContaining("Cannot achieve consistency level QUORUM");
+    }
+
+    @Test
+    public void addDatacenterShouldNotCauseUnavailableWithEachQuorumTest() 
throws Throwable
+    {
+        // write
+        raceOfReplicationStrategyTest(
+            // init
+            KeyspaceParams.nts(DC1, 3),
+            // alter to
+            KeyspaceParams.nts(DC1, 3, DC2, 3),
+            // test
+            keyspace -> ReplicaPlans.forWrite(keyspace, EACH_QUORUM, tk, 
ReplicaPlans.writeNormal)
+        );
+        // read
+        raceOfReplicationStrategyTest(
+            // init
+            KeyspaceParams.nts(DC1, 3),
+            // alter to
+            KeyspaceParams.nts(DC1, 3, DC2, 3),
+            // test
+            keyspace -> ReplicaPlans.forRead(keyspace, tk, EACH_QUORUM, 
NeverSpeculativeRetryPolicy.INSTANCE)
+        );
+    }
+
+
+    @Test
+    public void addDatacenterShouldNotCauseUnavailableWithQuorumTest() throws 
Throwable
+    {
+        // write
+        raceOfReplicationStrategyTest(
+            // init. The # of live endpoints is 3.
+            KeyspaceParams.nts(DC1, 3),
+            // alter to. QUORUM blockFor = (3 + 3) / 2 + 1 = 4 > 3 live replicas
+            KeyspaceParams.nts(DC1, 3, DC2, 3),
+            // test
+            keyspace -> ReplicaPlans.forWrite(keyspace, QUORUM, tk, 
ReplicaPlans.writeNormal)
+        );
+        raceOfReplicationStrategyTest(
+            // init. The # of live endpoints is 3 = 2 + 1
+            KeyspaceParams.nts(DC1, 2, DC2, 1),
+            // alter to. QUORUM blockFor = (3 + 3) / 2 + 1 = 4 > 3 live replicas
+            KeyspaceParams.nts(DC1, 2, DC2, 1, DC3, 3),
+            // test
+            keyspace -> ReplicaPlans.forWrite(keyspace, QUORUM, tk, 
ReplicaPlans.writeNormal)
+        );
+
+        // read
+        raceOfReplicationStrategyTest(
+            // init
+            KeyspaceParams.nts(DC1, 3),
+            // alter to
+            KeyspaceParams.nts(DC1, 3, DC2, 3),
+            // test
+            keyspace -> ReplicaPlans.forRead(keyspace, tk, QUORUM, 
NeverSpeculativeRetryPolicy.INSTANCE)
+        );
+        raceOfReplicationStrategyTest(
+            // init. The # of live endpoints is 3 = 2 + 1
+            KeyspaceParams.nts(DC1, 2, DC2, 1),
+            // alter to. QUORUM blockFor = (3 + 3) / 2 + 1 = 4 > 3 live replicas
+            KeyspaceParams.nts(DC1, 2, DC2, 1, DC3, 3),
+            // test
+            keyspace -> ReplicaPlans.forRead(keyspace, tk, QUORUM, 
NeverSpeculativeRetryPolicy.INSTANCE)
+        );
+    }
+
+    @Test
+    public void raceOnRemoveDatacenterShouldNotCauseUnavailable() throws Throwable
+    {
+        // write
+        raceOfReplicationStrategyTest(
+            // init
+            KeyspaceParams.nts(DC1, 3, DC2, 3),
+            // alter to
+            KeyspaceParams.nts(DC1, 3),
+            // test
+            keyspace -> ReplicaPlans.forWrite(keyspace, EACH_QUORUM, tk, 
ReplicaPlans.writeNormal)
+        );
+
+        // read
+        raceOfReplicationStrategyTest(
+            // init
+            KeyspaceParams.nts(DC1, 3, DC2, 3),
+            // alter to
+            KeyspaceParams.nts(DC1, 3),
+            // test
+            keyspace -> ReplicaPlans.forRead(keyspace, tk, EACH_QUORUM, 
NeverSpeculativeRetryPolicy.INSTANCE)
+        );
+    }
+
+    @Test
+    public void increaseReplicationFactorShouldNotCauseUnavailableTest() 
throws Throwable
+    {
+        // write
+        raceOfReplicationStrategyTest(
+            // init
+            KeyspaceParams.nts(DC1, 1),
+            // alter to
+            KeyspaceParams.nts(DC1, 3),
+            // test
+            keyspace -> ReplicaPlans.forWrite(keyspace, LOCAL_QUORUM, tk, 
ReplicaPlans.writeNormal)
+        );
+
+        // read
+        raceOfReplicationStrategyTest(
+            // init
+            KeyspaceParams.nts(DC1, 1),
+            // alter to
+            KeyspaceParams.nts(DC1, 3),
+            // test
+            keyspace -> ReplicaPlans.forRead(keyspace, tk, LOCAL_QUORUM, 
NeverSpeculativeRetryPolicy.INSTANCE)
+        );
+    }
+
+    /**
+     * A test runner that runs the `test` while concurrently changing the 
ReplicationStrategy of the raced keyspace.
+     * It loops at most RACE_TEST_LOOPS times, stopping early if the race 
produces an exception.
+     */
+    private static void raceOfReplicationStrategyTest(KeyspaceParams init,
+                                                      KeyspaceParams alterTo,
+                                                      int loopCount,
+                                                      Consumer<Keyspace> test) 
throws Throwable
+    {
+        String keyspaceName = keyspaceNameGen.get();
+        KeyspaceMetadata initKsMeta = KeyspaceMetadata.create(keyspaceName, 
init, Tables.of(SchemaLoader.standardCFMD("Foo", "Bar").build()));
+        KeyspaceMetadata alterToKsMeta = initKsMeta.withSwapped(alterTo);
+        MigrationManager.announceNewKeyspace(initKsMeta, true);
+        Keyspace racedKs = Keyspace.open(keyspaceName);
+        ExecutorService es = Executors.newFixedThreadPool(2);
+        try (AutoCloseable ignore = () -> {
+            es.shutdown();
+            es.awaitTermination(1, TimeUnit.MINUTES);
+        })
+        {
+            for (int i = 0; i < loopCount; i++)
+            {
+                // reset the keyspace
+                racedKs.setMetadata(initKsMeta);
+                CountDownLatch trigger = new CountDownLatch(1);
+                // start two runnables that could race
+                Future<?> f1 = es.submit(() -> {
+                    Uninterruptibles.awaitUninterruptibly(trigger);
+                    // Update replication strategy
+                    racedKs.setMetadata(alterToKsMeta);
+                });
+                Future<?> f2 = es.submit(() -> {
+                    Uninterruptibles.awaitUninterruptibly(trigger);
+                    test.accept(racedKs);
+                });
+                trigger.countDown();
+                FBUtilities.waitOnFutures(Arrays.asList(f1, f2));
+            }
+        }
+        catch (RuntimeException rte)
+        {
+            // extract the root cause wrapped by `waitOnFutures` and 
`future.get()`, then rethrow it
+            if (rte.getCause() != null
+                && rte.getCause() instanceof ExecutionException
+                && rte.getCause().getCause() != null)
+                throw rte.getCause().getCause();
+            else
+                throw rte;
+        }
+    }
+
+    private static void raceOfReplicationStrategyTest(KeyspaceParams init,
+                                                      KeyspaceParams alterTo,
+                                                      Consumer<Keyspace> test) 
throws Throwable
+    {
+        raceOfReplicationStrategyTest(init, alterTo, RACE_TEST_LOOPS, test);
+    }
+}
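
The harness above uses a one-shot CountDownLatch so that both tasks are released
as closely together as possible on each iteration, the usual idiom for provoking
a race in a unit test, while the class-level byteman rule pins
FailureDetector.isAlive to true so availability depends only on the replication
strategy. A stripped-down, self-contained version of the latch idiom, with
generic Runnables and no Cassandra or byteman types:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class RaceLoop
    {
        // Runs 'a' and 'b' together for up to 'loops' iterations; the first
        // exception thrown by either task surfaces through Future.get().
        public static void race(int loops, Runnable a, Runnable b) throws Exception
        {
            ExecutorService es = Executors.newFixedThreadPool(2);
            try
            {
                for (int i = 0; i < loops; i++)
                {
                    CountDownLatch trigger = new CountDownLatch(1);
                    Future<?> f1 = es.submit(() -> { await(trigger); a.run(); });
                    Future<?> f2 = es.submit(() -> { await(trigger); b.run(); });
                    trigger.countDown();  // release both tasks at once
                    f1.get();
                    f2.get();
                }
            }
            finally
            {
                es.shutdown();
                es.awaitTermination(1, TimeUnit.MINUTES);
            }
        }

        private static void await(CountDownLatch latch)
        {
            try { latch.await(); }
            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }

A call such as race(100, () -> alterKeyspace(), () -> runQuery()) mirrors the two
submits in raceOfReplicationStrategyTest; alterKeyspace and runQuery here are
hypothetical callables, not methods from the patch.
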
diff --git 
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
index 15fbd27..19ed66d 100644
--- 
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
+++ 
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -151,7 +151,7 @@ public class WriteResponseHandlerTransientTest
     {
         EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), 
full(EP1), full(EP2), trans(EP3), full(EP5));
         EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), 
full(EP4), trans(EP6));
-        ReplicaLayout.ForTokenWrite layout = new 
ReplicaLayout.ForTokenWrite(natural, pending);
+        ReplicaLayout.ForTokenWrite layout = new 
ReplicaLayout.ForTokenWrite(ks.getReplicationStrategy(), natural, pending);
         ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(ks, 
ConsistencyLevel.QUORUM, layout, layout, ReplicaPlans.writeAll);
 
         
Assert.assertTrue(Iterables.elementsEqual(EndpointsForRange.of(full(EP4), 
trans(EP6)),
@@ -160,13 +160,13 @@ public class WriteResponseHandlerTransientTest
 
     private static ReplicaPlan.ForTokenWrite expected(EndpointsForToken 
natural, EndpointsForToken selected)
     {
-        return new ReplicaPlan.ForTokenWrite(ks, ConsistencyLevel.QUORUM, 
EndpointsForToken.empty(dummy.getToken()), natural, natural, selected);
+        return new ReplicaPlan.ForTokenWrite(ks, ks.getReplicationStrategy(), 
ConsistencyLevel.QUORUM, EndpointsForToken.empty(dummy.getToken()), natural, 
natural, selected);
     }
 
     private static ReplicaPlan.ForTokenWrite 
getSpeculationContext(EndpointsForToken natural, Predicate<InetAddressAndPort> 
livePredicate)
     {
-        ReplicaLayout.ForTokenWrite liveAndDown = new 
ReplicaLayout.ForTokenWrite(natural, EndpointsForToken.empty(dummy.getToken()));
-        ReplicaLayout.ForTokenWrite live = new 
ReplicaLayout.ForTokenWrite(natural.filter(r -> 
livePredicate.test(r.endpoint())), EndpointsForToken.empty(dummy.getToken()));
+        ReplicaLayout.ForTokenWrite liveAndDown = new 
ReplicaLayout.ForTokenWrite(ks.getReplicationStrategy(), natural, 
EndpointsForToken.empty(dummy.getToken()));
+        ReplicaLayout.ForTokenWrite live = new 
ReplicaLayout.ForTokenWrite(ks.getReplicationStrategy(), natural.filter(r -> 
livePredicate.test(r.endpoint())), EndpointsForToken.empty(dummy.getToken()));
         return ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, liveAndDown, 
live, ReplicaPlans.writeNormal);
     }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java 
b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 36d792e..faae913 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -1321,7 +1321,7 @@ public class DataResolverTest extends 
AbstractReadResponseTest
 
     private ReplicaPlan.SharedForRangeRead plan(EndpointsForRange replicas, 
ConsistencyLevel consistencyLevel)
     {
-        return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, 
consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas, 1));
+        return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, 
ks.getReplicationStrategy(), consistencyLevel, ReplicaUtils.FULL_BOUNDS, 
replicas, replicas, 1));
     }
 
     private static void resolveAndConsume(DataResolver resolver)
@@ -1338,4 +1338,4 @@ public class DataResolverTest extends 
AbstractReadResponseTest
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java 
b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 99101f1..4dee52a 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -161,6 +161,6 @@ public class DigestResolverTest extends 
AbstractReadResponseTest
 
     private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel 
consistencyLevel, EndpointsForToken replicas)
     {
-        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, 
consistencyLevel, replicas, replicas));
+        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, 
ks.getReplicationStrategy(), consistencyLevel, replicas, replicas));
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java 
b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 3eb9b2e..6fc8fbf 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -270,6 +270,6 @@ public class ReadExecutorTest
 
     private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel, 
EndpointsForToken natural, EndpointsForToken selected)
     {
-        return new ReplicaPlan.ForTokenRead(ks, consistencyLevel, natural, 
selected);
+        return new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(), 
consistencyLevel, natural, selected);
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 5315060..badcd35 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -25,15 +25,12 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.EndpointsForToken;
@@ -299,7 +296,12 @@ public abstract  class AbstractReadRepairTest
     {
         Token token = readPlan.range().left.getToken();
         EndpointsForToken pending = EndpointsForToken.empty(token);
-        return ReplicaPlans.forWrite(ks, ConsistencyLevel.TWO, 
liveAndDown.forToken(token), pending, Predicates.alwaysTrue(), 
ReplicaPlans.writeReadRepair(readPlan));
+        return ReplicaPlans.forWrite(readPlan.keyspace(),
+                                     ConsistencyLevel.TWO,
+                                     liveAndDown.forToken(token),
+                                     pending,
+                                     replica -> true,
+                                     ReplicaPlans.writeReadRepair(readPlan));
     }
     static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, 
EndpointsForRange targets)
     {
@@ -311,7 +313,7 @@ public abstract  class AbstractReadRepairTest
     }
     static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, 
ConsistencyLevel consistencyLevel, EndpointsForRange replicas, 
EndpointsForRange targets)
     {
-        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, 
ReplicaUtils.FULL_BOUNDS, replicas, targets, 1);
+        return new ReplicaPlan.ForRangeRead(keyspace, 
keyspace.getReplicationStrategy(), consistencyLevel, ReplicaUtils.FULL_BOUNDS, 
replicas, targets, 1);
     }
 
     public abstract InstrumentedReadRepair 
createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> 
replicaPlan, long queryStartNanoTime);
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index e4b3a71..8562db7 100644
--- 
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ 
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.Util;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -118,11 +119,12 @@ public class BlockingReadRepairTest extends 
AbstractReadRepairTest
     @Test
     public void consistencyLevelTest() throws Exception
     {
-        
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, 
ks));
-        
Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, 
ks));
-        
Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, ks));
-        
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks));
-        
Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, ks));
+        AbstractReplicationStrategy rs = ks.getReplicationStrategy();
+        
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, 
rs));
+        
Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, 
rs));
+        
Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, rs));
+        
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, rs));
+        
Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, rs));
     }
 
 
diff --git 
a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 11b057f..dad9aa4 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.ReplicaPlan;
@@ -168,11 +169,12 @@ public class ReadRepairTest
     @Test
     public void consistencyLevelTest() throws Exception
     {
-        
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, 
ks));
-        
Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, 
ks));
-        
Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, ks));
-        
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks));
-        
Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, ks));
+        AbstractReplicationStrategy rs = ks.getReplicationStrategy();
+        
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, 
rs));
+        
Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, 
rs));
+        
Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, rs));
+        
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, rs));
+        
Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, rs));
     }
 
     private static void assertMutationEqual(Mutation expected, Mutation actual)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
