This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b915688 Fix false unavailable for queries due to cluster topology
changes
b915688 is described below
commit b915688ea878aaa284f5cedeb799c5f797c4d824
Author: Yifan Cai <[email protected]>
AuthorDate: Wed Apr 14 13:05:55 2021 -0700
Fix false unavailable for queries due to cluster topology changes
patch by Yifan Cai; reviewed by Aleksey Yeschenko, Andres de la Peña for
CASSANDRA-16545
---
CHANGES.txt | 1 +
.../apache/cassandra/batchlog/BatchlogManager.java | 4 +-
.../cql3/statements/ModificationStatement.java | 4 +-
.../cassandra/cql3/statements/SelectStatement.java | 2 +-
.../org/apache/cassandra/db/ConsistencyLevel.java | 71 +++--
.../org/apache/cassandra/db/CounterMutation.java | 6 +-
.../org/apache/cassandra/db/view/ViewUtils.java | 4 +-
.../apache/cassandra/locator/ReplicaLayout.java | 56 ++--
.../org/apache/cassandra/locator/ReplicaPlan.java | 43 +--
.../org/apache/cassandra/locator/ReplicaPlans.java | 112 ++++----
.../apache/cassandra/locator/TokenMetadata.java | 3 +-
.../DatacenterSyncWriteResponseHandler.java | 6 +-
.../org/apache/cassandra/service/StorageProxy.java | 51 ++--
.../service/reads/ReplicaFilteringProtection.java | 6 +-
.../service/reads/repair/AbstractReadRepair.java | 2 +-
.../service/reads/repair/BlockingReadRepairs.java | 2 +-
.../apache/cassandra/db/view/ViewUtilsTest.java | 6 +-
.../locator/AssureSufficientLiveNodesTest.java | 303 +++++++++++++++++++++
.../service/WriteResponseHandlerTransientTest.java | 8 +-
.../cassandra/service/reads/DataResolverTest.java | 4 +-
.../service/reads/DigestResolverTest.java | 2 +-
.../cassandra/service/reads/ReadExecutorTest.java | 2 +-
.../reads/repair/AbstractReadRepairTest.java | 12 +-
.../reads/repair/BlockingReadRepairTest.java | 12 +-
.../service/reads/repair/ReadRepairTest.java | 12 +-
25 files changed, 537 insertions(+), 197 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 8551edf..efd6c60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-rc1
+ * Fix false unavailable for queries due to cluster topology changes
(CASSANDRA-16545)
* Fixed a race condition issue in nodetool repair where we poll for the error
before seeing the error notification, leading to a less meaningful message
(CASSANDRA-16585)
* Fix mixed cluster GROUP BY queries (CASSANDRA-16582)
* Upgrade jflex to 1.8.2 (CASSANDRA-16576)
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 9a009dc..65ed71e 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -489,8 +489,8 @@ public class BatchlogManager implements BatchlogManagerMBean
Hint.create(mutation, writtenAt));
}
- ReplicaPlan.ForTokenWrite replicaPlan = new
ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE,
- liveRemoteOnly.pending(), liveRemoteOnly.all(),
liveRemoteOnly.all(), liveRemoteOnly.all());
+ ReplicaPlan.ForTokenWrite replicaPlan = new
ReplicaPlan.ForTokenWrite(keyspace, liveAndDown.replicationStrategy(),
+ ConsistencyLevel.ONE, liveRemoteOnly.pending(),
liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all());
ReplayWriteResponseHandler<Mutation> handler = new
ReplayWriteResponseHandler<>(replicaPlan, System.nanoTime());
Message<Mutation> message = Message.outWithFlag(MUTATION_REQ,
mutation, MessageFlag.CALL_BACK_ON_FAILURE);
for (Replica replica : liveRemoteOnly.all())
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 0ba105c..785e6bd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -383,7 +383,7 @@ public abstract class ModificationStatement implements
CQLStatement
try
{
- cl.validateForRead(keyspace());
+ cl.validateForRead();
}
catch (InvalidRequestException e)
{
@@ -463,7 +463,7 @@ public abstract class ModificationStatement implements
CQLStatement
if (isCounter())
cl.validateCounterForWrite(metadata());
else
- cl.validateForWrite(metadata.keyspace);
+ cl.validateForWrite();
List<? extends IMutation> mutations =
getMutations(options,
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 8e0df45..63f33b1 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -232,7 +232,7 @@ public class SelectStatement implements CQLStatement
ConsistencyLevel cl = options.getConsistency();
checkNotNull(cl, "Invalid empty consistency level");
- cl.validateForRead(keyspace());
+ cl.validateForRead();
int nowInSec = options.getNowInSeconds(state);
int userLimit = getLimit(options);
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index e3da5b3..fbaf3fd 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -81,50 +81,49 @@ public enum ConsistencyLevel
return codeIdx[code];
}
- public static int quorumFor(Keyspace keyspace)
+ public static int quorumFor(AbstractReplicationStrategy
replicationStrategy)
{
- return
(keyspace.getReplicationStrategy().getReplicationFactor().allReplicas / 2) + 1;
+ return (replicationStrategy.getReplicationFactor().allReplicas / 2) +
1;
}
- public static int localQuorumFor(Keyspace keyspace, String dc)
+ public static int localQuorumFor(AbstractReplicationStrategy
replicationStrategy, String dc)
{
- return (keyspace.getReplicationStrategy() instanceof
NetworkTopologyStrategy)
- ? (((NetworkTopologyStrategy)
keyspace.getReplicationStrategy()).getReplicationFactor(dc).allReplicas / 2) + 1
- : quorumFor(keyspace);
+ return (replicationStrategy instanceof NetworkTopologyStrategy)
+ ? (((NetworkTopologyStrategy)
replicationStrategy).getReplicationFactor(dc).allReplicas / 2) + 1
+ : quorumFor(replicationStrategy);
}
- public static int localQuorumForOurDc(Keyspace keyspace)
+ public static int localQuorumForOurDc(AbstractReplicationStrategy
replicationStrategy)
{
- return localQuorumFor(keyspace,
DatabaseDescriptor.getLocalDataCenter());
+ return localQuorumFor(replicationStrategy,
DatabaseDescriptor.getLocalDataCenter());
}
- public static ObjectIntHashMap<String> eachQuorumForRead(Keyspace keyspace)
+ public static ObjectIntHashMap<String>
eachQuorumForRead(AbstractReplicationStrategy replicationStrategy)
{
- AbstractReplicationStrategy strategy =
keyspace.getReplicationStrategy();
- if (strategy instanceof NetworkTopologyStrategy)
+ if (replicationStrategy instanceof NetworkTopologyStrategy)
{
- NetworkTopologyStrategy npStrategy = (NetworkTopologyStrategy)
strategy;
+ NetworkTopologyStrategy npStrategy = (NetworkTopologyStrategy)
replicationStrategy;
ObjectIntHashMap<String> perDc = new
ObjectIntHashMap<>(((npStrategy.getDatacenters().size() + 1) * 4) / 3);
for (String dc : npStrategy.getDatacenters())
- perDc.put(dc, ConsistencyLevel.localQuorumFor(keyspace, dc));
+ perDc.put(dc,
ConsistencyLevel.localQuorumFor(replicationStrategy, dc));
return perDc;
}
else
{
ObjectIntHashMap<String> perDc = new ObjectIntHashMap<>(1);
- perDc.put(DatabaseDescriptor.getLocalDataCenter(),
quorumFor(keyspace));
+ perDc.put(DatabaseDescriptor.getLocalDataCenter(),
quorumFor(replicationStrategy));
return perDc;
}
}
- public static ObjectIntHashMap<String> eachQuorumForWrite(Keyspace
keyspace, Endpoints<?> pendingWithDown)
+ public static ObjectIntHashMap<String>
eachQuorumForWrite(AbstractReplicationStrategy replicationStrategy,
Endpoints<?> pendingWithDown)
{
- ObjectIntHashMap<String> perDc = eachQuorumForRead(keyspace);
+ ObjectIntHashMap<String> perDc =
eachQuorumForRead(replicationStrategy);
addToCountPerDc(perDc, pendingWithDown, 1);
return perDc;
}
- public int blockFor(Keyspace keyspace)
+ public int blockFor(AbstractReplicationStrategy replicationStrategy)
{
switch (this)
{
@@ -139,35 +138,35 @@ public enum ConsistencyLevel
return 3;
case QUORUM:
case SERIAL:
- return quorumFor(keyspace);
+ return quorumFor(replicationStrategy);
case ALL:
- return
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
+ return replicationStrategy.getReplicationFactor().allReplicas;
case LOCAL_QUORUM:
case LOCAL_SERIAL:
- return localQuorumForOurDc(keyspace);
+ return localQuorumForOurDc(replicationStrategy);
case EACH_QUORUM:
- if (keyspace.getReplicationStrategy() instanceof
NetworkTopologyStrategy)
+ if (replicationStrategy instanceof NetworkTopologyStrategy)
{
- NetworkTopologyStrategy strategy =
(NetworkTopologyStrategy) keyspace.getReplicationStrategy();
+ NetworkTopologyStrategy strategy =
(NetworkTopologyStrategy) replicationStrategy;
int n = 0;
for (String dc : strategy.getDatacenters())
- n += localQuorumFor(keyspace, dc);
+ n += localQuorumFor(replicationStrategy, dc);
return n;
}
else
{
- return quorumFor(keyspace);
+ return quorumFor(replicationStrategy);
}
default:
throw new UnsupportedOperationException("Invalid consistency
level: " + toString());
}
}
- public int blockForWrite(Keyspace keyspace, Endpoints<?> pending)
+ public int blockForWrite(AbstractReplicationStrategy replicationStrategy,
Endpoints<?> pending)
{
assert pending != null;
- int blockFor = blockFor(keyspace);
+ int blockFor = blockFor(replicationStrategy);
switch (this)
{
case ANY:
@@ -189,9 +188,9 @@ public enum ConsistencyLevel
* Determine if this consistency level meets or exceeds the consistency
requirements of the given cl for the given keyspace
* WARNING: this is not locality aware; you cannot safely use this with
mixed locality consistency levels (e.g. LOCAL_QUORUM and QUORUM)
*/
- public boolean satisfies(ConsistencyLevel other, Keyspace keyspace)
+ public boolean satisfies(ConsistencyLevel other,
AbstractReplicationStrategy replicationStrategy)
{
- return blockFor(keyspace) >= other.blockFor(keyspace);
+ return blockFor(replicationStrategy) >=
other.blockFor(replicationStrategy);
}
public boolean isDatacenterLocal()
@@ -199,7 +198,7 @@ public enum ConsistencyLevel
return isDCLocal;
}
- public void validateForRead(String keyspaceName) throws
InvalidRequestException
+ public void validateForRead() throws InvalidRequestException
{
switch (this)
{
@@ -208,7 +207,7 @@ public enum ConsistencyLevel
}
}
- public void validateForWrite(String keyspaceName) throws
InvalidRequestException
+ public void validateForWrite() throws InvalidRequestException
{
switch (this)
{
@@ -219,12 +218,12 @@ public enum ConsistencyLevel
}
// This is the same than validateForWrite really, but we include a
slightly different error message for SERIAL/LOCAL_SERIAL
- public void validateForCasCommit(String keyspaceName) throws
InvalidRequestException
+ public void validateForCasCommit(AbstractReplicationStrategy
replicationStrategy) throws InvalidRequestException
{
switch (this)
{
case EACH_QUORUM:
- requireNetworkTopologyStrategy(keyspaceName);
+ requireNetworkTopologyStrategy(replicationStrategy);
break;
case SERIAL:
case LOCAL_SERIAL:
@@ -252,10 +251,10 @@ public enum ConsistencyLevel
throw new InvalidRequestException("Counter operations are
inherently non-serializable");
}
- private void requireNetworkTopologyStrategy(String keyspaceName) throws
InvalidRequestException
+ private void requireNetworkTopologyStrategy(AbstractReplicationStrategy
replicationStrategy) throws InvalidRequestException
{
- AbstractReplicationStrategy strategy =
Keyspace.open(keyspaceName).getReplicationStrategy();
- if (!(strategy instanceof NetworkTopologyStrategy))
- throw new InvalidRequestException(String.format("consistency level
%s not compatible with replication strategy (%s)", this,
strategy.getClass().getName()));
+ if (!(replicationStrategy instanceof NetworkTopologyStrategy))
+ throw new InvalidRequestException(String.format("consistency level
%s not compatible with replication strategy (%s)",
+ this,
replicationStrategy.getClass().getName()));
}
}
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java
b/src/java/org/apache/cassandra/db/CounterMutation.java
index bc0cd85..fe1e46e 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.tracing.Tracing;
@@ -151,18 +152,19 @@ public class CounterMutation implements IMutation
{
long startTime = System.nanoTime();
+ AbstractReplicationStrategy replicationStrategy =
keyspace.getReplicationStrategy();
for (Lock lock : LOCKS.bulkGet(getCounterLockKeys()))
{
long timeout = getTimeout(NANOSECONDS) - (System.nanoTime() -
startTime);
try
{
if (!lock.tryLock(timeout, NANOSECONDS))
- throw new WriteTimeoutException(WriteType.COUNTER,
consistency(), 0, consistency().blockFor(keyspace));
+ throw new WriteTimeoutException(WriteType.COUNTER,
consistency(), 0, consistency().blockFor(replicationStrategy));
locks.add(lock);
}
catch (InterruptedException e)
{
- throw new WriteTimeoutException(WriteType.COUNTER,
consistency(), 0, consistency().blockFor(keyspace));
+ throw new WriteTimeoutException(WriteType.COUNTER,
consistency(), 0, consistency().blockFor(replicationStrategy));
}
}
}
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java
b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index e824732..b5aa063 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -59,10 +59,8 @@ public final class ViewUtils
*
* @return Optional.empty() if this method is called using a base token
which does not belong to this replica
*/
- public static Optional<Replica> getViewNaturalEndpoint(String
keyspaceName, Token baseToken, Token viewToken)
+ public static Optional<Replica>
getViewNaturalEndpoint(AbstractReplicationStrategy replicationStrategy, Token
baseToken, Token viewToken)
{
- AbstractReplicationStrategy replicationStrategy =
Keyspace.open(keyspaceName).getReplicationStrategy();
-
String localDataCenter =
DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
EndpointsForToken naturalBaseReplicas =
replicationStrategy.getNaturalReplicasForToken(baseToken);
EndpointsForToken naturalViewReplicas =
replicationStrategy.getNaturalReplicasForToken(viewToken);
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index d44fdd7..ff81732 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -39,9 +39,12 @@ import java.util.function.Predicate;
public abstract class ReplicaLayout<E extends Endpoints<E>>
{
private final E natural;
+ // the snapshot of the replication strategy that corresponds to the
replica layout
+ private final AbstractReplicationStrategy replicationStrategy;
- ReplicaLayout(E natural)
+ ReplicaLayout(AbstractReplicationStrategy replicationStrategy, E natural)
{
+ this.replicationStrategy = replicationStrategy;
this.natural = natural;
}
@@ -55,6 +58,11 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
return natural;
}
+ public final AbstractReplicationStrategy replicationStrategy()
+ {
+ return replicationStrategy;
+ }
+
/**
* All relevant owners of the ring position(s) for this operation, as
implied by the current ring layout.
* For writes, this will include pending owners, and for reads it will be
equivalent to natural()
@@ -71,9 +79,9 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
public static class ForTokenRead extends ReplicaLayout<EndpointsForToken>
implements ForToken
{
- public ForTokenRead(EndpointsForToken natural)
+ public ForTokenRead(AbstractReplicationStrategy replicationStrategy,
EndpointsForToken natural)
{
- super(natural);
+ super(replicationStrategy, natural);
}
@Override
@@ -87,7 +95,7 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
EndpointsForToken filtered = natural().filter(filter);
// AbstractReplicaCollection.filter returns itself if all elements
match the filter
if (filtered == natural()) return this;
- return new ReplicaLayout.ForTokenRead(filtered);
+ return new ReplicaLayout.ForTokenRead(replicationStrategy(),
filtered);
}
}
@@ -95,9 +103,9 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
{
final AbstractBounds<PartitionPosition> range;
- public ForRangeRead(AbstractBounds<PartitionPosition> range,
EndpointsForRange natural)
+ public ForRangeRead(AbstractReplicationStrategy replicationStrategy,
AbstractBounds<PartitionPosition> range, EndpointsForRange natural)
{
- super(natural);
+ super(replicationStrategy, natural);
this.range = range;
}
@@ -112,7 +120,7 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
EndpointsForRange filtered = natural().filter(filter);
// AbstractReplicaCollection.filter returns itself if all elements
match the filter
if (filtered == natural()) return this;
- return new ReplicaLayout.ForRangeRead(range(), filtered);
+ return new ReplicaLayout.ForRangeRead(replicationStrategy(),
range(), filtered);
}
}
@@ -121,9 +129,9 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
final E all;
final E pending;
- ForWrite(E natural, E pending, E all)
+ ForWrite(AbstractReplicationStrategy replicationStrategy, E natural, E
pending, E all)
{
- super(natural);
+ super(replicationStrategy, natural);
assert pending != null && !haveWriteConflicts(natural, pending);
if (all == null)
all = Endpoints.concat(natural, pending);
@@ -149,13 +157,13 @@ public abstract class ReplicaLayout<E extends
Endpoints<E>>
public static class ForTokenWrite extends ForWrite<EndpointsForToken>
implements ForToken
{
- public ForTokenWrite(EndpointsForToken natural, EndpointsForToken
pending)
+ public ForTokenWrite(AbstractReplicationStrategy replicationStrategy,
EndpointsForToken natural, EndpointsForToken pending)
{
- this(natural, pending, null);
+ this(replicationStrategy, natural, pending, null);
}
- public ForTokenWrite(EndpointsForToken natural, EndpointsForToken
pending, EndpointsForToken all)
+ public ForTokenWrite(AbstractReplicationStrategy replicationStrategy,
EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken all)
{
- super(natural, pending, all);
+ super(replicationStrategy, natural, pending, all);
}
@Override
@@ -168,6 +176,7 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
if (filtered == all()) return this;
// unique by endpoint, so can for efficiency filter only on
endpoint
return new ReplicaLayout.ForTokenWrite(
+ replicationStrategy(),
natural().keep(filtered.endpoints()),
pending().keep(filtered.endpoints()),
filtered
@@ -196,19 +205,20 @@ public abstract class ReplicaLayout<E extends
Endpoints<E>>
{
// TODO: these should be cached, not the natural replicas
// TODO: race condition to fetch these. implications??
- EndpointsForToken natural =
keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
+ AbstractReplicationStrategy replicationStrategy =
keyspace.getReplicationStrategy();
+ EndpointsForToken natural =
replicationStrategy.getNaturalReplicasForToken(token);
EndpointsForToken pending =
StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token,
keyspace.getName());
- return forTokenWrite(natural, pending);
+ return forTokenWrite(replicationStrategy, natural, pending);
}
- public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken
natural, EndpointsForToken pending)
+ public static ReplicaLayout.ForTokenWrite
forTokenWrite(AbstractReplicationStrategy replicationStrategy,
EndpointsForToken natural, EndpointsForToken pending)
{
if (haveWriteConflicts(natural, pending))
{
natural = resolveWriteConflictsInNatural(natural, pending);
pending = resolveWriteConflictsInPending(natural, pending);
}
- return new ReplicaLayout.ForTokenWrite(natural, pending);
+ return new ReplicaLayout.ForTokenWrite(replicationStrategy, natural,
pending);
}
/**
@@ -315,12 +325,12 @@ public abstract class ReplicaLayout<E extends
Endpoints<E>>
* @return the read layout for a token - this includes only live natural
replicas, i.e. those that are not pending
* and not marked down by the failure detector. these are reverse sorted
by the badness score of the configured snitch
*/
- static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(Keyspace
keyspace, Token token)
+ static ReplicaLayout.ForTokenRead
forTokenReadLiveSorted(AbstractReplicationStrategy replicationStrategy, Token
token)
{
- EndpointsForToken replicas =
keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
+ EndpointsForToken replicas =
replicationStrategy.getNaturalReplicasForToken(token);
replicas =
DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(),
replicas);
replicas = replicas.filter(FailureDetector.isReplicaAlive);
- return new ReplicaLayout.ForTokenRead(replicas);
+ return new ReplicaLayout.ForTokenRead(replicationStrategy, replicas);
}
/**
@@ -328,12 +338,12 @@ public abstract class ReplicaLayout<E extends
Endpoints<E>>
* @return the read layout for a range - this includes only live natural
replicas, i.e. those that are not pending
* and not marked down by the failure detector. these are reverse sorted
by the badness score of the configured snitch
*/
- static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(Keyspace
keyspace, AbstractBounds<PartitionPosition> range)
+ static ReplicaLayout.ForRangeRead
forRangeReadLiveSorted(AbstractReplicationStrategy replicationStrategy,
AbstractBounds<PartitionPosition> range)
{
- EndpointsForRange replicas =
keyspace.getReplicationStrategy().getNaturalReplicas(range.right);
+ EndpointsForRange replicas =
replicationStrategy.getNaturalReplicas(range.right);
replicas =
DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(),
replicas);
replicas = replicas.filter(FailureDetector.isReplicaAlive);
- return new ReplicaLayout.ForRangeRead(range, replicas);
+ return new ReplicaLayout.ForRangeRead(replicationStrategy, range,
replicas);
}
}
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 407db5b..51cab13 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -30,6 +30,10 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
{
protected final Keyspace keyspace;
protected final ConsistencyLevel consistencyLevel;
+ // The snapshot of the replication strategy when instantiating.
+ // It could be different than the one fetched from Keyspace later, e.g. RS
altered during the query.
+ // Use the snapshot to calculate {@code blockFor} in order to have a
consistent view of RS for the query.
+ protected final AbstractReplicationStrategy replicationStrategy;
// all nodes we will contact via any mechanism, including hints
// i.e., for:
@@ -41,10 +45,11 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
// ==> live.all() (if consistencyLevel.isDCLocal(), then
.filter(consistencyLevel.isLocal))
private final E contacts;
- ReplicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, E
contacts)
+ ReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy
replicationStrategy, ConsistencyLevel consistencyLevel, E contacts)
{
assert contacts != null;
this.keyspace = keyspace;
+ this.replicationStrategy = replicationStrategy;
this.consistencyLevel = consistencyLevel;
this.contacts = contacts;
}
@@ -56,6 +61,7 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
// TODO: should this semantically return true if we contain the endpoint,
not the exact replica?
public boolean contacts(Replica replica) { return
contacts.contains(replica); }
public Keyspace keyspace() { return keyspace; }
+ public AbstractReplicationStrategy replicationStrategy() { return
replicationStrategy; }
public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
public static abstract class ForRead<E extends Endpoints<E>> extends
ReplicaPlan<E>
@@ -64,13 +70,13 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
// we will consult this collection to find uncontacted nodes we might
contact if we doubt we will meet consistency level
private final E candidates;
- ForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, E
candidates, E contact)
+ ForRead(Keyspace keyspace, AbstractReplicationStrategy
replicationStrategy, ConsistencyLevel consistencyLevel, E candidates, E
contacts)
{
- super(keyspace, consistencyLevel, contact);
+ super(keyspace, replicationStrategy, consistencyLevel, contacts);
this.candidates = candidates;
}
- public int blockFor() { return consistencyLevel.blockFor(keyspace); }
+ public int blockFor() { return
consistencyLevel.blockFor(replicationStrategy); }
public E candidates() { return candidates; }
@@ -92,14 +98,18 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
public static class ForTokenRead extends ForRead<EndpointsForToken>
{
- public ForTokenRead(Keyspace keyspace, ConsistencyLevel
consistencyLevel, EndpointsForToken candidates, EndpointsForToken contact)
+ public ForTokenRead(Keyspace keyspace,
+ AbstractReplicationStrategy replicationStrategy,
+ ConsistencyLevel consistencyLevel,
+ EndpointsForToken candidates,
+ EndpointsForToken contacts)
{
- super(keyspace, consistencyLevel, candidates, contact);
+ super(keyspace, replicationStrategy, consistencyLevel, candidates,
contacts);
}
ForTokenRead withContact(EndpointsForToken newContact)
{
- return new ForTokenRead(keyspace, consistencyLevel, candidates(),
newContact);
+ return new ForTokenRead(keyspace, replicationStrategy,
consistencyLevel, candidates(), newContact);
}
}
@@ -109,13 +119,14 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
final int vnodeCount;
public ForRangeRead(Keyspace keyspace,
+ AbstractReplicationStrategy replicationStrategy,
ConsistencyLevel consistencyLevel,
AbstractBounds<PartitionPosition> range,
EndpointsForRange candidates,
EndpointsForRange contact,
int vnodeCount)
{
- super(keyspace, consistencyLevel, candidates, contact);
+ super(keyspace, replicationStrategy, consistencyLevel, candidates,
contact);
this.range = range;
this.vnodeCount = vnodeCount;
}
@@ -129,7 +140,7 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
ForRangeRead withContact(EndpointsForRange newContact)
{
- return new ForRangeRead(keyspace, consistencyLevel, range,
candidates(), newContact, vnodeCount);
+ return new ForRangeRead(keyspace, replicationStrategy,
consistencyLevel, range, candidates(), newContact, vnodeCount);
}
}
@@ -140,15 +151,15 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
final E liveAndDown;
final E live;
- ForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, E
pending, E liveAndDown, E live, E contact)
+ ForWrite(Keyspace keyspace, AbstractReplicationStrategy
replicationStrategy, ConsistencyLevel consistencyLevel, E pending, E
liveAndDown, E live, E contact)
{
- super(keyspace, consistencyLevel, contact);
+ super(keyspace, replicationStrategy, consistencyLevel, contact);
this.pending = pending;
this.liveAndDown = liveAndDown;
this.live = live;
}
- public int blockFor() { return
consistencyLevel.blockForWrite(keyspace, pending()); }
+ public int blockFor() { return
consistencyLevel.blockForWrite(replicationStrategy, pending()); }
/** Replicas that a region of the ring is moving to; not yet ready to
serve reads, but should receive writes */
public E pending() { return pending; }
@@ -173,14 +184,14 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
public static class ForTokenWrite extends ForWrite<EndpointsForToken>
{
- public ForTokenWrite(Keyspace keyspace, ConsistencyLevel
consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown,
EndpointsForToken live, EndpointsForToken contact)
+ public ForTokenWrite(Keyspace keyspace, AbstractReplicationStrategy
replicationStrategy, ConsistencyLevel consistencyLevel, EndpointsForToken
pending, EndpointsForToken liveAndDown, EndpointsForToken live,
EndpointsForToken contact)
{
- super(keyspace, consistencyLevel, pending, liveAndDown, live,
contact);
+ super(keyspace, replicationStrategy, consistencyLevel, pending,
liveAndDown, live, contact);
}
private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel
newConsistencyLevel, EndpointsForToken newContact)
{
- return new ReplicaPlan.ForTokenWrite(keyspace,
newConsistencyLevel, pending(), liveAndDown(), live(), newContact);
+ return new ReplicaPlan.ForTokenWrite(keyspace,
replicationStrategy, newConsistencyLevel, pending(), liveAndDown(), live(),
newContact);
}
ForTokenWrite withConsistencyLevel(ConsistencyLevel
newConsistencylevel) { return copy(newConsistencylevel, contacts()); }
@@ -193,7 +204,7 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel,
EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken
live, EndpointsForToken contact, int requiredParticipants)
{
- super(keyspace, consistencyLevel, pending, liveAndDown, live,
contact);
+ super(keyspace, keyspace.getReplicationStrategy(),
consistencyLevel, pending, liveAndDown, live, contact);
this.requiredParticipants = requiredParticipants;
}
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 083da7a..67b89e5 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -72,7 +72,7 @@ public class ReplicaPlans
{
private static final Logger logger =
LoggerFactory.getLogger(ReplicaPlans.class);
- public static boolean isSufficientLiveReplicasForRead(Keyspace keyspace,
ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas)
+ public static boolean
isSufficientLiveReplicasForRead(AbstractReplicationStrategy
replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?>
liveReplicas)
{
switch (consistencyLevel)
{
@@ -82,16 +82,16 @@ public class ReplicaPlans
case LOCAL_ONE:
return countInOurDc(liveReplicas).hasAtleast(1, 1);
case LOCAL_QUORUM:
- return
countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(keyspace), 1);
+ return
countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(replicationStrategy),
1);
case EACH_QUORUM:
- if (keyspace.getReplicationStrategy() instanceof
NetworkTopologyStrategy)
+ if (replicationStrategy instanceof NetworkTopologyStrategy)
{
int fullCount = 0;
- Collection<String> dcs = ((NetworkTopologyStrategy)
keyspace.getReplicationStrategy()).getDatacenters();
+ Collection<String> dcs = ((NetworkTopologyStrategy)
replicationStrategy).getDatacenters();
for (ObjectObjectCursor<String, Replicas.ReplicaCount>
entry : countPerDc(dcs, liveReplicas))
{
Replicas.ReplicaCount count = entry.value;
- if (!count.hasAtleast(localQuorumFor(keyspace,
entry.key), 0))
+ if
(!count.hasAtleast(localQuorumFor(replicationStrategy, entry.key), 0))
return false;
fullCount += count.fullReplicas();
}
@@ -99,20 +99,20 @@ public class ReplicaPlans
}
// Fallthough on purpose for SimpleStrategy
default:
- return liveReplicas.size() >=
consistencyLevel.blockFor(keyspace)
+ return liveReplicas.size() >=
consistencyLevel.blockFor(replicationStrategy)
&& Replicas.countFull(liveReplicas) > 0;
}
}
- static void assureSufficientLiveReplicasForRead(Keyspace keyspace,
ConsistencyLevel consistencyLevel, Endpoints<?> liveReplicas) throws
UnavailableException
+ static void
assureSufficientLiveReplicasForRead(AbstractReplicationStrategy
replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?>
liveReplicas) throws UnavailableException
{
- assureSufficientLiveReplicas(keyspace, consistencyLevel, liveReplicas,
consistencyLevel.blockFor(keyspace), 1);
+ assureSufficientLiveReplicas(replicationStrategy, consistencyLevel,
liveReplicas, consistencyLevel.blockFor(replicationStrategy), 1);
}
- static void assureSufficientLiveReplicasForWrite(Keyspace keyspace,
ConsistencyLevel consistencyLevel, Endpoints<?> allLive, Endpoints<?>
pendingWithDown) throws UnavailableException
+ static void
assureSufficientLiveReplicasForWrite(AbstractReplicationStrategy
replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> allLive,
Endpoints<?> pendingWithDown) throws UnavailableException
{
- assureSufficientLiveReplicas(keyspace, consistencyLevel, allLive,
consistencyLevel.blockForWrite(keyspace, pendingWithDown), 0);
+ assureSufficientLiveReplicas(replicationStrategy, consistencyLevel,
allLive, consistencyLevel.blockForWrite(replicationStrategy, pendingWithDown),
0);
}
- static void assureSufficientLiveReplicas(Keyspace keyspace,
ConsistencyLevel consistencyLevel, Endpoints<?> allLive, int blockFor, int
blockForFullReplicas) throws UnavailableException
+ static void assureSufficientLiveReplicas(AbstractReplicationStrategy
replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> allLive,
int blockFor, int blockForFullReplicas) throws UnavailableException
{
switch (consistencyLevel)
{
@@ -141,14 +141,14 @@ public class ReplicaPlans
break;
}
case EACH_QUORUM:
- if (keyspace.getReplicationStrategy() instanceof
NetworkTopologyStrategy)
+ if (replicationStrategy instanceof NetworkTopologyStrategy)
{
int total = 0;
int totalFull = 0;
- Collection<String> dcs = ((NetworkTopologyStrategy)
keyspace.getReplicationStrategy()).getDatacenters();
+ Collection<String> dcs = ((NetworkTopologyStrategy)
replicationStrategy).getDatacenters();
for (ObjectObjectCursor<String, Replicas.ReplicaCount>
entry : countPerDc(dcs, allLive))
{
- int dcBlockFor = localQuorumFor(keyspace, entry.key);
+ int dcBlockFor = localQuorumFor(replicationStrategy,
entry.key);
Replicas.ReplicaCount dcCount = entry.value;
if (!dcCount.hasAtleast(dcBlockFor, 0))
throw
UnavailableException.create(consistencyLevel, entry.key, dcBlockFor,
dcCount.allReplicas(), 0, dcCount.fullReplicas());
@@ -180,7 +180,7 @@ public class ReplicaPlans
{
EndpointsForToken one = EndpointsForToken.of(token, replica);
EndpointsForToken empty = EndpointsForToken.empty(token);
- return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE,
empty, one, one, one);
+ return new ReplicaPlan.ForTokenWrite(keyspace,
keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, empty, one, one, one);
}
/**
@@ -199,10 +199,10 @@ public class ReplicaPlans
Replica localSystemReplica =
SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort());
ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+ systemKeypsace.getReplicationStrategy(),
EndpointsForToken.of(token, localSystemReplica),
EndpointsForToken.empty(token)
);
-
return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown,
liveAndDown, writeAll);
}
@@ -231,16 +231,14 @@ public class ReplicaPlans
if (chosenEndpoints.isEmpty() && isAny)
chosenEndpoints =
Collections.singleton(FBUtilities.getBroadcastAddressAndPort());
+ Keyspace systemKeypsace =
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+ systemKeypsace.getReplicationStrategy(),
SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token),
EndpointsForToken.empty(token)
);
-
// Batchlog is hosted by either one node or two nodes from different
racks.
ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ?
ConsistencyLevel.ONE : ConsistencyLevel.TWO;
-
- Keyspace systemKeypsace =
Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-
// assume that we have already been given live endpoints, and skip
applying the failure detector
return forWrite(systemKeypsace, consistencyLevel, liveAndDown,
liveAndDown, writeAll);
}
@@ -333,7 +331,7 @@ public class ReplicaPlans
@VisibleForTesting
public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace,
ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken
pending, Predicate<Replica> isAlive, Selector selector) throws
UnavailableException
{
- return forWrite(keyspace, consistencyLevel,
ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
+ return forWrite(keyspace, consistencyLevel,
ReplicaLayout.forTokenWrite(keyspace.getReplicationStrategy(), natural,
pending), isAlive, selector);
}
public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace,
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown,
Selector selector) throws UnavailableException
@@ -341,8 +339,7 @@ public class ReplicaPlans
return forWrite(keyspace, consistencyLevel, liveAndDown,
FailureDetector.isReplicaAlive, selector);
}
- @VisibleForTesting
- public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace,
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown,
Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+ private static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace,
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown,
Predicate<Replica> isAlive, Selector selector) throws UnavailableException
{
ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
return forWrite(keyspace, consistencyLevel, liveAndDown, live,
selector);
@@ -350,15 +347,21 @@ public class ReplicaPlans
public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace,
ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown,
ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
{
- EndpointsForToken contacts = selector.select(keyspace,
consistencyLevel, liveAndDown, live);
- assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel,
live.all(), liveAndDown.pending());
- return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel,
liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
+ assert liveAndDown.replicationStrategy() == live.replicationStrategy()
+ : "ReplicaLayout liveAndDown and live should be derived from
the same replication strategy.";
+ AbstractReplicationStrategy replicationStrategy =
liveAndDown.replicationStrategy();
+ EndpointsForToken contacts = selector.select(consistencyLevel,
liveAndDown, live);
+ assureSufficientLiveReplicasForWrite(replicationStrategy,
consistencyLevel, live.all(), liveAndDown.pending());
+ return new ReplicaPlan.ForTokenWrite(keyspace, replicationStrategy,
consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(),
contacts);
}
public interface Selector
{
+ /**
+ * Select the {@code Endpoints} from {@param liveAndDown} and {@param
live} to contact according to the consistency level.
+ */
<E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
- E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L
liveAndDown, L live);
+ E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live);
}
/**
@@ -371,7 +374,7 @@ public class ReplicaPlans
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
- E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L
liveAndDown, L live)
+ E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
{
return liveAndDown.all();
}
@@ -390,7 +393,7 @@ public class ReplicaPlans
{
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
- E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L
liveAndDown, L live)
+ E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
{
if (!any(liveAndDown.all(), Replica::isTransient))
return liveAndDown.all();
@@ -406,7 +409,7 @@ public class ReplicaPlans
* soft-ensure that we reach QUORUM in all DCs we are able to, by
writing to every node;
* even if we don't wait for ACK, we have in both cases sent
sufficient messages.
*/
- ObjectIntHashMap<String> requiredPerDc =
eachQuorumForWrite(keyspace, liveAndDown.pending());
+ ObjectIntHashMap<String> requiredPerDc =
eachQuorumForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending());
addToCountPerDc(requiredPerDc,
live.natural().filter(Replica::isFull), -1);
addToCountPerDc(requiredPerDc, live.pending(), -1);
@@ -438,7 +441,7 @@ public class ReplicaPlans
{
@Override
public <E extends Endpoints<E>, L extends
ReplicaLayout.ForWrite<E>>
- E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L
liveAndDown, L live)
+ E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
{
assert !any(liveAndDown.all(), Replica::isTransient);
@@ -449,7 +452,7 @@ public class ReplicaPlans
// finally, add sufficient nodes to achieve our consistency
level
if (consistencyLevel != EACH_QUORUM)
{
- int add = consistencyLevel.blockForWrite(keyspace,
liveAndDown.pending()) - contacts.size();
+ int add =
consistencyLevel.blockForWrite(liveAndDown.replicationStrategy(),
liveAndDown.pending()) - contacts.size();
if (add > 0)
{
for (Replica replica : filter(live.all(), r ->
!contacts.contains(r)))
@@ -462,7 +465,7 @@ public class ReplicaPlans
}
else
{
- ObjectIntHashMap<String> requiredPerDc =
eachQuorumForWrite(keyspace, liveAndDown.pending());
+ ObjectIntHashMap<String> requiredPerDc =
eachQuorumForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending());
addToCountPerDc(requiredPerDc, contacts.snapshot(), -1);
IEndpointSnitch snitch =
DatabaseDescriptor.getEndpointSnitch();
for (Replica replica : filter(live.all(), r ->
!contacts.contains(r)))
@@ -486,6 +489,7 @@ public class ReplicaPlans
public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace,
DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws
UnavailableException
{
Token tk = key.getToken();
+
ReplicaLayout.ForTokenWrite liveAndDown =
ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
Replicas.temporaryAssertFull(liveAndDown.all()); // TODO
CASSANDRA-14547
@@ -527,10 +531,9 @@ public class ReplicaPlans
: liveNaturalReplicas;
}
- private static <E extends Endpoints<E>> E
contactForEachQuorumRead(Keyspace keyspace, E candidates)
+ private static <E extends Endpoints<E>> E
contactForEachQuorumRead(NetworkTopologyStrategy replicationStrategy, E
candidates)
{
- assert keyspace.getReplicationStrategy() instanceof
NetworkTopologyStrategy;
- ObjectIntHashMap<String> perDc = eachQuorumForRead(keyspace);
+ ObjectIntHashMap<String> perDc =
eachQuorumForRead(replicationStrategy);
final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
return candidates.filter(replica -> {
@@ -539,7 +542,7 @@ public class ReplicaPlans
});
}
- private static <E extends Endpoints<E>> E contactForRead(Keyspace
keyspace, ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E
candidates)
+ private static <E extends Endpoints<E>> E
contactForRead(AbstractReplicationStrategy replicationStrategy,
ConsistencyLevel consistencyLevel, boolean alwaysSpeculate, E candidates)
{
/*
* If we are doing an each quorum query, we have to make sure that the
endpoints we select
@@ -550,10 +553,10 @@ public class ReplicaPlans
*
* TODO: this is still very inconistently managed between
{LOCAL,EACH}_QUORUM and other consistency levels - should address this in a
follow-up
*/
- if (consistencyLevel == EACH_QUORUM &&
keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
- return contactForEachQuorumRead(keyspace, candidates);
+ if (consistencyLevel == EACH_QUORUM && replicationStrategy instanceof
NetworkTopologyStrategy)
+ return contactForEachQuorumRead((NetworkTopologyStrategy)
replicationStrategy, candidates);
- int count = consistencyLevel.blockFor(keyspace) + (alwaysSpeculate ? 1
: 0);
+ int count = consistencyLevel.blockFor(replicationStrategy) +
(alwaysSpeculate ? 1 : 0);
return candidates.subList(0, Math.min(count, candidates.size()));
}
@@ -564,7 +567,7 @@ public class ReplicaPlans
public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace
keyspace, Token token, Replica replica)
{
EndpointsForToken one = EndpointsForToken.of(token, replica);
- return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE,
one, one);
+ return new ReplicaPlan.ForTokenRead(keyspace,
keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one);
}
/**
@@ -574,7 +577,7 @@ public class ReplicaPlans
{
// TODO: this is unsafe, as one.range() may be inconsistent with our
supplied range; should refactor Range/AbstractBounds to single class
EndpointsForRange one = EndpointsForRange.of(replica);
- return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE,
range, one, one, vnodeCount);
+ return new ReplicaPlan.ForRangeRead(keyspace,
keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one,
vnodeCount);
}
/**
@@ -587,11 +590,12 @@ public class ReplicaPlans
*/
public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token
token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
{
- EndpointsForToken candidates = candidatesForRead(consistencyLevel,
ReplicaLayout.forTokenReadLiveSorted(keyspace, token).natural());
- EndpointsForToken contacts = contactForRead(keyspace,
consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE),
candidates);
+ AbstractReplicationStrategy replicationStrategy =
keyspace.getReplicationStrategy();
+ EndpointsForToken candidates = candidatesForRead(consistencyLevel,
ReplicaLayout.forTokenReadLiveSorted(replicationStrategy, token).natural());
+ EndpointsForToken contacts = contactForRead(replicationStrategy,
consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE),
candidates);
- assureSufficientLiveReplicasForRead(keyspace, consistencyLevel,
contacts);
- return new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel,
candidates, contacts);
+ assureSufficientLiveReplicasForRead(replicationStrategy,
consistencyLevel, contacts);
+ return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy,
consistencyLevel, candidates, contacts);
}
/**
@@ -603,11 +607,12 @@ public class ReplicaPlans
*/
public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace,
ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, int
vnodeCount)
{
- EndpointsForRange candidates = candidatesForRead(consistencyLevel,
ReplicaLayout.forRangeReadLiveSorted(keyspace, range).natural());
- EndpointsForRange contacts = contactForRead(keyspace,
consistencyLevel, false, candidates);
+ AbstractReplicationStrategy replicationStrategy =
keyspace.getReplicationStrategy();
+ EndpointsForRange candidates = candidatesForRead(consistencyLevel,
ReplicaLayout.forRangeReadLiveSorted(replicationStrategy, range).natural());
+ EndpointsForRange contacts = contactForRead(replicationStrategy,
consistencyLevel, false, candidates);
- assureSufficientLiveReplicasForRead(keyspace, consistencyLevel,
contacts);
- return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range,
candidates, contacts, vnodeCount);
+ assureSufficientLiveReplicasForRead(replicationStrategy,
consistencyLevel, contacts);
+ return new ReplicaPlan.ForRangeRead(keyspace, replicationStrategy,
consistencyLevel, range, candidates, contacts, vnodeCount);
}
/**
@@ -618,18 +623,19 @@ public class ReplicaPlans
// TODO: should we be asserting that the ranges are adjacent?
AbstractBounds<PartitionPosition> newRange =
left.range().withNewRight(right.range().right);
EndpointsForRange mergedCandidates =
left.candidates().keep(right.candidates().endpoints());
+ AbstractReplicationStrategy replicationStrategy =
keyspace.getReplicationStrategy();
// Check if there are enough shared endpoints for the merge to be
possible.
- if (!isSufficientLiveReplicasForRead(keyspace, consistencyLevel,
mergedCandidates))
+ if (!isSufficientLiveReplicasForRead(replicationStrategy,
consistencyLevel, mergedCandidates))
return null;
- EndpointsForRange contacts = contactForRead(keyspace,
consistencyLevel, false, mergedCandidates);
+ EndpointsForRange contacts = contactForRead(replicationStrategy,
consistencyLevel, false, mergedCandidates);
// Estimate whether merging will be a win or not
if
(!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts,
left.contacts(), right.contacts()))
return null;
// If we get there, merge this range and the next one
- return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
newRange, mergedCandidates, contacts, left.vnodeCount() + right.vnodeCount());
+ return new ReplicaPlan.ForRangeRead(keyspace, replicationStrategy,
consistencyLevel, newRange, mergedCandidates, contacts, left.vnodeCount() +
right.vnodeCount());
}
}
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java
b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 589a259..f2bbb9f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -1294,7 +1295,7 @@ public class TokenMetadata
public EndpointsForToken getWriteEndpoints(Token token, String
keyspaceName, EndpointsForToken natural)
{
EndpointsForToken pending = pendingEndpointsForToken(token,
keyspaceName);
- return ReplicaLayout.forTokenWrite(natural, pending).all();
+ return
ReplicaLayout.forTokenWrite(Keyspace.open(keyspaceName).getReplicationStrategy(),
natural, pending).all();
}
/** @return an endpoint to token multimap representation of
tokenToEndpointMap (a copy) */
diff --git
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 389dcd5..65cf3cc 100644
---
a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -49,9 +49,9 @@ public class DatacenterSyncWriteResponseHandler<T> extends
AbstractWriteResponse
super(replicaPlan, callback, writeType, queryStartNanoTime);
assert replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
- if (replicaPlan.keyspace().getReplicationStrategy() instanceof
NetworkTopologyStrategy)
+ if (replicaPlan.replicationStrategy() instanceof
NetworkTopologyStrategy)
{
- NetworkTopologyStrategy strategy = (NetworkTopologyStrategy)
replicaPlan.keyspace().getReplicationStrategy();
+ NetworkTopologyStrategy strategy = (NetworkTopologyStrategy)
replicaPlan.replicationStrategy();
for (String dc : strategy.getDatacenters())
{
int rf = strategy.getReplicationFactor(dc).allReplicas;
@@ -60,7 +60,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends
AbstractWriteResponse
}
else
{
- responses.put(DatabaseDescriptor.getLocalDataCenter(), new
AtomicInteger(ConsistencyLevel.quorumFor(replicaPlan.keyspace())));
+ responses.put(DatabaseDescriptor.getLocalDataCenter(), new
AtomicInteger(ConsistencyLevel.quorumFor(replicaPlan.replicationStrategy())));
}
// During bootstrap, we have to include the pending endpoints or we
may fail the consistency level
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index b7b9f2c..72801a9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -431,17 +431,19 @@ public class StorageProxy implements StorageProxyMBean
{
int contentions = 0;
Keyspace keyspace = Keyspace.open(metadata.keyspace);
+ AbstractReplicationStrategy latestRs =
keyspace.getReplicationStrategy();
try
{
consistencyForPaxos.validateForCas();
-
consistencyForReplayCommits.validateForCasCommit(metadata.keyspace);
- consistencyForCommit.validateForCasCommit(metadata.keyspace);
+ consistencyForReplayCommits.validateForCasCommit(latestRs);
+ consistencyForCommit.validateForCasCommit(latestRs);
long timeoutNanos =
DatabaseDescriptor.getCasContentionTimeout(NANOSECONDS);
while (System.nanoTime() - queryStartNanoTime < timeoutNanos)
{
// for simplicity, we'll do a single liveness check at the
start of each attempt
ReplicaPlan.ForPaxosWrite replicaPlan =
ReplicaPlans.forPaxos(keyspace, key, consistencyForPaxos);
+ latestRs = replicaPlan.replicationStrategy();
PaxosBallotAndContention pair =
beginAndRepairPaxos(queryStartNanoTime,
key,
metadata,
@@ -501,7 +503,7 @@ public class StorageProxy implements StorageProxyMBean
recordCasContention(metadata, key, casMetrics, contentions);
}
- throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos,
0, consistencyForPaxos.blockFor(keyspace), contentions);
+ throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos,
0, consistencyForPaxos.blockFor(latestRs), contentions);
}
/**
@@ -617,7 +619,7 @@ public class StorageProxy implements StorageProxyMBean
}
}
- throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos,
0, consistencyForPaxos.blockFor(Keyspace.open(metadata.keyspace)), contentions);
+ throw new CasWriteTimeoutException(WriteType.CAS, consistencyForPaxos,
0, consistencyForPaxos.blockFor(paxosPlan.replicationStrategy()), contentions);
}
/**
@@ -713,7 +715,7 @@ public class StorageProxy implements StorageProxyMBean
ReplicaPlan.ForTokenWrite replicaPlan =
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll);
if (shouldBlock)
{
- AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
+ AbstractReplicationStrategy rs = replicaPlan.replicationStrategy();
responseHandler = rs.getWriteResponseHandler(replicaPlan, null,
WriteType.SIMPLE, queryStartNanoTime);
}
@@ -967,7 +969,8 @@ public class StorageProxy implements StorageProxyMBean
{
String keyspaceName = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
- Optional<Replica> pairedEndpoint =
ViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk);
+ AbstractReplicationStrategy replicationStrategy =
Keyspace.open(keyspaceName).getReplicationStrategy();
+ Optional<Replica> pairedEndpoint =
ViewUtils.getViewNaturalEndpoint(replicationStrategy, baseToken, tk);
EndpointsForToken pendingReplicas =
StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk,
keyspaceName);
// if there are no paired endpoints there are probably
range movements going on, so we write to the local batchlog to replay later
@@ -1002,11 +1005,13 @@ public class StorageProxy implements StorageProxyMBean
}
else
{
+ ReplicaLayout.ForTokenWrite liveAndDown =
ReplicaLayout.forTokenWrite(replicationStrategy,
+
EndpointsForToken.of(tk, pairedEndpoint.get()),
+
pendingReplicas);
wrappers.add(wrapViewBatchResponseHandler(mutation,
consistencyLevel,
consistencyLevel,
-
EndpointsForToken.of(tk, pairedEndpoint.get()),
-
pendingReplicas,
+ liveAndDown,
baseComplete,
WriteType.BATCH,
cleanup,
@@ -1269,11 +1274,10 @@ public class StorageProxy implements StorageProxyMBean
{
String keyspaceName = mutation.getKeyspaceName();
Keyspace keyspace = Keyspace.open(keyspaceName);
- AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-
Token tk = mutation.key().getToken();
ReplicaPlan.ForTokenWrite replicaPlan =
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+ AbstractReplicationStrategy rs = replicaPlan.replicationStrategy();
AbstractWriteResponseHandler<IMutation> responseHandler =
rs.getWriteResponseHandler(replicaPlan, callback, writeType,
queryStartNanoTime);
performer.apply(mutation, replicaPlan, responseHandler,
localDataCenter);
@@ -1289,12 +1293,12 @@ public class StorageProxy implements StorageProxyMBean
long
queryStartNanoTime)
{
Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
- AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
Token tk = mutation.key().getToken();
ReplicaPlan.ForTokenWrite replicaPlan =
ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+ AbstractReplicationStrategy rs = replicaPlan.replicationStrategy();
AbstractWriteResponseHandler<IMutation> writeHandler =
rs.getWriteResponseHandler(replicaPlan,null, writeType, queryStartNanoTime);
- BatchlogResponseHandler<IMutation> batchHandler = new
BatchlogResponseHandler<>(writeHandler,
batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
+ BatchlogResponseHandler<IMutation> batchHandler = new
BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(rs),
cleanup, queryStartNanoTime);
return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
@@ -1305,24 +1309,20 @@ public class StorageProxy implements StorageProxyMBean
private static WriteResponseHandlerWrapper
wrapViewBatchResponseHandler(Mutation mutation,
ConsistencyLevel consistencyLevel,
ConsistencyLevel batchConsistencyLevel,
-
EndpointsForToken naturalEndpoints,
-
EndpointsForToken pendingEndpoints,
+
ReplicaLayout.ForTokenWrite liveAndDown,
AtomicLong baseComplete,
WriteType writeType,
BatchlogResponseHandler.BatchlogCleanup cleanup,
long queryStartNanoTime)
{
Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
- AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-
- ReplicaLayout.ForTokenWrite liveAndDown =
ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints);
ReplicaPlan.ForTokenWrite replicaPlan =
ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown,
ReplicaPlans.writeAll);
-
- AbstractWriteResponseHandler<IMutation> writeHandler =
rs.getWriteResponseHandler(replicaPlan, () -> {
+ AbstractReplicationStrategy replicationStrategy =
replicaPlan.replicationStrategy();
+ AbstractWriteResponseHandler<IMutation> writeHandler =
replicationStrategy.getWriteResponseHandler(replicaPlan, () -> {
long delay = Math.max(0, System.currentTimeMillis() -
baseComplete.get());
viewWriteMetrics.viewWriteLatency.update(delay, MILLISECONDS);
}, writeType, queryStartNanoTime);
- BatchlogResponseHandler<IMutation> batchHandler = new
ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace),
cleanup, queryStartNanoTime);
+ BatchlogResponseHandler<IMutation> batchHandler = new
ViewWriteMetricsWrapped(writeHandler,
batchConsistencyLevel.blockFor(replicationStrategy), cleanup,
queryStartNanoTime);
return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
@@ -1617,14 +1617,15 @@ public class StorageProxy implements StorageProxyMBean
{
Keyspace keyspace = Keyspace.open(keyspaceName);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- EndpointsForToken replicas =
keyspace.getReplicationStrategy().getNaturalReplicasForToken(key);
+ AbstractReplicationStrategy replicationStrategy =
keyspace.getReplicationStrategy();
+ EndpointsForToken replicas =
replicationStrategy.getNaturalReplicasForToken(key);
// CASSANDRA-13043: filter out those endpoints not accepting clients
yet, maybe because still bootstrapping
replicas = replicas.filter(replica ->
StorageService.instance.isRpcReady(replica.endpoint()));
// TODO have a way to compute the consistency level
if (replicas.isEmpty())
- throw UnavailableException.create(cl, cl.blockFor(keyspace), 0);
+ throw UnavailableException.create(cl,
cl.blockFor(replicationStrategy), 0);
List<Replica> localReplicas = new ArrayList<>(replicas.size());
@@ -1636,7 +1637,7 @@ public class StorageProxy implements StorageProxyMBean
{
// If the consistency required is local then we should not involve
other DCs
if (cl.isDatacenterLocal())
- throw UnavailableException.create(cl, cl.blockFor(keyspace),
0);
+ throw UnavailableException.create(cl,
cl.blockFor(replicationStrategy), 0);
// No endpoint in local DC, pick the closest endpoint according to
the snitch
replicas =
snitch.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
@@ -1739,6 +1740,8 @@ public class StorageProxy implements StorageProxyMBean
SinglePartitionReadCommand command = group.queries.get(0);
TableMetadata metadata = command.metadata();
DecoratedKey key = command.partitionKey();
+ // calculate the blockFor before repair any paxos round to avoid RS
being altered in between.
+ int blockForRead =
consistencyLevel.blockFor(Keyspace.open(metadata.keyspace).getReplicationStrategy());
PartitionIterator result = null;
try
@@ -1771,7 +1774,7 @@ public class StorageProxy implements StorageProxyMBean
}
catch (WriteTimeoutException e)
{
- throw new ReadTimeoutException(consistencyLevel, 0,
consistencyLevel.blockFor(Keyspace.open(metadata.keyspace)), false);
+ throw new ReadTimeoutException(consistencyLevel, 0,
blockForRead, false);
}
catch (WriteFailureException e)
{
diff --git
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index e17fe09..889fa79 100644
---
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@ -537,14 +537,14 @@ public class ReplicaFilteringProtection<E extends
Endpoints<E>>
}
catch (ReadTimeoutException e)
{
- int blockFor = consistency.blockFor(keyspace);
+ int blockFor =
consistency.blockFor(replicaPlan.replicationStrategy());
throw new ReadTimeoutException(consistency, blockFor - 1,
blockFor, true);
}
catch (UnavailableException e)
{
- int blockFor = consistency.blockFor(keyspace);
+ int blockFor =
consistency.blockFor(replicaPlan.replicationStrategy());
throw UnavailableException.create(consistency, blockFor,
blockFor - 1);
}
}
}
-}
\ No newline at end of file
+}
diff --git
a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 1b08877..ca47612 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -155,7 +155,7 @@ public abstract class AbstractReadRepair<E extends
Endpoints<E>, P extends Repli
ConsistencyLevel consistency = replicaPlan().consistencyLevel();
ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ?
ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
return consistency != ConsistencyLevel.EACH_QUORUM
- && consistency.satisfies(speculativeCL, cfs.keyspace)
+ && consistency.satisfies(speculativeCL,
replicaPlan.get().replicationStrategy())
&& cfs.sampleReadLatencyNanos <=
command.getTimeout(NANOSECONDS);
}
diff --git
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
index 68d1b4c..7a4882b 100644
---
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
+++
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepairs.java
@@ -85,7 +85,7 @@ public class BlockingReadRepairs
if (!suppressException)
{
- int blockFor = consistency.blockFor(keyspace);
+ int blockFor =
consistency.blockFor(keyspace.getReplicationStrategy());
Tracing.trace("Timed out while read-repairing after
receiving all {} data and digest responses", blockFor);
throw new ReadTimeoutException(consistency, blockFor - 1,
blockFor, true);
}
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index 7eebef7..7855150 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -77,7 +77,7 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1",
KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<Replica> naturalEndpoint =
ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ Optional<Replica> naturalEndpoint =
ViewUtils.getViewNaturalEndpoint(Keyspace.open("Keyspace1").getReplicationStrategy(),
new StringToken("CA"),
new StringToken("BB"));
@@ -110,7 +110,7 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1",
KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<Replica> naturalEndpoint =
ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ Optional<Replica> naturalEndpoint =
ViewUtils.getViewNaturalEndpoint(Keyspace.open("Keyspace1").getReplicationStrategy(),
new StringToken("CA"),
new StringToken("BB"));
@@ -142,7 +142,7 @@ public class ViewUtilsTest
KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1",
KeyspaceParams.create(false, replicationMap));
Schema.instance.load(meta);
- Optional<Replica> naturalEndpoint =
ViewUtils.getViewNaturalEndpoint("Keyspace1",
+ Optional<Replica> naturalEndpoint =
ViewUtils.getViewNaturalEndpoint(Keyspace.open("Keyspace1").getReplicationStrategy(),
new StringToken("AB"),
new StringToken("BB"));
diff --git
a/test/unit/org/apache/cassandra/locator/AssureSufficientLiveNodesTest.java
b/test/unit/org/apache/cassandra/locator/AssureSufficientLiveNodesTest.java
new file mode 100644
index 0000000..d5f62d7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/AssureSufficientLiveNodesTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.reads.NeverSpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * The test cases check that no false unavailable exception is thrown due to
+ * the concurrent modification to the ReplicationStrategy, e.g. alter keyspace.
+ *
+ * See https://issues.apache.org/jira/browse/CASSANDRA-16545 for details.
+ */
+@RunWith(BMUnitRunner.class)
+@BMRule(name = "FailureDecector sees all nodes as live", // applies to all
test cases in the class
+ targetClass = "FailureDetector",
+ targetMethod = "isAlive",
+ action = "return true;")
+public class AssureSufficientLiveNodesTest
+{
+ private static final AtomicInteger testIdGen = new AtomicInteger(0);
+ private static final Supplier<String> keyspaceNameGen = () -> "race_" +
testIdGen.getAndIncrement();
+ private static final String DC1 = "datacenter1";
+ private static final String DC2 = "datacenter2";
+ private static final String DC3 = "datacenter3";
+ private static final int RACE_TEST_LOOPS = 100;
+ private static final Token tk = new Murmur3Partitioner.LongToken(0);
+
+ @BeforeClass
+ public static void setUpClass() throws Throwable
+ {
+ SchemaLoader.loadSchema();
+ // Register peers with expected DC for NetworkTopologyStrategy.
+ TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+ metadata.clearUnsafe();
+
+ DatabaseDescriptor.setEndpointSnitch(new
AbstractNetworkTopologySnitch()
+ {
+ public String getRack(InetAddressAndPort endpoint)
+ {
+ byte[] address = endpoint.addressBytes;
+ return "rake" + address[1];
+ }
+
+ public String getDatacenter(InetAddressAndPort endpoint)
+ {
+ byte[] address = endpoint.addressBytes;
+ return "datacenter" + address[1];
+ }
+ });
+
+ List<InetAddressAndPort> instances = ImmutableList.of(
+ // datacenter 1
+ InetAddressAndPort.getByName("127.1.0.255"),
InetAddressAndPort.getByName("127.1.0.254"),
InetAddressAndPort.getByName("127.1.0.253"),
+ // datacenter 2
+ InetAddressAndPort.getByName("127.2.0.255"),
InetAddressAndPort.getByName("127.2.0.254"),
InetAddressAndPort.getByName("127.2.0.253"),
+ // datacenter 3
+ InetAddressAndPort.getByName("127.3.0.255"),
InetAddressAndPort.getByName("127.3.0.254"),
InetAddressAndPort.getByName("127.3.0.253"));
+
+ for (int i = 0; i < instances.size(); i++)
+ {
+ InetAddressAndPort ip = instances.get(i);
+ metadata.updateHostId(UUID.randomUUID(), ip);
+ metadata.updateNormalToken(new Murmur3Partitioner.LongToken(i),
ip);
+ }
+ }
+
+ @Test
+ public void insufficientLiveNodesTest()
+ {
+ final KeyspaceParams largeRF = KeyspaceParams.nts("datacenter1", 6);
+ // Not a race in fact. It is just testing the Unavailable can be
correctly thrown.
+ assertThatThrownBy(() ->
+ raceOfReplicationStrategyTest(largeRF, largeRF, 1,
+ keyspace ->
ReplicaPlans.forWrite(keyspace, QUORUM, tk, ReplicaPlans.writeNormal))
+ ).as("Unavailable should be thrown given 3 live nodes is less than a
quorum of 6")
+ .isInstanceOf(UnavailableException.class)
+ .hasMessageContaining("Cannot achieve consistency level QUORUM");
+ }
+
+ @Test
+ public void addDatacenterShouldNotCausesUnavailableWithEachQuorumTest()
throws Throwable
+ {
+ // write
+ raceOfReplicationStrategyTest(
+ // init
+ KeyspaceParams.nts(DC1, 3),
+ // alter to
+ KeyspaceParams.nts(DC1, 3, DC2, 3),
+ // test
+ keyspace -> ReplicaPlans.forWrite(keyspace, EACH_QUORUM, tk,
ReplicaPlans.writeNormal)
+ );
+ // read
+ raceOfReplicationStrategyTest(
+ // init
+ KeyspaceParams.nts(DC1, 3),
+ // alter to
+ KeyspaceParams.nts(DC1, 3, DC2, 3),
+ // test
+ keyspace -> ReplicaPlans.forRead(keyspace, tk, EACH_QUORUM,
NeverSpeculativeRetryPolicy.INSTANCE)
+ );
+ }
+
+
+ @Test
+ public void addDatacenterShouldNotCausesUnavailableWithQuorumTest() throws
Throwable
+ {
+ // write
+ raceOfReplicationStrategyTest(
+ // init. The # of live endpoints is 3.
+ KeyspaceParams.nts(DC1, 3),
+ // alter to. (3 + 3) / 2 + 1 > 3
+ KeyspaceParams.nts(DC1, 3, DC2, 3),
+ // test
+ keyspace -> ReplicaPlans.forWrite(keyspace, QUORUM, tk,
ReplicaPlans.writeNormal)
+ );
+ raceOfReplicationStrategyTest(
+ // init. The # of live endpoints is 3 = 2 + 1
+ KeyspaceParams.nts(DC1, 2, DC2, 1),
+ // alter to. (3 + 3) / 2 + 1 > 3
+ KeyspaceParams.nts(DC1, 2, DC2, 1, DC3, 3),
+ // test
+ keyspace -> ReplicaPlans.forWrite(keyspace, QUORUM, tk,
ReplicaPlans.writeNormal)
+ );
+
+ // read
+ raceOfReplicationStrategyTest(
+ // init
+ KeyspaceParams.nts(DC1, 3),
+ // alter to
+ KeyspaceParams.nts(DC1, 3, DC2, 3),
+ // test
+ keyspace -> ReplicaPlans.forRead(keyspace, tk, QUORUM,
NeverSpeculativeRetryPolicy.INSTANCE)
+ );
+ raceOfReplicationStrategyTest(
+ // init. The # of live endpoints is 3 = 2 + 1
+ KeyspaceParams.nts(DC1, 2, DC2, 1),
+ // alter to. (3 + 3) / 2 + 1 > 3
+ KeyspaceParams.nts(DC1, 2, DC2, 1, DC3, 3),
+ // test
+ keyspace -> ReplicaPlans.forRead(keyspace, tk, QUORUM,
NeverSpeculativeRetryPolicy.INSTANCE)
+ );
+ }
+
+ @Test
+ public void raceOnRemoveDatacenterNotCausesUnavailable() throws Throwable
+ {
+ // write
+ raceOfReplicationStrategyTest(
+ // init
+ KeyspaceParams.nts(DC1, 3, DC2, 3),
+ // alter to
+ KeyspaceParams.nts(DC1, 3),
+ // test
+ keyspace -> ReplicaPlans.forWrite(keyspace, EACH_QUORUM, tk,
ReplicaPlans.writeNormal)
+ );
+
+ // read
+ raceOfReplicationStrategyTest(
+ // init
+ KeyspaceParams.nts(DC1, 3, DC2, 3),
+ // alter to
+ KeyspaceParams.nts(DC1, 3),
+ // test
+ keyspace -> ReplicaPlans.forRead(keyspace, tk, EACH_QUORUM,
NeverSpeculativeRetryPolicy.INSTANCE)
+ );
+ }
+
+ @Test
+ public void increaseReplicationFactorShouldNotCausesUnavailableTest()
throws Throwable
+ {
+ // write
+ raceOfReplicationStrategyTest(
+ // init
+ KeyspaceParams.nts(DC1, 1),
+ // alter to
+ KeyspaceParams.nts(DC1, 3),
+ // test
+ keyspace -> ReplicaPlans.forWrite(keyspace, LOCAL_QUORUM, tk,
ReplicaPlans.writeNormal)
+ );
+
+ // read
+ raceOfReplicationStrategyTest(
+ // init
+ KeyspaceParams.nts(DC1, 1),
+ // alter to
+ KeyspaceParams.nts(DC1, 3),
+ // test
+ keyspace -> ReplicaPlans.forRead(keyspace, tk, LOCAL_QUORUM,
NeverSpeculativeRetryPolicy.INSTANCE)
+ );
+ }
+
+ /**
+ * A test runner that runs the `test` while changing the
ReplicationStrategy of the raced keyspace.
+ * It loops at most for RACE_TEST_LOOPS time if unable to produce the race
or any exception.
+ */
+ private static void raceOfReplicationStrategyTest(KeyspaceParams init,
+ KeyspaceParams alterTo,
+ int loopCount,
+ Consumer<Keyspace> test)
throws Throwable
+ {
+ String keyspaceName = keyspaceNameGen.get();
+ KeyspaceMetadata initKsMeta = KeyspaceMetadata.create(keyspaceName,
init, Tables.of(SchemaLoader.standardCFMD("Foo", "Bar").build()));
+ KeyspaceMetadata alterToKsMeta = initKsMeta.withSwapped(alterTo);
+ MigrationManager.announceNewKeyspace(initKsMeta, true);
+ Keyspace racedKs = Keyspace.open(keyspaceName);
+ ExecutorService es = Executors.newFixedThreadPool(2);
+ try (AutoCloseable ignore = () -> {
+ es.shutdown();
+ es.awaitTermination(1, TimeUnit.MINUTES);
+ })
+ {
+ for (int i = 0; i < loopCount; i++)
+ {
+ // reset the keyspace
+ racedKs.setMetadata(initKsMeta);
+ CountDownLatch trigger = new CountDownLatch(1);
+ // starts 2 runnables that could race
+ Future<?> f1 = es.submit(() -> {
+ Uninterruptibles.awaitUninterruptibly(trigger);
+ // Update replication strategy
+ racedKs.setMetadata(alterToKsMeta);
+ });
+ Future<?> f2 = es.submit(() -> {
+ Uninterruptibles.awaitUninterruptibly(trigger);
+ test.accept(racedKs);
+ });
+ trigger.countDown();
+ FBUtilities.waitOnFutures(Arrays.asList(f1, f2));
+ }
+ }
+ catch (RuntimeException rte)
+ {
+ // extract out the root cause wrapped by `waitOnFutures` and
`future.get()`, and rethrow
+ if (rte.getCause() != null
+ && rte.getCause() instanceof ExecutionException
+ && rte.getCause().getCause() != null)
+ throw rte.getCause().getCause();
+ else
+ throw rte;
+ }
+ }
+
+ private static void raceOfReplicationStrategyTest(KeyspaceParams init,
+ KeyspaceParams alterTo,
+ Consumer<Keyspace> test)
throws Throwable
+ {
+ raceOfReplicationStrategyTest(init, alterTo, RACE_TEST_LOOPS, test);
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
index 15fbd27..19ed66d 100644
---
a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
+++
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -151,7 +151,7 @@ public class WriteResponseHandlerTransientTest
{
EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(),
full(EP1), full(EP2), trans(EP3), full(EP5));
EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(),
full(EP4), trans(EP6));
- ReplicaLayout.ForTokenWrite layout = new
ReplicaLayout.ForTokenWrite(natural, pending);
+ ReplicaLayout.ForTokenWrite layout = new
ReplicaLayout.ForTokenWrite(ks.getReplicationStrategy(), natural, pending);
ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(ks,
ConsistencyLevel.QUORUM, layout, layout, ReplicaPlans.writeAll);
Assert.assertTrue(Iterables.elementsEqual(EndpointsForRange.of(full(EP4),
trans(EP6)),
@@ -160,13 +160,13 @@ public class WriteResponseHandlerTransientTest
private static ReplicaPlan.ForTokenWrite expected(EndpointsForToken
natural, EndpointsForToken selected)
{
- return new ReplicaPlan.ForTokenWrite(ks, ConsistencyLevel.QUORUM,
EndpointsForToken.empty(dummy.getToken()), natural, natural, selected);
+ return new ReplicaPlan.ForTokenWrite(ks, ks.getReplicationStrategy(),
ConsistencyLevel.QUORUM, EndpointsForToken.empty(dummy.getToken()), natural,
natural, selected);
}
private static ReplicaPlan.ForTokenWrite
getSpeculationContext(EndpointsForToken natural, Predicate<InetAddressAndPort>
livePredicate)
{
- ReplicaLayout.ForTokenWrite liveAndDown = new
ReplicaLayout.ForTokenWrite(natural, EndpointsForToken.empty(dummy.getToken()));
- ReplicaLayout.ForTokenWrite live = new
ReplicaLayout.ForTokenWrite(natural.filter(r ->
livePredicate.test(r.endpoint())), EndpointsForToken.empty(dummy.getToken()));
+ ReplicaLayout.ForTokenWrite liveAndDown = new
ReplicaLayout.ForTokenWrite(ks.getReplicationStrategy(), natural,
EndpointsForToken.empty(dummy.getToken()));
+ ReplicaLayout.ForTokenWrite live = new
ReplicaLayout.ForTokenWrite(ks.getReplicationStrategy(), natural.filter(r ->
livePredicate.test(r.endpoint())), EndpointsForToken.empty(dummy.getToken()));
return ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, liveAndDown,
live, ReplicaPlans.writeNormal);
}
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 36d792e..faae913 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -1321,7 +1321,7 @@ public class DataResolverTest extends
AbstractReadResponseTest
private ReplicaPlan.SharedForRangeRead plan(EndpointsForRange replicas,
ConsistencyLevel consistencyLevel)
{
- return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks,
consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas, 1));
+ return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks,
ks.getReplicationStrategy(), consistencyLevel, ReplicaUtils.FULL_BOUNDS,
replicas, replicas, 1));
}
private static void resolveAndConsume(DataResolver resolver)
@@ -1338,4 +1338,4 @@ public class DataResolverTest extends
AbstractReadResponseTest
}
}
}
-}
\ No newline at end of file
+}
diff --git
a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 99101f1..4dee52a 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -161,6 +161,6 @@ public class DigestResolverTest extends
AbstractReadResponseTest
private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel
consistencyLevel, EndpointsForToken replicas)
{
- return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks,
consistencyLevel, replicas, replicas));
+ return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks,
ks.getReplicationStrategy(), consistencyLevel, replicas, replicas));
}
}
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 3eb9b2e..6fc8fbf 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -270,6 +270,6 @@ public class ReadExecutorTest
private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel,
EndpointsForToken natural, EndpointsForToken selected)
{
- return new ReplicaPlan.ForTokenRead(ks, consistencyLevel, natural,
selected);
+ return new ReplicaPlan.ForTokenRead(ks, ks.getReplicationStrategy(),
consistencyLevel, natural, selected);
}
}
diff --git
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 5315060..badcd35 100644
---
a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++
b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -25,15 +25,12 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.cassandra.dht.ByteOrderedPartitioner;
-import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.EndpointsForToken;
@@ -299,7 +296,12 @@ public abstract class AbstractReadRepairTest
{
Token token = readPlan.range().left.getToken();
EndpointsForToken pending = EndpointsForToken.empty(token);
- return ReplicaPlans.forWrite(ks, ConsistencyLevel.TWO,
liveAndDown.forToken(token), pending, Predicates.alwaysTrue(),
ReplicaPlans.writeReadRepair(readPlan));
+ return ReplicaPlans.forWrite(readPlan.keyspace(),
+ ConsistencyLevel.TWO,
+ liveAndDown.forToken(token),
+ pending,
+ replica -> true,
+ ReplicaPlans.writeReadRepair(readPlan));
}
static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas,
EndpointsForRange targets)
{
@@ -311,7 +313,7 @@ public abstract class AbstractReadRepairTest
}
static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace,
ConsistencyLevel consistencyLevel, EndpointsForRange replicas,
EndpointsForRange targets)
{
- return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
ReplicaUtils.FULL_BOUNDS, replicas, targets, 1);
+ return new ReplicaPlan.ForRangeRead(keyspace,
keyspace.getReplicationStrategy(), consistencyLevel, ReplicaUtils.FULL_BOUNDS,
replicas, targets, 1);
}
public abstract InstrumentedReadRepair
createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?>
replicaPlan, long queryStartNanoTime);
diff --git
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index e4b3a71..8562db7 100644
---
a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++
b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.Util;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -118,11 +119,12 @@ public class BlockingReadRepairTest extends
AbstractReadRepairTest
@Test
public void consistencyLevelTest() throws Exception
{
-
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM,
ks));
-
Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM,
ks));
-
Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, ks));
-
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks));
-
Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, ks));
+ AbstractReplicationStrategy rs = ks.getReplicationStrategy();
+
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM,
rs));
+
Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM,
rs));
+
Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, rs));
+
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, rs));
+
Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, rs));
}
diff --git
a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 11b057f..dad9aa4 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Iterables;
import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.locator.ReplicaPlan;
@@ -168,11 +169,12 @@ public class ReadRepairTest
@Test
public void consistencyLevelTest() throws Exception
{
-
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM,
ks));
-
Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM,
ks));
-
Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, ks));
-
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks));
-
Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, ks));
+ AbstractReplicationStrategy rs = ks.getReplicationStrategy();
+
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM,
rs));
+
Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM,
rs));
+
Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, rs));
+
Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, rs));
+
Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, rs));
}
private static void assertMutationEqual(Mutation expected, Mutation actual)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]