This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 080280dc0177da6176dd4ba970e5a35aa7e2a729
Merge: 7ffb2fe 2d0b168
Author: Sylvain Lebresne <lebre...@gmail.com>
AuthorDate: Fri Nov 27 17:05:48 2020 +0100

    Merge commit '2d0b16804785660e8515aca9944784fb3733c619' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   8 +
 .../org/apache/cassandra/service/StorageProxy.java | 239 +++++--
 .../org/apache/cassandra/service/paxos/Commit.java |   6 +
 .../apache/cassandra/service/paxos/PaxosState.java |   3 -
 .../cassandra/service/paxos/PrepareCallback.java   |  12 +-
 .../cassandra/distributed/impl/Instance.java       | 113 ++--
 .../apache/cassandra/distributed/test/CASTest.java | 684 +++++++++++++++++++++
 8 files changed, 949 insertions(+), 117 deletions(-)

diff --cc CHANGES.txt
index dd554c2,d6f406d..c3c5f02
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
 -3.0.24:
 +3.11.10
 + * Rate limit validation compactions using compaction_throughput_mb_per_sec 
(CASSANDRA-16161)
 + * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to 
default of 1GB (CASSANDRA-16071)
 +Merged from 3.0:
+  * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
   * Avoid potential NPE in JVMStabilityInspector (CASSANDRA-16294)
   * Improved check of num_tokens against the length of initial_token 
(CASSANDRA-14477)
   * Fix a race condition on ColumnFamilyStore and TableMetrics 
(CASSANDRA-16228)
diff --cc NEWS.txt
index 3af2150,6cc5e84..c5a3439
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -42,10 -42,19 +42,18 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
 -3.0.24
 -======
 -
 +3.11.10
 +=====
  Upgrading
  ---------
+     - This release fix a correctness issue with SERIAL reads, and LWT writes 
that do not apply.
+       Unfortunately, this fix has a performance impact on read performance at 
the SERIAL or
+       LOCAL_SERIAL consistency levels. For heavy users of such SERIAL reads, 
the performance
+       impact may be noticeable and may also result in an increased of 
timeouts. For that
+       reason, a opt-in system property has been added to disable the fix:
+         -Dcassandra.unsafe.disable-serial-reads-linearizability=true
+       Use this flag at your own risk as it revert SERIAL reads to the 
incorrect behavior of
+       previous versions. See CASSANDRA-12126 for details.
      - In cassandra.yaml, when using vnodes num_tokens must be defined if 
initial_token is defined.
        If it is not defined, or not equal to the numbers of tokens defined in 
initial_tokens,
        the node will not start. See CASSANDRA-14477 for details.
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index b1e0696,91dd991..d6f713e
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -169,11 -169,15 +174,21 @@@ public class StorageProxy implements St
              }
          };
  
 +        for(ConsistencyLevel level : ConsistencyLevel.values())
 +        {
 +            readMetricsMap.put(level, new ClientRequestMetrics("Read-" + 
level.name()));
 +            writeMetricsMap.put(level, new ClientRequestMetrics("Write-" + 
level.name()));
 +        }
++
+         if (disableSerialReadLinearizability)
+         {
+             logger.warn("This node was started with -D{}. SERIAL (and 
LOCAL_SERIAL) reads coordinated by this node " +
+                         "will not offer linearizability (see CASSANDRA-12126 
for details on what this mean) with " +
+                         "respect to other SERIAL operations. Please note 
that, with this flag, SERIAL reads will be " +
+                         "slower than QUORUM reads, yet offer no more 
guarantee. This flag should only be used in " +
+                         "the restricted case of upgrading from a 
pre-CASSANDRA-12126 version, and only if you " +
+                         "understand the tradeoff.", 
DISABLE_SERIAL_READ_LINEARIZABILITY_KEY);
+         }
      }
  
      /**
@@@ -223,31 -227,16 +238,17 @@@
                                    CASRequest request,
                                    ConsistencyLevel consistencyForPaxos,
                                    ConsistencyLevel consistencyForCommit,
 -                                  ClientState state)
 +                                  ClientState state,
 +                                  long queryStartNanoTime)
      throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException
      {
 -        final long start = System.nanoTime();
 +        final long startTimeForMetrics = System.nanoTime();
-         int contentions = 0;
          try
          {
-             consistencyForPaxos.validateForCas();
-             consistencyForCommit.validateForCasCommit(keyspaceName);
- 
              CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, 
cfName);
  
-             long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
-             while (System.nanoTime() - queryStartNanoTime < timeout)
+             Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () 
->
              {
-                 // for simplicity, we'll do a single liveness check at the 
start of each attempt
-                 Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(metadata, key, consistencyForPaxos);
-                 List<InetAddress> liveEndpoints = p.left;
-                 int requiredParticipants = p.right;
- 
-                 final Pair<UUID, Integer> pair = 
beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, 
requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
-                 final UUID ballot = pair.left;
-                 contentions += pair.right;
- 
                  // read the current values and check they validate the 
conditions
                  Tracing.trace("Reading existing values for CAS precondition");
                  SinglePartitionReadCommand readCommand = 
request.readCommand(FBUtilities.nowInSeconds());
@@@ -279,55 -267,139 +279,151 @@@
                  // InvalidRequestException) any which aren't.
                  updates = TriggerExecutor.instance.execute(updates);
  
+                 return Pair.create(updates, null);
+             };
  
-                 Commit proposal = Commit.newProposal(ballot, updates);
-                 Tracing.trace("CAS precondition is met; proposing 
client-requested updates for {}", ballot);
-                 if (proposePaxos(proposal, liveEndpoints, 
requiredParticipants, true, consistencyForPaxos, queryStartNanoTime))
-                 {
-                     commitPaxos(proposal, consistencyForCommit, true, 
queryStartNanoTime);
-                     Tracing.trace("CAS successful");
-                     return null;
-                 }
- 
-                 Tracing.trace("Paxos proposal not accepted (pre-empted by a 
higher ballot)");
-                 contentions++;
-                 
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
-                 // continue to retry
-             }
+             return doPaxos(metadata,
+                            key,
+                            consistencyForPaxos,
+                            consistencyForCommit,
+                            consistencyForCommit,
+                            state,
 -                           start,
++                           queryStartNanoTime,
+                            casWriteMetrics,
+                            updateProposer);
  
-             throw new WriteTimeoutException(WriteType.CAS, 
consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
          }
-         catch (WriteTimeoutException|ReadTimeoutException e)
+         catch (WriteTimeoutException | ReadTimeoutException e)
          {
              casWriteMetrics.timeouts.mark();
 +            writeMetricsMap.get(consistencyForPaxos).timeouts.mark();
              throw e;
          }
-         catch (WriteFailureException|ReadFailureException e)
+         catch (WriteFailureException | ReadFailureException e)
          {
              casWriteMetrics.failures.mark();
 +            writeMetricsMap.get(consistencyForPaxos).failures.mark();
              throw e;
          }
-         catch(UnavailableException e)
+         catch (UnavailableException e)
          {
              casWriteMetrics.unavailables.mark();
 +            writeMetricsMap.get(consistencyForPaxos).unavailables.mark();
              throw e;
          }
          finally
          {
-             recordCasContention(contentions);
 -            casWriteMetrics.addNano(System.nanoTime() - start);
 +            final long latency = System.nanoTime() - startTimeForMetrics;
 +            casWriteMetrics.addNano(latency);
 +            writeMetricsMap.get(consistencyForPaxos).addNano(latency);
          }
      }
  
-     private static void recordCasContention(int contentions)
++    private static void recordCasContention(CASClientRequestMetrics 
casMetrics, int contentions)
 +    {
 +        if(contentions > 0)
-             casWriteMetrics.contention.update(contentions);
++            casMetrics.contention.update(contentions);
++    }
++
+     /**
+      * Performs the Paxos rounds for a given proposal, retrying when 
preempted until the timeout.
+      *
+      * <p>The main 'configurable' of this method is the {@code 
createUpdateProposal} method: it is called by the method
+      * once a ballot has been successfully 'prepared' to generate the update 
to 'propose' (and commit if the proposal is
+      * successful). That method also generates the result that the whole 
method will return. Note that due to retrying,
+      * this method may be called multiple times and does not have to return 
the same results.
+      *
+      * @param metadata the table to update with Paxos.
+      * @param key the partition updated.
+      * @param consistencyForPaxos the serial consistency of the operation 
(either {@link ConsistencyLevel#SERIAL} or
+      *     {@link ConsistencyLevel#LOCAL_SERIAL}).
+      * @param consistencyForReplayCommits the consistency for the commit 
phase of "replayed" in-progress operations.
+      * @param consistencyForCommit the consistency for the commit phase of 
_this_ operation update.
+      * @param state the client state.
 -     * @param queryStartNanoTime the nano time for the start of the query 
this is part of.
++     * @param queryStartNanoTime the nano time for the start of the query 
this is part of. This is the base time for
++     *     timeouts.
+      * @param casMetrics the metrics to update for this operation.
+      * @param createUpdateProposal method called after a successful 'prepare' 
phase to obtain 1) the actual update of
+      *     this operation and 2) the result that the whole method should 
return. This can return {@code null} in the
+      *     special where, after having "prepared" (and thus potentially 
replayed in-progress upgdates), we don't want
+      *     to propose anything (the whole method then return {@code null}).
+      * @return the second element of the pair returned by {@code 
createUpdateProposal} (for the last call of that method
+      *     if that method is called multiple times due to retries).
+      */
+     private static RowIterator doPaxos(CFMetaData metadata,
+                                        DecoratedKey key,
+                                        ConsistencyLevel consistencyForPaxos,
+                                        ConsistencyLevel 
consistencyForReplayCommits,
+                                        ConsistencyLevel consistencyForCommit,
+                                        ClientState state,
+                                        long queryStartNanoTime,
+                                        CASClientRequestMetrics casMetrics,
+                                        Supplier<Pair<PartitionUpdate, 
RowIterator>> createUpdateProposal)
+     throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException
+     {
+         int contentions = 0;
+         try
+         {
+             consistencyForPaxos.validateForCas();
+             consistencyForReplayCommits.validateForCasCommit(metadata.ksName);
+             consistencyForCommit.validateForCasCommit(metadata.ksName);
+ 
+             long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
+             while (System.nanoTime() - queryStartNanoTime < timeout)
+             {
+                 // for simplicity, we'll do a single liveness check at the 
start of each attempt
+                 Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(metadata, key, consistencyForPaxos);
+                 List<InetAddress> liveEndpoints = p.left;
+                 int requiredParticipants = p.right;
+ 
+                 final Pair<UUID, Integer> pair = 
beginAndRepairPaxos(queryStartNanoTime,
+                                                                      key,
+                                                                      metadata,
+                                                                      
liveEndpoints,
+                                                                      
requiredParticipants,
+                                                                      
consistencyForPaxos,
+                                                                      
consistencyForReplayCommits,
+                                                                      
casMetrics,
+                                                                      state);
+                 final UUID ballot = pair.left;
+                 contentions += pair.right;
+ 
+                 Pair<PartitionUpdate, RowIterator> proposalPair = 
createUpdateProposal.get();
+                 // See method javadoc: null here is code for "stop here and 
return null".
+                 if (proposalPair == null)
+                     return null;
+ 
+                 Commit proposal = Commit.newProposal(ballot, 
proposalPair.left);
+                 Tracing.trace("CAS precondition is met; proposing 
client-requested updates for {}", ballot);
 -                if (proposePaxos(proposal, liveEndpoints, 
requiredParticipants, true, consistencyForPaxos))
++                if (proposePaxos(proposal, liveEndpoints, 
requiredParticipants, true, consistencyForPaxos, queryStartNanoTime))
+                 {
+                     // We skip committing accepted updates when they are 
empty. This is an optimization which works
+                     // because we also skip replaying those same empty update 
in beginAndRepairPaxos (see the longer
+                     // comment there). As empty update are somewhat common 
(serial reads and non-applying CAS propose
+                     // them), this is worth bothering.
+                     if (!proposal.update.isEmpty())
 -                        commitPaxos(proposal, consistencyForCommit, true);
++                        commitPaxos(proposal, consistencyForCommit, true, 
queryStartNanoTime);
+                     RowIterator result = proposalPair.right;
+                     if (result != null)
+                         Tracing.trace("CAS did not apply");
+                     else
+                         Tracing.trace("CAS applied successfully");
+                     return result;
+                 }
+ 
+                 Tracing.trace("Paxos proposal not accepted (pre-empted by a 
higher ballot)");
+                 contentions++;
+                 
Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), 
TimeUnit.MILLISECONDS);
+                 // continue to retry
+             }
+ 
+             throw new WriteTimeoutException(WriteType.CAS, 
consistencyForPaxos, 0, 
consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
+         }
+         finally
+         {
+             if(contentions > 0)
+                 casMetrics.contention.update(contentions);
+         }
      }
  
      private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
@@@ -427,12 -515,9 +539,9 @@@
              if (!inProgress.update.isEmpty() && 
inProgress.isAfter(mostRecent))
              {
                  Tracing.trace("Finishing incomplete paxos round {}", 
inProgress);
-                 if(isWrite)
-                     casWriteMetrics.unfinishedCommit.inc();
-                 else
-                     casReadMetrics.unfinishedCommit.inc();
+                 casMetrics.unfinishedCommit.inc();
                  Commit refreshedInProgress = Commit.newProposal(ballot, 
inProgress.update);
 -                if (proposePaxos(refreshedInProgress, liveEndpoints, 
requiredParticipants, false, consistencyForPaxos))
 +                if (proposePaxos(refreshedInProgress, liveEndpoints, 
requiredParticipants, false, consistencyForPaxos, queryStartNanoTime))
                  {
                      try
                      {
@@@ -440,7 -525,6 +549,7 @@@
                      }
                      catch (WriteTimeoutException e)
                      {
-                         recordCasContention(contentions);
++                        recordCasContention(casMetrics, contentions);
                          // We're still doing preparation for the paxos 
rounds, so we want to use the CAS (see CASSANDRA-8672)
                          throw new WriteTimeoutException(WriteType.CAS, 
e.consistency, e.received, e.blockFor);
                      }
@@@ -475,7 -559,6 +584,7 @@@
              return Pair.create(ballot, contentions);
          }
  
-         recordCasContention(contentions);
++        recordCasContention(casMetrics, contentions);
          throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName)));
      }
  
@@@ -1611,21 -1642,31 +1720,31 @@@
          PartitionIterator result = null;
          try
          {
-             // make sure any in-progress paxos writes are done (i.e., 
committed to a majority of replicas), before performing a quorum read
-             Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(metadata, key, consistencyLevel);
-             List<InetAddress> liveEndpoints = p.left;
-             int requiredParticipants = p.right;
- 
-             // does the work of applying in-progress writes; throws UAE or 
timeout if it can't
-             final ConsistencyLevel consistencyForCommitOrFetch = 
consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
-                                                                               
     ? ConsistencyLevel.LOCAL_QUORUM
-                                                                               
     : ConsistencyLevel.QUORUM;
 -            final ConsistencyLevel consistencyForReplayCommitOrFetch = 
consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
 -                                                                       ? 
ConsistencyLevel.LOCAL_QUORUM
 -                                                                       : 
ConsistencyLevel.QUORUM;
++            final ConsistencyLevel consistencyForReplayCommitsOrFetch = 
consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
++                                                                        ? 
ConsistencyLevel.LOCAL_QUORUM
++                                                                        : 
ConsistencyLevel.QUORUM;
  
              try
              {
-                 final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, 
key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, 
consistencyForCommitOrFetch, false, state);
-                 if (pair.right > 0)
-                     casReadMetrics.contention.update(pair.right);
+                 // Commit an empty update to make sure all in-progress 
updates that should be finished first is, _and_
+                 // that no other in-progress can get resurrected.
+                 Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer =
+                     disableSerialReadLinearizability
+                     ? () -> null
+                     : () -> Pair.create(PartitionUpdate.emptyUpdate(metadata, 
key), null);
+                 // When replaying, we commit at quorum/local quorum, as we 
want to be sure the following read (done at
+                 // quorum/local_quorum) sees any replayed updates. Our own 
update is however empty, and those don't even
+                 // get committed due to an optimiation described in 
doPaxos/beingRepairAndPaxos, so the commit
+                 // consistency is irrelevant (we use ANY just to emphasis 
that we don't wait on our commit).
+                 doPaxos(metadata,
+                         key,
+                         consistencyLevel,
 -                        consistencyForReplayCommitOrFetch,
++                        consistencyForReplayCommitsOrFetch,
+                         ConsistencyLevel.ANY,
+                         state,
+                         start,
+                         casReadMetrics,
+                         updateProposer);
              }
              catch (WriteTimeoutException e)
              {
@@@ -1633,10 -1674,10 +1752,10 @@@
              }
              catch (WriteFailureException e)
              {
 -                throw new ReadFailureException(consistencyLevel, e.received, 
e.failures, e.blockFor, false);
 +                throw new ReadFailureException(consistencyLevel, e.received, 
e.blockFor, false, e.failureReasonByEndpoint);
              }
  
-             result = fetchRows(group.commands, consistencyForCommitOrFetch, 
queryStartNanoTime);
 -            result = fetchRows(group.commands, 
consistencyForReplayCommitOrFetch);
++            result = fetchRows(group.commands, 
consistencyForReplayCommitsOrFetch, queryStartNanoTime);
          }
          catch (UnavailableException e)
          {
diff --cc test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index 0000000,0b1dce6..473f56c
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@@ -1,0 -1,679 +1,684 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.util.UUID;
+ import java.util.function.BiConsumer;
+ 
+ import org.junit.Assert;
+ import org.junit.Ignore;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
+ import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.ICoordinator;
+ import org.apache.cassandra.distributed.api.IInstance;
+ import org.apache.cassandra.distributed.api.IMessageFilters;
+ import org.apache.cassandra.distributed.impl.Instance;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.UUIDGen;
+ 
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.fail;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+ import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_COMMIT;
+ import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PREPARE;
+ import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PROPOSE;
+ import static org.apache.cassandra.net.MessagingService.Verb.READ;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class CASTest extends TestBaseImpl
+ {
+     @Test
+     public void simpleUpdate() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3)))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 1));
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 3 WHERE pk = 1 and ck = 1 IF v = 2", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 1));
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     @Test
+     public void incompletePrepare() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 
200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             IMessageFilters.Filter drop = 
cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
+             }
+             drop.off();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL));
+         }
+     }
+ 
+     @Test
+     public void incompletePropose() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 
200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             IMessageFilters.Filter drop1 = 
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
+             }
+             drop1.off();
+             // make sure we encounter one of the in-progress proposals so we 
complete it
+             
cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2).drop();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     @Test
+     public void incompleteCommit() throws Throwable
+     {
+         try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 
200L).set("cas_contention_timeout_in_ms", 200L))))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             IMessageFilters.Filter drop1 = 
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop();
+             try
+             {
+                 cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
+                 Assert.fail();
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
+             }
+             drop1.off();
+             // make sure we see one of the successful commits
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2).drop();
+             cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v 
= 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM);
+             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL),
+                     row(1, 1, 2));
+         }
+     }
+ 
+     private int[] paxosAndReadVerbs() {
+         return new int[] {
+             MessagingService.Verb.PAXOS_PREPARE.ordinal(),
+             MessagingService.Verb.PAXOS_PROPOSE.ordinal(),
+             MessagingService.Verb.PAXOS_COMMIT.ordinal(),
+             MessagingService.Verb.READ.ordinal()
+         };
+     }
+ 
+     /**
+      * Base test to ensure that if a write times out but with a proposal 
accepted by some nodes (less then quorum), and
+      * a following SERIAL operation does not observe that write (the node 
having accepted it do not participate in that
+      * following operation), then that write is never applied, even when the 
nodes having accepted the original proposal
+      * participate.
+      *
+      * <p>In other words, if an operation timeout, it may or may not be 
applied, but that "fate" is persistently decided
+      * by the very SERIAL operation that "succeed" (in the sense of 'not 
timing out or throwing some other exception').
+      *
+      * @param postTimeoutOperation1 a SERIAL operation executed after an 
initial write that inserts the row [0, 0] times
+      *                              out. It is executed with a QUORUM of 
nodes that have _not_ see the timed out
+      *                              proposal, and so that operation should 
expect that the [0, 0] write has not taken
+      *                              place.
+      * @param postTimeoutOperation2 a 2nd SERIAL operation executed _after_ 
{@code postTimeoutOperation1}, with no
+      *                              write executed between the 2 operation. 
Contrarily to the 1st operation, the QORUM
+      *                              for this operation _will_ include the 
node that got the proposal for the [0, 0]
+      *                              insert but didn't participated to {@code 
postTimeoutOperation1}}. That operation
+      *                              should also no witness that [0, 0] write 
(since {@code postTimeoutOperation1}
+      *                              didn't).
+      * @param loseCommitOfOperation1 if {@code true}, the test will also drop 
the "commits" messages for
+      *                               {@code postTimeoutOperation1}. In 
general, the test should behave the same with or
+      *                               without that flag since a value is 
decided as soon as it has been "accepted by
+      *                               quorum" and the commits should always be 
properly replayed.
+      */
+     private void consistencyAfterWriteTimeoutTest(BiConsumer<String, 
ICoordinator> postTimeoutOperation1,
+                                                   BiConsumer<String, 
ICoordinator> postTimeoutOperation2,
+                                                   boolean 
loseCommitOfOperation1) throws IOException
+     {
 -        try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 200L)
++        // It's unclear why (haven't dug), but in some of the instance of 
this test method, there is a consistent 2+
++        // seconds pauses between the prepare and propose phases during the 
execution of 'postTimeoutOperation2'. This
++        // does not happen on 3.0 and there is no report of such long pauses 
otherwise, so an hypothesis is that this
++        // is due to the in-jvm dtest framework. This is is why we use a 4 
seconds timeout here. Given this test is
++        // not about performance, this is probably ok, even if we ideally 
should dug into the underlying reason.
++        try (Cluster cluster = init(Cluster.create(3, config -> 
config.set("write_request_timeout_in_ms", 4000L)
+                                                                       
.set("cas_contention_timeout_in_ms", 200L))))
+         {
+             String table = KEYSPACE + ".t";
+             cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY 
KEY, v int)");
+ 
+             // We do a CAS insertion, but have with the PROPOSE message 
dropped on node 1 and 2. The CAS will not get
+             // through and should timeout. Importantly, node 3 does receive 
and answer the PROPOSE.
+             IMessageFilters.Filter dropProposeFilter = cluster.filters()
+                                                               .inbound()
+                                                               
.verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal())
+                                                               .to(1, 2)
+                                                               .drop();
+             try
+             {
+                 // NOTE: the consistency below is the "commit" one, so it 
doesn't matter at all here.
+                 cluster.coordinator(1)
+                        .execute("INSERT INTO " + table + "(k, v) VALUES (0, 
0) IF NOT EXISTS", ConsistencyLevel.ONE);
+                 fail("The insertion should have timed-out");
+             }
+             catch (Exception e)
+             {
+                 // We expect a write timeout. If we get one, the test can 
continue, otherwise, we rethrow. Note that we
+                 // look at the root cause because the dtest framework 
effectively wrap the exception in a RuntimeException
+                 // (we could just look at the immediate cause, but this feel 
a bit more resilient this way).
+                 // TODO: we can't use an instanceof below because the 
WriteTimeoutException we get is from a different class
+                 //  loader than the one the test run under, and that's our 
poor-man work-around. This kind of things should
+                 //  be improved at the dtest API level.
+                 if 
(!e.getCause().getClass().getSimpleName().equals("WriteTimeoutException"))
+                     throw e;
+             }
+             finally
+             {
+                 dropProposeFilter.off();
+             }
+ 
+             // Isolates node 3 and executes the SERIAL operation. As neither 
node 1 or 2 got the initial insert proposal,
+             // there is nothing to "replay" and the operation should assert 
the table is still empty.
+             IMessageFilters.Filter ignoreNode3Filter = 
cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop();
+             IMessageFilters.Filter dropCommitFilter = null;
+             if (loseCommitOfOperation1)
+             {
+                 dropCommitFilter = 
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).to(1, 2).drop();
+             }
+             try
+             {
+                 postTimeoutOperation1.accept(table, cluster.coordinator(1));
+             }
+             finally
+             {
+                 ignoreNode3Filter.off();
+                 if (dropCommitFilter != null)
+                     dropCommitFilter.off();
+             }
+ 
+             // Node 3 is now back and we isolate node 2 to ensure the next 
read hits node 1 and 3.
+             // What we want to ensure is that despite node 3 having the 
initial insert in its paxos state in a position of
+             // being replayed, that insert is _not_ replayed (it would 
contradict serializability since the previous
+             // operation asserted nothing was inserted). It is this execution 
that failed before CASSANDRA-12126.
+             IMessageFilters.Filter ignoreNode2Filter = 
cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop();
+             try
+             {
+                 postTimeoutOperation2.accept(table, cluster.coordinator(1));
+             }
+             finally
+             {
+                 ignoreNode2Filter.off();
+             }
+         }
+     }
+ 
+     /**
+      * Tests that if a write timeouts and a following serial read does not 
see that write, then no following reads sees
+      * it, even if some nodes still have the write in their paxos state.
+      *
+      * <p>This specifically test for the inconsistency described/fixed by 
CASSANDRA-12126.
+      */
+     @Test
+     public void readConsistencyAfterWriteTimeoutTest() throws IOException
+     {
+         BiConsumer<String, ICoordinator> operation =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * 
FROM " + table + " WHERE k=0",
+                                                                    
ConsistencyLevel.SERIAL));
+ 
+         consistencyAfterWriteTimeoutTest(operation, operation, false);
+         consistencyAfterWriteTimeoutTest(operation, operation, true);
+     }
+ 
+     /**
+      * Tests that if a write timeouts, then a following CAS succeed but does 
not apply in a way that indicate the write
+      * has not applied, then no following CAS can see that initial insert , 
even if some nodes still have the write in
+      * their paxos state.
+      *
+      * <p>This specifically test for the inconsistency described/fixed by 
CASSANDRA-12126.
+      */
+     @Test
+     public void nonApplyingCasConsistencyAfterWriteTimeout() throws 
IOException
+     {
+         // Note: we use CL.ANY so that the operation don't timeout in the 
case where we "lost" the operation1 commits.
+         // The commit CL shouldn't have impact on this test anyway, so this 
doesn't diminishes the test.
+         BiConsumer<String, ICoordinator> operation =
+             (table, coordinator) -> 
assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k 
= 0 IF v = 0",
+                                                                             
ConsistencyLevel.ANY));
+         consistencyAfterWriteTimeoutTest(operation, operation, false);
+         consistencyAfterWriteTimeoutTest(operation, operation, true);
+     }
+ 
+     /**
+      * Tests that if a write timeouts and a following serial read does not 
see that write, then no following CAS see
+      * that initial insert, even if some nodes still have the write in their 
paxos state.
+      *
+      * <p>This specifically test for the inconsistency described/fixed by 
CASSANDRA-12126.
+      */
+     @Test
+     public void mixedReadAndNonApplyingCasConsistencyAfterWriteTimeout() 
throws IOException
+     {
+         BiConsumer<String, ICoordinator> operation1 =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * 
FROM " + table + " WHERE k=0",
+                                                                    
ConsistencyLevel.SERIAL));
+         BiConsumer<String, ICoordinator> operation2 =
+             (table, coordinator) -> 
assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k 
= 0 IF v = 0",
+                                                                             
ConsistencyLevel.QUORUM));
+         consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+         consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+     }
+ 
+     /**
+      * Tests that if a write timeouts and a following CAS succeed but does 
not apply in a way that indicate the write
+      * has not applied, then following serial reads do no see that write, 
even if some nodes still have the write in
+      * their paxos state.
+      *
+      * <p>This specifically test for the inconsistency described/fixed by 
CASSANDRA-12126.
+      */
+     @Test
+     public void mixedNonApplyingCasAndReadConsistencyAfterWriteTimeout() 
throws IOException
+     {
+         // Note: we use CL.ANY so that the operation don't timeout in the 
case where we "lost" the operation1 commits.
+         // The commit CL shouldn't have impact on this test anyway, so this 
doesn't diminishes the test.
+         BiConsumer<String, ICoordinator> operation1 =
+             (table, coordinator) -> 
assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k 
= 0 IF v = 0",
+                                                                             
ConsistencyLevel.ANY));
+         BiConsumer<String, ICoordinator> operation2 =
+             (table, coordinator) -> assertRows(coordinator.execute("SELECT * 
FROM " + table + " WHERE k=0",
+                                                                    
ConsistencyLevel.SERIAL));
+         consistencyAfterWriteTimeoutTest(operation1, operation2, false);
+         consistencyAfterWriteTimeoutTest(operation1, operation2, true);
+     }
+ 
+     // TODO: this shoud probably be moved into the dtest API.
+     private void assertCasNotApplied(Object[][] resultSet)
+     {
+         assertFalse("Expected a CAS resultSet (with at least application 
result) but got an empty one.",
+                     resultSet.length == 0);
+         assertFalse("Invalid empty first row in CAS resultSet.", 
resultSet[0].length == 0);
+         Object wasApplied = resultSet[0][0];
+         assertTrue("Expected 1st column of CAS resultSet to be a boolean, but 
got a " + wasApplied.getClass(),
+                    wasApplied instanceof Boolean);
+         assertFalse("Expected CAS to not be applied, but was applied.", 
(Boolean)wasApplied);
+     }
+ 
+     /**
+      * Failed write (by node that did not yet witness a range movement via 
gossip) is witnessed later as successful
+      * conflicting with another successful write performed by a node that did 
witness the range movement
+      * Prepare, Propose and Commit A to {1, 2}
+      * Range moves to {2, 3, 4}
+      * Prepare and Propose B (=> !A) to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSuccessfulWriteBeforeRangeMovement() throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {1} is unaware (yet) that {4} is an owner of the 
token
+             
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} 
=> {1}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+             cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             for (int i = 1 ; i <= 3 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {4} reads from !{2} => {3, 4}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(4).to(2).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
+             assertRows(cluster.coordinator(4).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+         }
+     }
+ 
+     /**
+      * Failed write (by node that did not yet witness a range movement via 
gossip) is witnessed later as successful
+      * conflicting with another successful write performed by a node that did 
witness the range movement
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}, witnessed by X (not by !X)
+      *  -  X: Prepare, Propose and Commit A to {3, 4}
+      *  - !X: Prepare and Propose B (=>!A) to {1, 2}
+      */
+     @Ignore
+     @Test
+     public void testConflictingWritesWithStaleRingInformation() throws 
Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {1} is unaware (yet) that {4} is an owner of the 
token
+             
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+ 
+             // {4} promises, accepts and commits on !{2} => {3, 4}
+             int pk = pk(cluster, 1, 2);
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(4).to(2).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop();
+             
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(4).to(2).drop();
+             assertRows(cluster.coordinator(4).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // {1} promises, accepts and commmits on !{3} => {1, 2}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+             
cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+         }
+     }
+ 
+     /**
+      * Successful write during range movement, not witnessed by read after 
range movement.
+      * Very similar to {@link #testConflictingWritesWithStaleRingInformation}.
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range movement witnessed by !X
+      *  - Any: Prepare and Read from {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSucccessfulWriteDuringRangeMovementFollowedByRead() 
throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated 
to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             
cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commmits on !{2, 
3} => {1}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+             cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                     row(pk, 1, 1));
+         }
+     }
+ 
+     /**
+      * Successful write during range movement not witnessed by write after 
range movement
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range movement witnessed by !X
+      *  - Any: Prepare and Propose to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void testSuccessfulWriteDuringRangeMovementFollowedByConflicting() 
throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated 
to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             
cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} 
=> {1}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+             cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(false, pk, 1, 1, null));
+ 
+             // TODO: repair and verify base table state
+         }
+     }
+ 
+     /**
+      * During a range movement, a CAS may fail leaving side effects that are 
not witnessed by another operation
+      * being performed with stale ring information.
+      * This is a particular special case of stale ring information 
sequencing, which probably would be resolved
+      * by fixing each of the more isolated cases (but is unique, so deserving 
of its own test case).
+      * See CASSANDRA-15745
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -   X: Prepare to {2, 3, 4}
+      *  -   X: Propose to {4}
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range move visible by !X
+      *  - Any: Prepare and Read from {3, 4}
+      */
+     @Ignore
+     @Test
+     public void 
testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByRead()
 throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated 
to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             
cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {4} promises and accepts on !{1} => {2, 3, 4}; commits on !{1, 
2, 3} => {4}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(4).to(1).drop();
+             cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 
3).drop();
+             try
+             {
+                 cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, 
pk);
+                 Assert.assertTrue(false);
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
+             }
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} 
=> {1}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+             cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(3).to(2).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk),
+                     row(pk, 1, null, 2));
+         }
+     }
+ 
+     /**
+      * During a range movement, a CAS may fail leaving side effects that are 
not witnessed by another operation
+      * being performed with stale ring information.
+      * This is a particular special case of stale ring information 
sequencing, which probably would be resolved
+      * by fixing each of the more isolated cases (but is unique, so deserving 
of its own test case).
+      * See CASSANDRA-15745
+      *
+      *  - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X)
+      *  -   X: Prepare to {2, 3, 4}
+      *  -   X: Propose to {4}
+      *  -  !X: Prepare and Propose to {1, 2}
+      *  - Range move visible by !X
+      *  - Any: Prepare and Propose to {3, 4}
+      */
+     @Ignore
+     @Test
+     public void 
testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByWrite()
 throws Throwable
+     {
+         try (Cluster cluster = Cluster.create(4, config -> config
+                 .set("write_request_timeout_in_ms", 200L)
+                 .set("cas_contention_timeout_in_ms", 200L)))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
+ 
+             // make it so {4} is bootstrapping, and this has not propagated 
to other nodes yet
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4));
+             
cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4));
+ 
+             int pk = pk(cluster, 1, 2);
+ 
+             // {4} promises and accepts on !{1} => {2, 3, 4}; commits on !{1, 
2, 3} => {4}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(4).to(1).drop();
+             cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 
3).drop();
+             try
+             {
+                 cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, 
pk);
+                 Assert.assertTrue(false);
+             }
+             catch (RuntimeException wrapped)
+             {
+                 Assert.assertEquals("Operation timed out - received only 1 
responses.", wrapped.getCause().getMessage());
+             }
+ 
+             // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} 
=> {1}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(1).to(3).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop();
+             cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 
3).drop();
+             assertRows(cluster.coordinator(1).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(true));
+ 
+             // finish topology change
+             for (int i = 1 ; i <= 4 ; ++i)
+                 
cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4));
+ 
+             // {3} reads from !{2} => {3, 4}
+             cluster.filters().verbs(PAXOS_PREPARE.ordinal(), 
READ.ordinal()).from(3).to(2).drop();
+             
cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop();
+             assertRows(cluster.coordinator(3).execute("INSERT INTO " + 
KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", 
ConsistencyLevel.ONE, pk),
+                     row(false, 5, 1, null, 2));
+         }
+     }
+ 
+     private static int pk(Cluster cluster, int lb, int ub)
+     {
+         return pk(cluster.get(lb), cluster.get(ub));
+     }
+ 
+     private static int pk(IInstance lb, IInstance ub)
+     {
+         return 
pk(Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token")),
+                 
Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token")));
+     }
+ 
+     private static int pk(Token lb, Token ub)
+     {
+         int pk = 0;
+         Token pkt;
+         while (lb.compareTo(pkt = 
Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) >= 0 || 
ub.compareTo(pkt) < 0)
+             ++pk;
+         return pk;
+     }
+ 
+     private static void debugOwnership(Cluster cluster, int pk)
+     {
+         for (int i = 1 ; i <= cluster.size() ; ++i)
+             System.out.println(i + ": " + 
cluster.get(i).appliesOnInstance((Integer v) -> 
StorageService.instance.getNaturalAndPendingEndpoints(KEYSPACE, 
Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v))))
+                     .apply(pk));
+     }
+ 
+     private static void debugPaxosState(Cluster cluster, int pk)
+     {
+         UUID cfid = cluster.get(1).callOnInstance(() -> 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.cfId);
+         for (int i = 1 ; i <= cluster.size() ; ++i)
+             for (Object[] row : cluster.get(i).executeInternal("select 
in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos 
where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), cfid))
+                 System.out.println(i + ": " + (row[0] == null ? 0L : 
UUIDGen.microsTimestamp((UUID)row[0])) + ", " + (row[1] == null ? 0L : 
UUIDGen.microsTimestamp((UUID)row[1])) + ", " + (row[2] == null ? 0L : 
UUIDGen.microsTimestamp((UUID)row[2])));
+     }
+ 
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to