This is an automated email from the ASF dual-hosted git repository.
slebresne pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 2d0b168 Fix serial read/non-applying CAS linearizability
2d0b168 is described below
commit 2d0b16804785660e8515aca9944784fb3733c619
Author: Sylvain Lebresne <[email protected]>
AuthorDate: Wed May 20 14:56:02 2020 +0200
Fix serial read/non-applying CAS linearizability
Before this patch, a SERIAL read or a non-applying CAS replays any
in-progress commit by calling `beginAndRepairPaxos`, but only a quorum
of nodes is contacted, so a minority of nodes could have an unfinished
in-progress proposal in their Paxos state. If such an in-progress proposal
is not replayed by a SERIAL read/non-applying CAS, it should never be
replayed by any following operation as that would break serializability,
but nothing was done to avoid this.
This patch ensures that both a SERIAL read and a non-applying CAS commit
an empty update before succeeding. This ensures that no prior incomplete
in-progress proposal can be replayed (such a proposal will be discarded as
older than the last committed ballot).
As this fix has a performance impact on SERIAL reads, a flag is provided
to disable the new code (even if this is discouraged by a warning).
Patch by Sylvain Lebresne, reviewed by Benjamin Lerer for CASSANDRA-12126
---
CHANGES.txt | 1 +
NEWS.txt | 8 +
.../org/apache/cassandra/service/StorageProxy.java | 231 +++++--
.../org/apache/cassandra/service/paxos/Commit.java | 6 +
.../apache/cassandra/service/paxos/PaxosState.java | 3 -
.../cassandra/service/paxos/PrepareCallback.java | 12 +-
.../cassandra/distributed/impl/Instance.java | 112 ++--
.../apache/cassandra/distributed/test/CASTest.java | 679 +++++++++++++++++++++
8 files changed, 939 insertions(+), 113 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 546fd98..d6f406d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.24:
+ * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
* Avoid potential NPE in JVMStabilityInspector (CASSANDRA-16294)
* Improved check of num_tokens against the length of initial_token
(CASSANDRA-14477)
* Fix a race condition on ColumnFamilyStore and TableMetrics (CASSANDRA-16228)
diff --git a/NEWS.txt b/NEWS.txt
index 7034c2c..6cc5e84 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -47,6 +47,14 @@ using the provided 'sstableupgrade' tool.
Upgrading
---------
+ - This release fixes a correctness issue with SERIAL reads, and LWT writes
that do not apply.
+ Unfortunately, this fix has an impact on read performance at
the SERIAL or
+ LOCAL_SERIAL consistency levels. For heavy users of such SERIAL reads,
the performance
+ impact may be noticeable and may also result in an increased number of
timeouts. For that
+ reason, an opt-in system property has been added to disable the fix:
+ -Dcassandra.unsafe.disable-serial-reads-linearizability=true
+ Use this flag at your own risk as it reverts SERIAL reads to the
incorrect behavior of
+ previous versions. See CASSANDRA-12126 for details.
- In cassandra.yaml, when using vnodes num_tokens must be defined if
initial_token is defined.
If it is not defined, or not equal to the numbers of tokens defined in
initial_tokens,
the node will not start. See CASSANDRA-14477 for details.
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index c7888c4..91dd991 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,6 +24,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import com.google.common.base.Predicate;
import com.google.common.annotations.VisibleForTesting;
@@ -109,6 +110,10 @@ public class StorageProxy implements StorageProxyMBean
*/
private static final int MAX_CONCURRENT_RANGE_REQUESTS = Math.max(1,
Integer.getInteger("cassandra.max_concurrent_range_requests",
FBUtilities.getAvailableProcessors() * 10));
+ private static final String DISABLE_SERIAL_READ_LINEARIZABILITY_KEY =
"cassandra.unsafe.disable-serial-reads-linearizability";
+ private static final boolean disableSerialReadLinearizability =
+
Boolean.parseBoolean(System.getProperty(DISABLE_SERIAL_READ_LINEARIZABILITY_KEY,
"false"));
+
private StorageProxy()
{
}
@@ -163,6 +168,16 @@ public class StorageProxy implements StorageProxyMBean
.execute(counterWriteTask(mutation, targets,
responseHandler, localDataCenter));
}
};
+
+ if (disableSerialReadLinearizability)
+ {
+ logger.warn("This node was started with -D{}. SERIAL (and
LOCAL_SERIAL) reads coordinated by this node " +
+ "will not offer linearizability (see CASSANDRA-12126
for details on what this mean) with " +
+ "respect to other SERIAL operations. Please note that,
with this flag, SERIAL reads will be " +
+ "slower than QUORUM reads, yet offer no more
guarantee. This flag should only be used in " +
+ "the restricted case of upgrading from a
pre-CASSANDRA-12126 version, and only if you " +
+ "understand the tradeoff.",
DISABLE_SERIAL_READ_LINEARIZABILITY_KEY);
+ }
}
/**
@@ -216,26 +231,12 @@ public class StorageProxy implements StorageProxyMBean
throws UnavailableException, IsBootstrappingException,
RequestFailureException, RequestTimeoutException, InvalidRequestException
{
final long start = System.nanoTime();
- int contentions = 0;
try
{
- consistencyForPaxos.validateForCas();
- consistencyForCommit.validateForCasCommit(keyspaceName);
-
CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName,
cfName);
- long timeout =
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
- while (System.nanoTime() - start < timeout)
+ Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () ->
{
- // for simplicity, we'll do a single liveness check at the
start of each attempt
- Pair<List<InetAddress>, Integer> p =
getPaxosParticipants(metadata, key, consistencyForPaxos);
- List<InetAddress> liveEndpoints = p.left;
- int requiredParticipants = p.right;
-
- final Pair<UUID, Integer> pair = beginAndRepairPaxos(start,
key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos,
consistencyForCommit, true, state);
- final UUID ballot = pair.left;
- contentions += pair.right;
-
// read the current values and check they validate the
conditions
Tracing.trace("Reading existing values for CAS precondition");
SinglePartitionReadCommand readCommand =
request.readCommand(FBUtilities.nowInSeconds());
@@ -251,11 +252,10 @@ public class StorageProxy implements StorageProxyMBean
{
Tracing.trace("CAS precondition does not match current
values {}", current);
casWriteMetrics.conditionNotMet.inc();
- return current.rowIterator();
+ return Pair.create(PartitionUpdate.emptyUpdate(metadata,
key), current.rowIterator());
}
- // finish the paxos round w/ the desired updates
- // TODO turn null updates into delete?
+ // Create the desired updates
PartitionUpdate updates = request.makeUpdates(current);
// Apply triggers to cas updates. A consideration here is that
@@ -267,47 +267,141 @@ public class StorageProxy implements StorageProxyMBean
// InvalidRequestException) any which aren't.
updates = TriggerExecutor.instance.execute(updates);
+ return Pair.create(updates, null);
+ };
- Commit proposal = Commit.newProposal(ballot, updates);
- Tracing.trace("CAS precondition is met; proposing
client-requested updates for {}", ballot);
- if (proposePaxos(proposal, liveEndpoints,
requiredParticipants, true, consistencyForPaxos))
- {
- commitPaxos(proposal, consistencyForCommit, true);
- Tracing.trace("CAS successful");
- return null;
- }
+ return doPaxos(metadata,
+ key,
+ consistencyForPaxos,
+ consistencyForCommit,
+ consistencyForCommit,
+ state,
+ start,
+ casWriteMetrics,
+ updateProposer);
- Tracing.trace("Paxos proposal not accepted (pre-empted by a
higher ballot)");
- contentions++;
-
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100),
TimeUnit.MILLISECONDS);
- // continue to retry
- }
-
- throw new WriteTimeoutException(WriteType.CAS,
consistencyForPaxos, 0,
consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
}
- catch (WriteTimeoutException|ReadTimeoutException e)
+ catch (WriteTimeoutException | ReadTimeoutException e)
{
casWriteMetrics.timeouts.mark();
throw e;
}
- catch (WriteFailureException|ReadFailureException e)
+ catch (WriteFailureException | ReadFailureException e)
{
casWriteMetrics.failures.mark();
throw e;
}
- catch(UnavailableException e)
+ catch (UnavailableException e)
{
casWriteMetrics.unavailables.mark();
throw e;
}
finally
{
- if(contentions > 0)
- casWriteMetrics.contention.update(contentions);
casWriteMetrics.addNano(System.nanoTime() - start);
}
}
+ /**
+ * Performs the Paxos rounds for a given proposal, retrying when preempted
until the timeout.
+ *
+ * <p>The main 'configurable' of this method is the {@code
createUpdateProposal} method: it is called by the method
+ * once a ballot has been successfully 'prepared' to generate the update
to 'propose' (and commit if the proposal is
+ * successful). That method also generates the result that the whole
method will return. Note that due to retrying,
+ * this method may be called multiple times and does not have to return
the same results.
+ *
+ * @param metadata the table to update with Paxos.
+ * @param key the partition updated.
+ * @param consistencyForPaxos the serial consistency of the operation
(either {@link ConsistencyLevel#SERIAL} or
+ * {@link ConsistencyLevel#LOCAL_SERIAL}).
+ * @param consistencyForReplayCommits the consistency for the commit phase
of "replayed" in-progress operations.
+ * @param consistencyForCommit the consistency for the commit phase of
_this_ operation update.
+ * @param state the client state.
+ * @param queryStartNanoTime the nano time for the start of the query this
is part of.
+ * @param casMetrics the metrics to update for this operation.
+ * @param createUpdateProposal method called after a successful 'prepare'
phase to obtain 1) the actual update of
+ * this operation and 2) the result that the whole method should
return. This can return {@code null} in the
+ * special case where, after having "prepared" (and thus potentially
replayed in-progress updates), we don't want
+ * to propose anything (the whole method then returns {@code null}).
+ * @return the second element of the pair returned by {@code
createUpdateProposal} (for the last call of that method
+ * if that method is called multiple times due to retries).
+ */
+ private static RowIterator doPaxos(CFMetaData metadata,
+ DecoratedKey key,
+ ConsistencyLevel consistencyForPaxos,
+ ConsistencyLevel
consistencyForReplayCommits,
+ ConsistencyLevel consistencyForCommit,
+ ClientState state,
+ long queryStartNanoTime,
+ CASClientRequestMetrics casMetrics,
+ Supplier<Pair<PartitionUpdate,
RowIterator>> createUpdateProposal)
+ throws UnavailableException, IsBootstrappingException,
RequestFailureException, RequestTimeoutException, InvalidRequestException
+ {
+ int contentions = 0;
+ try
+ {
+ consistencyForPaxos.validateForCas();
+ consistencyForReplayCommits.validateForCasCommit(metadata.ksName);
+ consistencyForCommit.validateForCasCommit(metadata.ksName);
+
+ long timeout =
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
+ while (System.nanoTime() - queryStartNanoTime < timeout)
+ {
+ // for simplicity, we'll do a single liveness check at the
start of each attempt
+ Pair<List<InetAddress>, Integer> p =
getPaxosParticipants(metadata, key, consistencyForPaxos);
+ List<InetAddress> liveEndpoints = p.left;
+ int requiredParticipants = p.right;
+
+ final Pair<UUID, Integer> pair =
beginAndRepairPaxos(queryStartNanoTime,
+ key,
+ metadata,
+
liveEndpoints,
+
requiredParticipants,
+
consistencyForPaxos,
+
consistencyForReplayCommits,
+
casMetrics,
+ state);
+ final UUID ballot = pair.left;
+ contentions += pair.right;
+
+ Pair<PartitionUpdate, RowIterator> proposalPair =
createUpdateProposal.get();
+ // See method javadoc: null here is code for "stop here and
return null".
+ if (proposalPair == null)
+ return null;
+
+ Commit proposal = Commit.newProposal(ballot,
proposalPair.left);
+ Tracing.trace("CAS precondition is met; proposing
client-requested updates for {}", ballot);
+ if (proposePaxos(proposal, liveEndpoints,
requiredParticipants, true, consistencyForPaxos))
+ {
+ // We skip committing accepted updates when they are
empty. This is an optimization which works
+ // because we also skip replaying those same empty update
in beginAndRepairPaxos (see the longer
+ // comment there). As empty update are somewhat common
(serial reads and non-applying CAS propose
+ // them), this is worth bothering.
+ if (!proposal.update.isEmpty())
+ commitPaxos(proposal, consistencyForCommit, true);
+ RowIterator result = proposalPair.right;
+ if (result != null)
+ Tracing.trace("CAS did not apply");
+ else
+ Tracing.trace("CAS applied successfully");
+ return result;
+ }
+
+ Tracing.trace("Paxos proposal not accepted (pre-empted by a
higher ballot)");
+ contentions++;
+
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100),
TimeUnit.MILLISECONDS);
+ // continue to retry
+ }
+
+ throw new WriteTimeoutException(WriteType.CAS,
consistencyForPaxos, 0,
consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
+ }
+ finally
+ {
+ if(contentions > 0)
+ casMetrics.contention.update(contentions);
+ }
+ }
+
private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
{
final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -364,7 +458,7 @@ public class StorageProxy implements StorageProxyMBean
int
requiredParticipants,
ConsistencyLevel
consistencyForPaxos,
ConsistencyLevel
consistencyForCommit,
- final boolean
isWrite,
+
CASClientRequestMetrics casMetrics,
ClientState state)
throws WriteTimeoutException, WriteFailureException
{
@@ -397,18 +491,31 @@ public class StorageProxy implements StorageProxyMBean
continue;
}
- Commit inProgress = summary.mostRecentInProgressCommitWithUpdate;
+ Commit inProgress = summary.mostRecentInProgressCommit;
Commit mostRecent = summary.mostRecentCommit;
// If we have an in-progress ballot greater than the MRC we know,
then it's an in-progress round that
// needs to be completed, so do it.
+ // One special case we make is for update that are empty (which
are proposed by serial reads and
+ // non-applying CAS). While we could handle those as any other
updates, we can optimize this somewhat by
+ // neither committing those empty updates, nor replaying
in-progress ones. The reasoning is this: as the
+ // update is empty, we have nothing to apply to storage in the
commit phase, so the only reason to commit
+ // would be to update the MRC. However, if we skip replaying those
empty updates, then we don't need to
+ // update the MRC for following updates to make progress (that is,
if we didn't had the empty update skip
+ // below _but_ skipped updating the MRC on empty updates, then
we'd be stuck always proposing that same
+ // empty update). And the reason skipping that replay is safe is
that when an operation tries to propose
+ // an empty value, there can be only 2 cases:
+ // 1) the propose succeed, meaning a quorum of nodes accept it,
in which case we are guaranteed no earlier
+ // pending operation can ever be replayed (which is what we
want to guarantee with the empty update).
+ // 2) the propose does not succeed. But then the operation
proposing the empty update will not succeed
+ // either (it will retry or ultimately timeout), and we're
actually ok if earlier pending operation gets
+ // replayed in that case.
+ // Tl;dr, it is safe to skip committing empty updates _as long as_
we also skip replaying them below. And
+ // doing so is more efficient, so we do it.
if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent))
{
Tracing.trace("Finishing incomplete paxos round {}",
inProgress);
- if(isWrite)
- casWriteMetrics.unfinishedCommit.inc();
- else
- casReadMetrics.unfinishedCommit.inc();
+ casMetrics.unfinishedCommit.inc();
Commit refreshedInProgress = Commit.newProposal(ballot,
inProgress.update);
if (proposePaxos(refreshedInProgress, liveEndpoints,
requiredParticipants, false, consistencyForPaxos))
{
@@ -1535,21 +1642,31 @@ public class StorageProxy implements StorageProxyMBean
PartitionIterator result = null;
try
{
- // make sure any in-progress paxos writes are done (i.e.,
committed to a majority of replicas), before performing a quorum read
- Pair<List<InetAddress>, Integer> p =
getPaxosParticipants(metadata, key, consistencyLevel);
- List<InetAddress> liveEndpoints = p.left;
- int requiredParticipants = p.right;
-
- // does the work of applying in-progress writes; throws UAE or
timeout if it can't
- final ConsistencyLevel consistencyForCommitOrFetch =
consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
-
? ConsistencyLevel.LOCAL_QUORUM
-
: ConsistencyLevel.QUORUM;
+ final ConsistencyLevel consistencyForReplayCommitOrFetch =
consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
+ ?
ConsistencyLevel.LOCAL_QUORUM
+ :
ConsistencyLevel.QUORUM;
try
{
- final Pair<UUID, Integer> pair = beginAndRepairPaxos(start,
key, metadata, liveEndpoints, requiredParticipants, consistencyLevel,
consistencyForCommitOrFetch, false, state);
- if (pair.right > 0)
- casReadMetrics.contention.update(pair.right);
+ // Commit an empty update to make sure all in-progress updates
that should be finished first are, _and_
+ // that no other in-progress update can get resurrected.
+ Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer =
+ disableSerialReadLinearizability
+ ? () -> null
+ : () -> Pair.create(PartitionUpdate.emptyUpdate(metadata,
key), null);
+ // When replaying, we commit at quorum/local quorum, as we
want to be sure the following read (done at
+ // quorum/local_quorum) sees any replayed updates. Our own
update is however empty, and those don't even
+ // get committed due to an optimization described in
doPaxos/beginAndRepairPaxos, so the commit
+ // consistency is irrelevant (we use ANY just to emphasize that
we don't wait on our commit).
+ doPaxos(metadata,
+ key,
+ consistencyLevel,
+ consistencyForReplayCommitOrFetch,
+ ConsistencyLevel.ANY,
+ state,
+ start,
+ casReadMetrics,
+ updateProposer);
}
catch (WriteTimeoutException e)
{
@@ -1560,7 +1677,7 @@ public class StorageProxy implements StorageProxyMBean
throw new ReadFailureException(consistencyLevel, e.received,
e.failures, e.blockFor, false);
}
- result = fetchRows(group.commands, consistencyForCommitOrFetch);
+ result = fetchRows(group.commands,
consistencyForReplayCommitOrFetch);
}
catch (UnavailableException e)
{
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java
b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 95bd464..a3f491b 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -82,6 +82,12 @@ public class Commit
return this.ballot.equals(ballot);
}
+ /** Whether this is an empty commit, that is one with no updates. */
+ public boolean isEmpty()
+ {
+ return update.isEmpty();
+ }
+
public Mutation makeMutation()
{
return new Mutation(update);
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index ee1ba6a..8ab9a98 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -20,12 +20,9 @@
*/
package org.apache.cassandra.service.paxos;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Striped;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index ff81803..26e292e 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -46,7 +46,6 @@ public class PrepareCallback extends
AbstractPaxosCallback<PrepareResponse>
public boolean promised = true;
public Commit mostRecentCommit;
public Commit mostRecentInProgressCommit;
- public Commit mostRecentInProgressCommitWithUpdate;
private final Map<InetAddress, Commit> commitsByReplica = new
ConcurrentHashMap<InetAddress, Commit>();
@@ -56,7 +55,6 @@ public class PrepareCallback extends
AbstractPaxosCallback<PrepareResponse>
// need to inject the right key in the empty commit so comparing with
empty commits in the reply works as expected
mostRecentCommit = Commit.emptyCommit(key, metadata);
mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);
- mostRecentInProgressCommitWithUpdate = Commit.emptyCommit(key,
metadata);
}
public synchronized void response(MessageIn<PrepareResponse> message)
@@ -64,9 +62,8 @@ public class PrepareCallback extends
AbstractPaxosCallback<PrepareResponse>
PrepareResponse response = message.payload;
logger.trace("Prepare response {} from {}", response, message.from);
- // In case of clock skew, another node could be proposing with ballot
that are quite a bit
- // older than our own. In that case, we record the more recent commit
we've received to make
- // sure we re-prepare on an older ballot.
+ // We set the mostRecentInProgressCommit even if we're not promised
as, in that case, the ballot of that commit
+ // will be used to avoid generating a ballot that has no chance to
win on retry (think clock skew).
if (response.inProgressCommit.isAfter(mostRecentInProgressCommit))
mostRecentInProgressCommit = response.inProgressCommit;
@@ -82,11 +79,6 @@ public class PrepareCallback extends
AbstractPaxosCallback<PrepareResponse>
if (response.mostRecentCommit.isAfter(mostRecentCommit))
mostRecentCommit = response.mostRecentCommit;
- // If some response has an update, then we should replay the update
with the highest ballot. So find
- // the the highest commit that actually have an update
- if
(response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) &&
!response.inProgressCommit.update.isEmpty())
- mostRecentInProgressCommitWithUpdate = response.inProgressCommit;
-
latch.countDown();
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index ff0095d..0d2eb4b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -647,55 +647,59 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
return YamlConfigurationLoader.fromMap(params, check, Config.class);
}
- private void initializeRing(ICluster cluster)
+ public static void addToRing(boolean bootstrapping, IInstance peer)
{
- // This should be done outside instance in order to avoid serializing
config
- String partitionerName = config.getString("partitioner");
- List<String> initialTokens = new ArrayList<>();
- List<InetSocketAddress> hosts = new ArrayList<>();
- List<UUID> hostIds = new ArrayList<>();
- for (int i = 1 ; i <= cluster.size() ; ++i)
+ try
{
- IInstanceConfig config = cluster.get(i).config();
- initialTokens.add(config.getString("initial_token"));
- hosts.add(config.broadcastAddress());
- hostIds.add(config.hostId());
+ IInstanceConfig config = peer.config();
+ IPartitioner partitioner =
FBUtilities.newPartitioner(config.getString("partitioner"));
+ Token token =
partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+ InetAddress address = config.broadcastAddress().getAddress();
+
+ UUID hostId = config.hostId();
+ Gossiper.runInGossipStageBlocking(() -> {
+ Gossiper.instance.initializeNodeUnsafe(address, hostId, 1);
+ Gossiper.instance.injectApplicationState(address,
+ ApplicationState.TOKENS,
+ new
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
+ StorageService.instance.onChange(address,
+ ApplicationState.STATUS,
+ bootstrapping
+ ? new
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(Collections.singleton(token))
+ : new
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
+ Gossiper.instance.realMarkAlive(address,
Gossiper.instance.getEndpointStateForEndpoint(address));
+ });
+ int messagingVersion = peer.isShutdown()
+ ? MessagingService.current_version
+ : Math.min(MessagingService.current_version,
peer.getMessagingVersion());
+ MessagingService.instance().setVersion(address, messagingVersion);
+
+ if (!bootstrapping)
+ assert
StorageService.instance.getTokenMetadata().isMember(address);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
}
+ catch (Throwable e) // UnknownHostException
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ public static void removeFromRing(IInstance peer)
+ {
try
{
- IPartitioner partitioner =
FBUtilities.newPartitioner(partitionerName);
- StorageService storageService = StorageService.instance;
- List<Token> tokens = new ArrayList<>();
- for (String token : initialTokens)
- tokens.add(partitioner.getTokenFactory().fromString(token));
-
- for (int i = 0; i < tokens.size(); i++)
- {
- InetSocketAddress ep = hosts.get(i);
- UUID hostId = hostIds.get(i);
- Token token = tokens.get(i);
- Gossiper.runInGossipStageBlocking(() -> {
- Gossiper.instance.initializeNodeUnsafe(ep.getAddress(),
hostId, 1);
- Gossiper.instance.injectApplicationState(ep.getAddress(),
-
ApplicationState.TOKENS,
- new
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
- storageService.onChange(ep.getAddress(),
- ApplicationState.STATUS,
- new
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
- Gossiper.instance.realMarkAlive(ep.getAddress(),
Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress()));
- });
- int messagingVersion = cluster.get(ep).isShutdown()
- ? MessagingService.current_version
- :
Math.min(MessagingService.current_version,
cluster.get(ep).getMessagingVersion());
- MessagingService.instance().setVersion(ep.getAddress(),
messagingVersion);
- }
-
- // check that all nodes are in token metadata
- for (int i = 0; i < tokens.size(); ++i)
- assert
storageService.getTokenMetadata().isMember(hosts.get(i).getAddress());
-
- storageService.setNormalModeUnsafe();
+ IInstanceConfig config = peer.config();
+ IPartitioner partitioner =
FBUtilities.newPartitioner(config.getString("partitioner"));
+ Token token =
partitioner.getTokenFactory().fromString(config.getString("initial_token"));
+ InetAddress address = config.broadcastAddress().getAddress();
+
+ Gossiper.runInGossipStageBlocking(() -> {
+ StorageService.instance.onChange(address,
+ ApplicationState.STATUS,
+ new
VersionedValue.VersionedValueFactory(partitioner).left(Collections.singleton(token),
0L));
+ Gossiper.instance.removeEndpoint(address);
+ });
+ PendingRangeCalculatorService.instance.blockUntilFinished();
}
catch (Throwable e) // UnknownHostException
{
@@ -703,6 +707,28 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
}
}
+ public static void addToRingNormal(IInstance peer)
+ {
+ addToRing(false, peer);
+ assert
StorageService.instance.getTokenMetadata().isMember(peer.broadcastAddress().getAddress());
+ }
+
+ public static void addToRingBootstrapping(IInstance peer)
+ {
+ addToRing(true, peer);
+ }
+
+ private static void initializeRing(ICluster cluster)
+ {
+ for (int i = 1 ; i <= cluster.size() ; ++i)
+ addToRing(false, cluster.get(i));
+
+ for (int i = 1; i <= cluster.size(); ++i)
+ assert
StorageService.instance.getTokenMetadata().isMember(cluster.get(i).broadcastAddress().getAddress());
+
+ StorageService.instance.setNormalModeUnsafe();
+ }
+
public Future<Void> shutdown()
{
return shutdown(true);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
new file mode 100644
index 0000000..0b1dce6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_COMMIT;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PREPARE;
+import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PROPOSE;
+import static org.apache.cassandra.net.MessagingService.Verb.READ;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CASTest extends TestBaseImpl
+{
+    @Test
+    public void simpleUpdate() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            final String selectPartition = "SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1";
+
+            // Conditional insert into an empty partition: the SERIAL read must see (1, 1, 1).
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute(selectPartition, ConsistencyLevel.SERIAL),
+                       row(1, 1, 1));
+
+            // Condition "v = 2" does not hold, so the row must be unchanged.
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 3 WHERE pk = 1 and ck = 1 IF v = 2", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute(selectPartition, ConsistencyLevel.SERIAL),
+                       row(1, 1, 1));
+
+            // Condition "v = 1" holds, so v must now be 2.
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute(selectPartition, ConsistencyLevel.SERIAL),
+                       row(1, 1, 2));
+        }
+    }
+
+    @Test
+    public void incompletePrepare() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                      .set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            // Drop the PAXOS_PREPARE messages node 1 sends to nodes 2 and 3; the insert below is expected to time out.
+            IMessageFilters.Filter prepareBlackhole = cluster.filters()
+                                                             .verbs(PAXOS_PREPARE.ordinal())
+                                                             .from(1)
+                                                             .to(2, 3)
+                                                             .drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                String timeoutMessage = wrapped.getCause().getMessage();
+                Assert.assertEquals("Operation timed out - received only 1 responses.", timeoutMessage);
+            }
+            prepareBlackhole.off();
+
+            // The conditional update runs with the filter off; the SERIAL read is then expected to return no rows.
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL));
+        }
+    }
+
+    @Test
+    public void incompletePropose() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                      .set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            // Drop the PAXOS_PROPOSE messages node 1 sends to nodes 2 and 3; the insert below is expected to time out.
+            IMessageFilters.Filter proposeBlackhole = cluster.filters()
+                                                             .verbs(PAXOS_PROPOSE.ordinal())
+                                                             .from(1)
+                                                             .to(2, 3)
+                                                             .drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                String timeoutMessage = wrapped.getCause().getMessage();
+                Assert.assertEquals("Operation timed out - received only 1 responses.", timeoutMessage);
+            }
+            proposeBlackhole.off();
+
+            // make sure we encounter one of the in-progress proposals so we complete it
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2).drop();
+
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                       row(1, 1, 2));
+        }
+    }
+
+    @Test
+    public void incompleteCommit() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                      .set("cas_contention_timeout_in_ms", 200L))))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            // Drop the PAXOS_COMMIT messages node 1 sends to nodes 2 and 3; the insert below is expected to time out.
+            IMessageFilters.Filter commitBlackhole = cluster.filters()
+                                                            .verbs(PAXOS_COMMIT.ordinal())
+                                                            .from(1)
+                                                            .to(2, 3)
+                                                            .drop();
+            try
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                Assert.fail();
+            }
+            catch (RuntimeException wrapped)
+            {
+                String timeoutMessage = wrapped.getCause().getMessage();
+                Assert.assertEquals("Operation timed out - received only 1 responses.", timeoutMessage);
+            }
+            commitBlackhole.off();
+
+            // make sure we see one of the successful commits
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2).drop();
+
+            cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                       row(1, 1, 2));
+        }
+    }
+
+    /**
+     * Returns the ordinals of the three Paxos verbs (prepare, propose, commit) followed by the READ verb,
+     * in the form expected by the message-filter {@code verbs(...)} call.
+     */
+    private int[] paxosAndReadVerbs()
+    {
+        final MessagingService.Verb[] verbs = { PAXOS_PREPARE, PAXOS_PROPOSE, PAXOS_COMMIT, READ };
+        final int[] ordinals = new int[verbs.length];
+        for (int idx = 0; idx < verbs.length; idx++)
+            ordinals[idx] = verbs[idx].ordinal();
+        return ordinals;
+    }
+
+ /**
+ * Base test to ensure that if a write times out but with a proposal accepted by some nodes (less than a quorum), and
+ * a following SERIAL operation does not observe that write (the nodes having accepted it do not participate in that
+ * following operation), then that write is never applied, even when the nodes having accepted the original proposal
+ * participate.
+ *
+ * <p>In other words, if an operation times out, it may or may not be applied, but that "fate" is persistently decided
+ * by the very next SERIAL operation that "succeeds" (in the sense of 'not timing out or throwing some other exception').
+ *
+ * @param postTimeoutOperation1 a SERIAL operation executed after an initial write that inserts the row [0, 0] times
+ * out. It is executed with a QUORUM of nodes that have _not_ seen the timed-out
+ * proposal, and so that operation should expect that the [0, 0] write has not taken
+ * place.
+ * @param postTimeoutOperation2 a 2nd SERIAL operation executed _after_ {@code postTimeoutOperation1}, with no
+ * write executed between the 2 operations. Contrary to the 1st operation, the QUORUM
+ * for this operation _will_ include the node that got the proposal for the [0, 0]
+ * insert but didn't participate in {@code postTimeoutOperation1}. That operation
+ * should also not witness that [0, 0] write (since {@code postTimeoutOperation1}
+ * didn't).
+ * @param loseCommitOfOperation1 if {@code true}, the test will also drop
the "commits" messages for
+ * {@code postTimeoutOperation1}. In
general, the test should behave the same with or
+ * without that flag since a value is
decided as soon as it has been "accepted by
+ * quorum" and the commits should always be
properly replayed.
+ */
+    private void consistencyAfterWriteTimeoutTest(BiConsumer<String, ICoordinator> postTimeoutOperation1,
+                                                  BiConsumer<String, ICoordinator> postTimeoutOperation2,
+                                                  boolean loseCommitOfOperation1) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                      .set("cas_contention_timeout_in_ms", 200L))))
+        {
+            String table = KEYSPACE + ".t";
+            cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY KEY, v int)");
+
+            // We do a CAS insertion, but with the PROPOSE message dropped on node 1 and 2. The CAS will not get
+            // through and should timeout. Importantly, node 3 does receive and answer the PROPOSE.
+            IMessageFilters.Filter dropProposeFilter = cluster.filters()
+                                                              .inbound()
+                                                              .verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal())
+                                                              .to(1, 2)
+                                                              .drop();
+            try
+            {
+                // NOTE: the consistency below is the "commit" one, so it doesn't matter at all here.
+                cluster.coordinator(1)
+                       .execute("INSERT INTO " + table + "(k, v) VALUES (0, 0) IF NOT EXISTS", ConsistencyLevel.ONE);
+                fail("The insertion should have timed-out");
+            }
+            catch (Exception e)
+            {
+                // We expect a write timeout. If we get one, the test can continue, otherwise, we rethrow. The dtest
+                // framework wraps the exception in a RuntimeException, so we search the whole cause chain rather
+                // than only the immediate cause: this tolerates extra levels of wrapping, and an exception without
+                // any cause is rethrown as-is instead of being masked by a NullPointerException.
+                // TODO: we can't use an instanceof below because the WriteTimeoutException we get is from a different
+                // class loader than the one the test runs under, and that's our poor-man's work-around. This kind of
+                // thing should be improved at the dtest API level.
+                boolean isWriteTimeout = false;
+                for (Throwable t = e; t != null && !isWriteTimeout; t = t.getCause())
+                    isWriteTimeout = t.getClass().getSimpleName().equals("WriteTimeoutException");
+
+                if (!isWriteTimeout)
+                    throw e;
+            }
+            finally
+            {
+                dropProposeFilter.off();
+            }
+
+            // Isolates node 3 and executes the SERIAL operation. As neither node 1 nor 2 got the initial insert
+            // proposal, there is nothing to "replay" and the operation should assert the table is still empty.
+            IMessageFilters.Filter ignoreNode3Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop();
+            IMessageFilters.Filter dropCommitFilter = null;
+            if (loseCommitOfOperation1)
+            {
+                dropCommitFilter = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).to(1, 2).drop();
+            }
+            try
+            {
+                postTimeoutOperation1.accept(table, cluster.coordinator(1));
+            }
+            finally
+            {
+                ignoreNode3Filter.off();
+                if (dropCommitFilter != null)
+                    dropCommitFilter.off();
+            }
+
+            // Node 3 is now back and we isolate node 2 to ensure the next read hits node 1 and 3.
+            // What we want to ensure is that despite node 3 having the initial insert in its paxos state in a
+            // position of being replayed, that insert is _not_ replayed (it would contradict serializability since
+            // the previous operation asserted nothing was inserted). It is this execution that failed before
+            // CASSANDRA-12126.
+            IMessageFilters.Filter ignoreNode2Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop();
+            try
+            {
+                postTimeoutOperation2.accept(table, cluster.coordinator(1));
+            }
+            finally
+            {
+                ignoreNode2Filter.off();
+            }
+        }
+    }
+
+ /**
+ * Tests that if a write times out and a following serial read does not see that write, then no following read sees
+ * it, even if some nodes still have the write in their paxos state.
+ *
+ * <p>This specifically test for the inconsistency described/fixed by
CASSANDRA-12126.
+ */
+    @Test
+    public void readConsistencyAfterWriteTimeoutTest() throws IOException
+    {
+        // A SERIAL read of partition 0 that must return no rows.
+        BiConsumer<String, ICoordinator> expectNoRow = (table, coordinator) -> {
+            Object[][] rows = coordinator.execute("SELECT * FROM " + table + " WHERE k=0", ConsistencyLevel.SERIAL);
+            assertRows(rows);
+        };
+
+        // Run once with the commits of the first operation delivered, and once with them dropped.
+        for (boolean loseCommits : new boolean[]{ false, true })
+            consistencyAfterWriteTimeoutTest(expectNoRow, expectNoRow, loseCommits);
+    }
+
+ /**
+ * Tests that if a write times out, and a following CAS succeeds but does not apply in a way that indicates the write
+ * has not been applied, then no following CAS can see that initial insert, even if some nodes still have the write in
+ * their paxos state.
+ *
+ * <p>This specifically test for the inconsistency described/fixed by
CASSANDRA-12126.
+ */
+    @Test
+    public void nonApplyingCasConsistencyAfterWriteTimeout() throws IOException
+    {
+        // Note: CL.ANY is used as commit consistency so that the operation doesn't time out in the runs where the
+        // commits of operation 1 are "lost". The commit CL shouldn't have any impact on what this test checks.
+        BiConsumer<String, ICoordinator> expectCasRejected = (table, coordinator) -> {
+            Object[][] result = coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", ConsistencyLevel.ANY);
+            assertCasNotApplied(result);
+        };
+
+        // Run once with the commits of the first operation delivered, and once with them dropped.
+        for (boolean loseCommits : new boolean[]{ false, true })
+            consistencyAfterWriteTimeoutTest(expectCasRejected, expectCasRejected, loseCommits);
+    }
+
+ /**
+ * Tests that if a write times out and a following serial read does not see that write, then no following CAS sees
+ * that initial insert, even if some nodes still have the write in their paxos state.
+ *
+ * <p>This specifically test for the inconsistency described/fixed by
CASSANDRA-12126.
+ */
+    @Test
+    public void mixedReadAndNonApplyingCasConsistencyAfterWriteTimeout() throws IOException
+    {
+        // First operation: a SERIAL read of partition 0 that must return no rows.
+        BiConsumer<String, ICoordinator> expectNoRow = (table, coordinator) -> {
+            Object[][] rows = coordinator.execute("SELECT * FROM " + table + " WHERE k=0", ConsistencyLevel.SERIAL);
+            assertRows(rows);
+        };
+        // Second operation: a conditional update whose condition must not hold.
+        BiConsumer<String, ICoordinator> expectCasRejected = (table, coordinator) -> {
+            Object[][] result = coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", ConsistencyLevel.QUORUM);
+            assertCasNotApplied(result);
+        };
+
+        // Run once with the commits of the first operation delivered, and once with them dropped.
+        for (boolean loseCommits : new boolean[]{ false, true })
+            consistencyAfterWriteTimeoutTest(expectNoRow, expectCasRejected, loseCommits);
+    }
+
+ /**
+ * Tests that if a write times out and a following CAS succeeds but does not apply in a way that indicates the write
+ * has not been applied, then following serial reads do not see that write, even if some nodes still have the write in
+ * their paxos state.
+ *
+ * <p>This specifically test for the inconsistency described/fixed by
CASSANDRA-12126.
+ */
+    @Test
+    public void mixedNonApplyingCasAndReadConsistencyAfterWriteTimeout() throws IOException
+    {
+        // Note: CL.ANY is used as commit consistency so that the operation doesn't time out in the runs where the
+        // commits of operation 1 are "lost". The commit CL shouldn't have any impact on what this test checks.
+        BiConsumer<String, ICoordinator> expectCasRejected = (table, coordinator) -> {
+            Object[][] result = coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", ConsistencyLevel.ANY);
+            assertCasNotApplied(result);
+        };
+        // Second operation: a SERIAL read of partition 0 that must return no rows.
+        BiConsumer<String, ICoordinator> expectNoRow = (table, coordinator) -> {
+            Object[][] rows = coordinator.execute("SELECT * FROM " + table + " WHERE k=0", ConsistencyLevel.SERIAL);
+            assertRows(rows);
+        };
+
+        // Run once with the commits of the first operation delivered, and once with them dropped.
+        for (boolean loseCommits : new boolean[]{ false, true })
+            consistencyAfterWriteTimeoutTest(expectCasRejected, expectNoRow, loseCommits);
+    }
+
+    // TODO: this should probably be moved into the dtest API.
+    /**
+     * Asserts that {@code resultSet} is the result of a CAS operation that was not applied: it must have at
+     * least one non-empty row whose first column is the boolean {@code false}.
+     *
+     * @param resultSet the rows returned by executing the conditional statement
+     */
+    private void assertCasNotApplied(Object[][] resultSet)
+    {
+        assertFalse("Expected a CAS resultSet (with at least application result) but got an empty one.",
+                    resultSet.length == 0);
+        assertFalse("Invalid empty first row in CAS resultSet.", resultSet[0].length == 0);
+        Object wasApplied = resultSet[0][0];
+        // Build the type description null-safely: the message is evaluated eagerly, so calling getClass() on a
+        // null column would throw a NullPointerException instead of reporting a readable assertion failure.
+        Object actualType = wasApplied == null ? "null" : wasApplied.getClass();
+        assertTrue("Expected 1st column of CAS resultSet to be a boolean, but got a " + actualType,
+                   wasApplied instanceof Boolean);
+        assertFalse("Expected CAS to not be applied, but was applied.", (Boolean)wasApplied);
+    }
+
+ /**
+ * Failed write (by node that did not yet witness a range movement via
gossip) is witnessed later as successful
+ * conflicting with another successful write performed by a node that did
witness the range movement
+ * Prepare, Propose and Commit A to {1, 2}
+ * Range moves to {2, 3, 4}
+ * Prepare and Propose B (=> !A) to {3, 4}
+ */
+    @Ignore
+    @Test
+    public void testSuccessfulWriteBeforeRangeMovement() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                 .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {1} is unaware (yet) that {4} is an owner of the token
+            cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+
+            final int key = pk(cluster, 1, 2);
+            final String insertV1 = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS";
+            final String insertV2 = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS";
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute(insertV1, ConsistencyLevel.ONE, key),
+                       row(true));
+
+            // Nodes 1 to 3 now learn about {4} as a normal ring member.
+            for (int node = 1; node <= 3; node++)
+            {
+                cluster.get(node).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+            }
+
+            // {4} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
+            assertRows(cluster.coordinator(4).execute(insertV2, ConsistencyLevel.ONE, key),
+                       row(false, key, 1, 1, null));
+        }
+    }
+
+ /**
+ * Failed write (by node that did not yet witness a range movement via
gossip) is witnessed later as successful
+ * conflicting with another successful write performed by a node that did
witness the range movement
+ * - Range moves from {1, 2, 3} to {2, 3, 4}, witnessed by X (not by !X)
+ * - X: Prepare, Propose and Commit A to {3, 4}
+ * - !X: Prepare and Propose B (=>!A) to {1, 2}
+ */
+    @Ignore
+    @Test
+    public void testConflictingWritesWithStaleRingInformation() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                 .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {1} is unaware (yet) that {4} is an owner of the token
+            cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+
+            final int key = pk(cluster, 1, 2);
+            final String insertV1 = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS";
+            final String insertV2 = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS";
+
+            // {4} promises, accepts and commits on !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(4).to(2).drop();
+            assertRows(cluster.coordinator(4).execute(insertV1, ConsistencyLevel.ONE, key),
+                       row(true));
+
+            // {1} promises, accepts and commits on !{3} => {1, 2}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(3).drop();
+            assertRows(cluster.coordinator(1).execute(insertV2, ConsistencyLevel.ONE, key),
+                       row(false, key, 1, 1, null));
+        }
+    }
+
+ /**
+ * Successful write during range movement, not witnessed by read after
range movement.
+ * Very similar to {@link #testConflictingWritesWithStaleRingInformation}.
+ *
+ * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+ * - !X: Prepare and Propose to {1, 2}
+ * - Range movement witnessed by !X
+ * - Any: Prepare and Read from {3, 4}
+ */
+    @Ignore
+    @Test
+    public void testSucccessfulWriteDuringRangeMovementFollowedByRead() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                 .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            // NOTE(review): the loop variable is not used in the body, so this runs the removal on node 1 four
+            // times; confirm whether cluster.get(attempt) was intended.
+            for (int attempt = 1; attempt <= 4; attempt++)
+            {
+                cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            }
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            final int key = pk(cluster, 1, 2);
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, key),
+                       row(true));
+
+            // finish topology change
+            for (int node = 1; node <= 4; node++)
+            {
+                cluster.get(node).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+            }
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, key),
+                       row(key, 1, 1));
+        }
+    }
+
+ /**
+ * Successful write during range movement not witnessed by write after
range movement
+ *
+ * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+ * - !X: Prepare and Propose to {1, 2}
+ * - Range movement witnessed by !X
+ * - Any: Prepare and Propose to {3, 4}
+ */
+    @Ignore
+    @Test
+    public void testSuccessfulWriteDuringRangeMovementFollowedByConflicting() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config.set("write_request_timeout_in_ms", 200L)
+                                                                 .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            // NOTE(review): the loop variable is not used in the body, so this runs the removal on node 1 four
+            // times; confirm whether cluster.get(attempt) was intended.
+            for (int attempt = 1; attempt <= 4; attempt++)
+            {
+                cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            }
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            final int key = pk(cluster, 1, 2);
+            final String insertV1 = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS";
+            final String insertV2 = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS";
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute(insertV1, ConsistencyLevel.ONE, key),
+                       row(true));
+
+            // finish topology change
+            for (int node = 1; node <= 4; node++)
+            {
+                cluster.get(node).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+            }
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute(insertV2, ConsistencyLevel.ONE, key),
+                       row(false, key, 1, 1, null));
+
+            // TODO: repair and verify base table state
+        }
+    }
+
+ /**
+ * During a range movement, a CAS may fail leaving side effects that are
not witnessed by another operation
+ * being performed with stale ring information.
+ * This is a particular special case of stale ring information sequencing,
which probably would be resolved
+ * by fixing each of the more isolated cases (but is unique, so deserving
of its own test case).
+ * See CASSANDRA-15745
+ *
+ * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+ * - X: Prepare to {2, 3, 4}
+ * - X: Propose to {4}
+ * - !X: Prepare and Propose to {1, 2}
+ * - Range move visible by !X
+ * - Any: Prepare and Read from {3, 4}
+ */
+    @Ignore
+    @Test
+    public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByRead() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            // NOTE(review): the loop variable is not used in the body, so this runs the removal on node 1 four
+            // times; confirm whether cluster.get(i) was intended.
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {4} promises on !{1} => {2, 3, 4}; accepts on !{1, 2, 3} => {4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop();
+            try
+            {
+                cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk);
+                // Reaching this point means the insert did not time out. Assert.fail throws an AssertionError,
+                // which the RuntimeException handler below does not catch.
+                Assert.fail("The insertion should have timed-out");
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                       row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} reads from !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
+            assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                       row(pk, 1, null, 2));
+        }
+    }
+
+ /**
+ * During a range movement, a CAS may fail leaving side effects that are
not witnessed by another operation
+ * being performed with stale ring information.
+ * This is a particular special case of stale ring information sequencing,
which probably would be resolved
+ * by fixing each of the more isolated cases (but is unique, so deserving
of its own test case).
+ * See CASSANDRA-15745
+ *
+ * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+ * - X: Prepare to {2, 3, 4}
+ * - X: Propose to {4}
+ * - !X: Prepare and Propose to {1, 2}
+ * - Range move visible by !X
+ * - Any: Prepare and Propose to {3, 4}
+ */
+    @Ignore
+    @Test
+    public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByWrite() throws Throwable
+    {
+        try (Cluster cluster = Cluster.create(4, config -> config
+                .set("write_request_timeout_in_ms", 200L)
+                .set("cas_contention_timeout_in_ms", 200L)))
+        {
+            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+
+            // make it so {4} is bootstrapping, and this has not propagated to other nodes yet
+            // NOTE(review): the loop variable is not used in the body, so this runs the removal on node 1 four
+            // times; confirm whether cluster.get(i) was intended.
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+            cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+
+            int pk = pk(cluster, 1, 2);
+
+            // {4} promises on !{1} => {2, 3, 4}; accepts on !{1, 2, 3} => {4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop();
+            try
+            {
+                cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk);
+                // Reaching this point means the insert did not time out. Assert.fail throws an AssertionError,
+                // which the RuntimeException handler below does not catch.
+                Assert.fail("The insertion should have timed-out");
+            }
+            catch (RuntimeException wrapped)
+            {
+                Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage());
+            }
+
+            // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+            cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+            assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                       row(true));
+
+            // finish topology change
+            for (int i = 1 ; i <= 4 ; ++i)
+                cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+
+            // {3} writes on !{2} => {3, 4}
+            cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop();
+            cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
+            // The existing row is keyed by the computed partition key, so the expected row uses pk rather than a
+            // hard-coded value that is only right for one particular token layout.
+            assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk),
+                       row(false, pk, 1, null, 2));
+        }
+    }
+
+    /**
+     * Picks a partition key from the initial tokens of the instances numbered {@code lb} and {@code ub}
+     * in {@code cluster}, delegating to the {@code IInstance} overload.
+     */
+    private static int pk(Cluster cluster, int lb, int ub)
+    {
+        IInstance lowerNode = cluster.get(lb);
+        IInstance upperNode = cluster.get(ub);
+        return pk(lowerNode, upperNode);
+    }
+
+    /**
+     * Parses the {@code initial_token} configured on each of the two instances with the Murmur3 token
+     * factory and delegates to the {@code Token} overload.
+     */
+    private static int pk(IInstance lb, IInstance ub)
+    {
+        Token lowerToken = Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token"));
+        Token upperToken = Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token"));
+        return pk(lowerToken, upperToken);
+    }
+
+    /**
+     * Returns the smallest non-negative int whose Murmur3 token is strictly greater than {@code lb} and
+     * less than or equal to {@code ub}. Candidates are tried in increasing order starting from 0.
+     */
+    private static int pk(Token lb, Token ub)
+    {
+        for (int candidate = 0; ; ++candidate)
+        {
+            Token candidateToken = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(candidate));
+            boolean aboveLower = lb.compareTo(candidateToken) < 0;
+            if (aboveLower && ub.compareTo(candidateToken) >= 0)
+                return candidate;
+        }
+    }
+
+    /**
+     * Debugging aid: for every instance of {@code cluster}, prints the instance number followed by the
+     * natural and pending endpoints that instance reports for the token of partition key {@code pk}.
+     */
+    private static void debugOwnership(Cluster cluster, int pk)
+    {
+        for (int node = 1; node <= cluster.size(); node++)
+        {
+            String prefix = node + ": ";
+            System.out.println(prefix + cluster.get(node)
+                                               .appliesOnInstance((Integer v) -> StorageService.instance.getNaturalAndPendingEndpoints(KEYSPACE, Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v))))
+                                               .apply(pk));
+        }
+    }
+
+    /**
+     * Debugging aid: for every instance of {@code cluster}, prints the instance number followed by the
+     * microsecond timestamps of the in-progress ballot, proposal ballot and most recent commit stored in
+     * {@code system.paxos} for partition key {@code pk} of table {@code tbl}. A null column prints as 0.
+     */
+    private static void debugPaxosState(Cluster cluster, int pk)
+    {
+        UUID cfid = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.cfId);
+        final String query = "select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?";
+        for (int node = 1; node <= cluster.size(); node++)
+        {
+            for (Object[] row : cluster.get(node).executeInternal(query, Int32Type.instance.decompose(pk), cfid))
+            {
+                StringBuilder line = new StringBuilder().append(node).append(": ");
+                for (int col = 0; col < 3; col++)
+                {
+                    if (col > 0)
+                        line.append(", ");
+                    line.append(row[col] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[col]));
+                }
+                System.out.println(line.toString());
+            }
+        }
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]