This is an automated email from the ASF dual-hosted git repository.

slebresne pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 080280dc0177da6176dd4ba970e5a35aa7e2a729
Merge: 7ffb2fe 2d0b168
Author: Sylvain Lebresne <lebre...@gmail.com>
AuthorDate: Fri Nov 27 17:05:48 2020 +0100

    Merge commit '2d0b16804785660e8515aca9944784fb3733c619' into cassandra-3.11

 CHANGES.txt                                         |   1 +
 NEWS.txt                                            |   8 +
 .../org/apache/cassandra/service/StorageProxy.java  | 239 +++++--
 .../org/apache/cassandra/service/paxos/Commit.java  |   6 +
 .../apache/cassandra/service/paxos/PaxosState.java  |   3 -
 .../cassandra/service/paxos/PrepareCallback.java    |  12 +-
 .../cassandra/distributed/impl/Instance.java        | 113 ++--
 .../apache/cassandra/distributed/test/CASTest.java  | 684 +++++++++++++++++++++
 8 files changed, 949 insertions(+), 117 deletions(-)

diff --cc CHANGES.txt
index dd554c2,d6f406d..c3c5f02
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,7 -1,5 +1,8 @@@
-3.0.24:
+3.11.10
+ * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
+ * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
+Merged from 3.0:
+ * Fix serial read/non-applying CAS linearizability (CASSANDRA-12126)
  * Avoid potential NPE in JVMStabilityInspector (CASSANDRA-16294)
  * Improved check of num_tokens against the length of initial_token (CASSANDRA-14477)
  * Fix a race condition on ColumnFamilyStore and TableMetrics (CASSANDRA-16228)

diff --cc NEWS.txt
index 3af2150,6cc5e84..c5a3439
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -42,10 -42,19 +42,18 @@@ restore snapshots created with the prev
    'sstableloader' tool. You can upgrade the file format of your snapshots
    using the provided 'sstableupgrade' tool.

-3.0.24
-======
-
+3.11.10
+=======
 Upgrading
 ---------
+     - This release fixes a correctness issue with SERIAL reads and with LWT writes that do not apply.
+       Unfortunately, this fix has an impact on read performance at the SERIAL or
+       LOCAL_SERIAL consistency levels. For heavy users of such SERIAL reads, the performance
+       impact may be noticeable and may also result in an increase in timeouts. For that
+       reason, an opt-in system property has been added to disable the fix:
+           -Dcassandra.unsafe.disable-serial-reads-linearizability=true
+       Use this flag at your own risk, as it reverts SERIAL reads to the incorrect behavior of
+       previous versions. See CASSANDRA-12126 for details.
     - In cassandra.yaml, when using vnodes num_tokens must be defined if initial_token is defined.
       If it is not defined, or not equal to the numbers of tokens defined in initial_tokens,
       the node will not start. See CASSANDRA-14477 for details.

diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index b1e0696,91dd991..d6f713e
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -169,11 -169,15 +174,21 @@@ public class StorageProxy implements St
          }
      };

+         for(ConsistencyLevel level : ConsistencyLevel.values())
+         {
+             readMetricsMap.put(level, new ClientRequestMetrics("Read-" + level.name()));
+             writeMetricsMap.put(level, new ClientRequestMetrics("Write-" + level.name()));
+         }
++
+         if (disableSerialReadLinearizability)
+         {
+             logger.warn("This node was started with -D{}. SERIAL (and LOCAL_SERIAL) reads coordinated by this node " +
+                         "will not offer linearizability (see CASSANDRA-12126 for details on what this means) with " +
+                         "respect to other SERIAL operations. Please note that, with this flag, SERIAL reads will be " +
+                         "slower than QUORUM reads, yet offer no more guarantees. 
This flag should only be used in " + + "the restricted case of upgrading from a pre-CASSANDRA-12126 version, and only if you " + + "understand the tradeoff.", DISABLE_SERIAL_READ_LINEARIZABILITY_KEY); + } } /** @@@ -223,31 -227,16 +238,17 @@@ CASRequest request, ConsistencyLevel consistencyForPaxos, ConsistencyLevel consistencyForCommit, - ClientState state) + ClientState state, + long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException { - final long start = System.nanoTime(); + final long startTimeForMetrics = System.nanoTime(); - int contentions = 0; try { - consistencyForPaxos.validateForCas(); - consistencyForCommit.validateForCasCommit(keyspaceName); - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); - while (System.nanoTime() - queryStartNanoTime < timeout) + Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () -> { - // for simplicity, we'll do a single liveness check at the start of each attempt - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); - List<InetAddress> liveEndpoints = p.left; - int requiredParticipants = p.right; - - final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state); - final UUID ballot = pair.left; - contentions += pair.right; - // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds()); @@@ -279,55 -267,139 +279,151 @@@ // InvalidRequestException) any which aren't. 
updates = TriggerExecutor.instance.execute(updates); + return Pair.create(updates, null); + }; - Commit proposal = Commit.newProposal(ballot, updates); - Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos, queryStartNanoTime)) - { - commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime); - Tracing.trace("CAS successful"); - return null; - } - - Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); - contentions++; - Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); - // continue to retry - } + return doPaxos(metadata, + key, + consistencyForPaxos, + consistencyForCommit, + consistencyForCommit, + state, - start, ++ queryStartNanoTime, + casWriteMetrics, + updateProposer); - throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName))); } - catch (WriteTimeoutException|ReadTimeoutException e) + catch (WriteTimeoutException | ReadTimeoutException e) { casWriteMetrics.timeouts.mark(); + writeMetricsMap.get(consistencyForPaxos).timeouts.mark(); throw e; } - catch (WriteFailureException|ReadFailureException e) + catch (WriteFailureException | ReadFailureException e) { casWriteMetrics.failures.mark(); + writeMetricsMap.get(consistencyForPaxos).failures.mark(); throw e; } - catch(UnavailableException e) + catch (UnavailableException e) { casWriteMetrics.unavailables.mark(); + writeMetricsMap.get(consistencyForPaxos).unavailables.mark(); throw e; } finally { - recordCasContention(contentions); - casWriteMetrics.addNano(System.nanoTime() - start); + final long latency = System.nanoTime() - startTimeForMetrics; + casWriteMetrics.addNano(latency); + writeMetricsMap.get(consistencyForPaxos).addNano(latency); } } - private static void recordCasContention(int contentions) ++ private static void recordCasContention(CASClientRequestMetrics casMetrics, int contentions) + { + if(contentions > 0) - casWriteMetrics.contention.update(contentions); ++ casMetrics.contention.update(contentions); ++ } ++ + /** + * Performs the Paxos rounds for a given proposal, retrying when preempted until the timeout. + * + * <p>The main 'configurable' of this method is the {@code createUpdateProposal} method: it is called by the method + * once a ballot has been successfully 'prepared' to generate the update to 'propose' (and commit if the proposal is + * successful). That method also generates the result that the whole method will return. Note that due to retrying, + * this method may be called multiple times and does not have to return the same results. + * + * @param metadata the table to update with Paxos. + * @param key the partition updated. + * @param consistencyForPaxos the serial consistency of the operation (either {@link ConsistencyLevel#SERIAL} or + * {@link ConsistencyLevel#LOCAL_SERIAL}). + * @param consistencyForReplayCommits the consistency for the commit phase of "replayed" in-progress operations. + * @param consistencyForCommit the consistency for the commit phase of _this_ operation update. + * @param state the client state. - * @param queryStartNanoTime the nano time for the start of the query this is part of. ++ * @param queryStartNanoTime the nano time for the start of the query this is part of. This is the base time for ++ * timeouts. + * @param casMetrics the metrics to update for this operation. 
+ * @param createUpdateProposal method called after a successful 'prepare' phase to obtain 1) the actual update of + * this operation and 2) the result that the whole method should return. This can return {@code null} in the + * special where, after having "prepared" (and thus potentially replayed in-progress upgdates), we don't want + * to propose anything (the whole method then return {@code null}). + * @return the second element of the pair returned by {@code createUpdateProposal} (for the last call of that method + * if that method is called multiple times due to retries). + */ + private static RowIterator doPaxos(CFMetaData metadata, + DecoratedKey key, + ConsistencyLevel consistencyForPaxos, + ConsistencyLevel consistencyForReplayCommits, + ConsistencyLevel consistencyForCommit, + ClientState state, + long queryStartNanoTime, + CASClientRequestMetrics casMetrics, + Supplier<Pair<PartitionUpdate, RowIterator>> createUpdateProposal) + throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException + { + int contentions = 0; + try + { + consistencyForPaxos.validateForCas(); + consistencyForReplayCommits.validateForCasCommit(metadata.ksName); + consistencyForCommit.validateForCasCommit(metadata.ksName); + + long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); + while (System.nanoTime() - queryStartNanoTime < timeout) + { + // for simplicity, we'll do a single liveness check at the start of each attempt + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos); + List<InetAddress> liveEndpoints = p.left; + int requiredParticipants = p.right; + + final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, + key, + metadata, + liveEndpoints, + requiredParticipants, + consistencyForPaxos, + consistencyForReplayCommits, + casMetrics, + state); + final UUID ballot = pair.left; + contentions += pair.right; + + Pair<PartitionUpdate, RowIterator> proposalPair = createUpdateProposal.get(); + // See method javadoc: null here is code for "stop here and return null". + if (proposalPair == null) + return null; + + Commit proposal = Commit.newProposal(ballot, proposalPair.left); + Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); - if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos)) ++ if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos, queryStartNanoTime)) + { + // We skip committing accepted updates when they are empty. This is an optimization which works + // because we also skip replaying those same empty update in beginAndRepairPaxos (see the longer + // comment there). As empty update are somewhat common (serial reads and non-applying CAS propose + // them), this is worth bothering. 
+ if (!proposal.update.isEmpty()) - commitPaxos(proposal, consistencyForCommit, true); ++ commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime); + RowIterator result = proposalPair.right; + if (result != null) + Tracing.trace("CAS did not apply"); + else + Tracing.trace("CAS applied successfully"); + return result; + } + + Tracing.trace("Paxos proposal not accepted (pre-empted by a higher ballot)"); + contentions++; + Uninterruptibles.sleepUninterruptibly(ThreadLocalRandom.current().nextInt(100), TimeUnit.MILLISECONDS); + // continue to retry + } + + throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName))); + } + finally + { + if(contentions > 0) + casMetrics.contention.update(contentions); + } } private static Predicate<InetAddress> sameDCPredicateFor(final String dc) @@@ -427,12 -515,9 +539,9 @@@ if (!inProgress.update.isEmpty() && inProgress.isAfter(mostRecent)) { Tracing.trace("Finishing incomplete paxos round {}", inProgress); - if(isWrite) - casWriteMetrics.unfinishedCommit.inc(); - else - casReadMetrics.unfinishedCommit.inc(); + casMetrics.unfinishedCommit.inc(); Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update); - if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos)) + if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos, queryStartNanoTime)) { try { @@@ -440,7 -525,6 +549,7 @@@ } catch (WriteTimeoutException e) { - recordCasContention(contentions); ++ recordCasContention(casMetrics, contentions); // We're still doing preparation for the paxos rounds, so we want to use the CAS (see CASSANDRA-8672) throw new WriteTimeoutException(WriteType.CAS, e.consistency, e.received, e.blockFor); } @@@ -475,7 -559,6 +584,7 @@@ return Pair.create(ballot, contentions); } - recordCasContention(contentions); ++ recordCasContention(casMetrics, contentions); throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(metadata.ksName))); } @@@ -1611,21 -1642,31 +1720,31 @@@ PartitionIterator result = null; try { - // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel); - List<InetAddress> liveEndpoints = p.left; - int requiredParticipants = p.right; - - // does the work of applying in-progress writes; throws UAE or timeout if it can't - final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL - ? ConsistencyLevel.LOCAL_QUORUM - : ConsistencyLevel.QUORUM; - final ConsistencyLevel consistencyForReplayCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL - ? ConsistencyLevel.LOCAL_QUORUM - : ConsistencyLevel.QUORUM; ++ final ConsistencyLevel consistencyForReplayCommitsOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL ++ ? ConsistencyLevel.LOCAL_QUORUM ++ : ConsistencyLevel.QUORUM; try { - final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state); - if (pair.right > 0) - casReadMetrics.contention.update(pair.right); + // Commit an empty update to make sure all in-progress updates that should be finished first is, _and_ + // that no other in-progress can get resurrected. 
+ Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = + disableSerialReadLinearizability + ? () -> null + : () -> Pair.create(PartitionUpdate.emptyUpdate(metadata, key), null); + // When replaying, we commit at quorum/local quorum, as we want to be sure the following read (done at + // quorum/local_quorum) sees any replayed updates. Our own update is however empty, and those don't even + // get committed due to an optimiation described in doPaxos/beingRepairAndPaxos, so the commit + // consistency is irrelevant (we use ANY just to emphasis that we don't wait on our commit). + doPaxos(metadata, + key, + consistencyLevel, - consistencyForReplayCommitOrFetch, ++ consistencyForReplayCommitsOrFetch, + ConsistencyLevel.ANY, + state, + start, + casReadMetrics, + updateProposer); } catch (WriteTimeoutException e) { @@@ -1633,10 -1674,10 +1752,10 @@@ } catch (WriteFailureException e) { - throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false); + throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint); } - result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime); - result = fetchRows(group.commands, consistencyForReplayCommitOrFetch); ++ result = fetchRows(group.commands, consistencyForReplayCommitsOrFetch, queryStartNanoTime); } catch (UnavailableException e) { diff --cc test/distributed/org/apache/cassandra/distributed/test/CASTest.java index 0000000,0b1dce6..473f56c mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java @@@ -1,0 -1,679 +1,684 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.cassandra.distributed.test; + + import java.io.IOException; + import java.util.UUID; + import java.util.function.BiConsumer; + + import org.junit.Assert; + import org.junit.Ignore; + import org.junit.Test; + + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.db.marshal.Int32Type; + import org.apache.cassandra.dht.Murmur3Partitioner; + import org.apache.cassandra.dht.Token; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.api.ICoordinator; + import org.apache.cassandra.distributed.api.IInstance; + import org.apache.cassandra.distributed.api.IMessageFilters; + import org.apache.cassandra.distributed.impl.Instance; + import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.utils.UUIDGen; + + import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + import static org.apache.cassandra.distributed.shared.AssertUtils.fail; + import static org.apache.cassandra.distributed.shared.AssertUtils.row; + import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_COMMIT; + import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PREPARE; + import static org.apache.cassandra.net.MessagingService.Verb.PAXOS_PROPOSE; + import static org.apache.cassandra.net.MessagingService.Verb.READ; + import static org.junit.Assert.assertFalse; + import static org.junit.Assert.assertTrue; + + public class CASTest extends TestBaseImpl + { + @Test + public void simpleUpdate() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 1)); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 3 WHERE pk = 1 and ck = 1 IF v = 2", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 1)); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 2)); + } + } + + @Test + public void incompletePrepare() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L)))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + IMessageFilters.Filter drop = cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2, 3).drop(); + try + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM); + Assert.fail(); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + drop.off(); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM); + 
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL)); + } + } + + @Test + public void incompletePropose() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L)))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2, 3).drop(); + try + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM); + Assert.fail(); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + drop1.off(); + // make sure we encounter one of the in-progress proposals so we complete it + cluster.filters().verbs(PAXOS_PREPARE.ordinal()).from(1).to(2).drop(); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 2)); + } + } + + @Test + public void incompleteCommit() throws Throwable + { + try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L).set("cas_contention_timeout_in_ms", 200L)))) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + IMessageFilters.Filter drop1 = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + try + { + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM); + Assert.fail(); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + drop1.off(); + // make sure we see one of the successful commits + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(2).drop(); + cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 2 WHERE pk = 1 and ck = 1 IF v = 1", ConsistencyLevel.QUORUM); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.SERIAL), + row(1, 1, 2)); + } + } + + private int[] paxosAndReadVerbs() { + return new int[] { + MessagingService.Verb.PAXOS_PREPARE.ordinal(), + MessagingService.Verb.PAXOS_PROPOSE.ordinal(), + MessagingService.Verb.PAXOS_COMMIT.ordinal(), + MessagingService.Verb.READ.ordinal() + }; + } + + /** + * Base test to ensure that if a write times out but with a proposal accepted by some nodes (less then quorum), and + * a following SERIAL operation does not observe that write (the node having accepted it do not participate in that + * following operation), then that write is never applied, even when the nodes having accepted the original proposal + * participate. + * + * <p>In other words, if an operation timeout, it may or may not be applied, but that "fate" is persistently decided + * by the very SERIAL operation that "succeed" (in the sense of 'not timing out or throwing some other exception'). + * + * @param postTimeoutOperation1 a SERIAL operation executed after an initial write that inserts the row [0, 0] times + * out. 
It is executed with a QUORUM of nodes that have _not_ see the timed out + * proposal, and so that operation should expect that the [0, 0] write has not taken + * place. + * @param postTimeoutOperation2 a 2nd SERIAL operation executed _after_ {@code postTimeoutOperation1}, with no + * write executed between the 2 operation. Contrarily to the 1st operation, the QORUM + * for this operation _will_ include the node that got the proposal for the [0, 0] + * insert but didn't participated to {@code postTimeoutOperation1}}. That operation + * should also no witness that [0, 0] write (since {@code postTimeoutOperation1} + * didn't). + * @param loseCommitOfOperation1 if {@code true}, the test will also drop the "commits" messages for + * {@code postTimeoutOperation1}. In general, the test should behave the same with or + * without that flag since a value is decided as soon as it has been "accepted by + * quorum" and the commits should always be properly replayed. + */ + private void consistencyAfterWriteTimeoutTest(BiConsumer<String, ICoordinator> postTimeoutOperation1, + BiConsumer<String, ICoordinator> postTimeoutOperation2, + boolean loseCommitOfOperation1) throws IOException + { - try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 200L) ++ // It's unclear why (haven't dug), but in some of the instance of this test method, there is a consistent 2+ ++ // seconds pauses between the prepare and propose phases during the execution of 'postTimeoutOperation2'. This ++ // does not happen on 3.0 and there is no report of such long pauses otherwise, so an hypothesis is that this ++ // is due to the in-jvm dtest framework. This is is why we use a 4 seconds timeout here. Given this test is ++ // not about performance, this is probably ok, even if we ideally should dug into the underlying reason. ++ try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 4000L) + .set("cas_contention_timeout_in_ms", 200L)))) + { + String table = KEYSPACE + ".t"; + cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY KEY, v int)"); + + // We do a CAS insertion, but have with the PROPOSE message dropped on node 1 and 2. The CAS will not get + // through and should timeout. Importantly, node 3 does receive and answer the PROPOSE. + IMessageFilters.Filter dropProposeFilter = cluster.filters() + .inbound() + .verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal()) + .to(1, 2) + .drop(); + try + { + // NOTE: the consistency below is the "commit" one, so it doesn't matter at all here. + cluster.coordinator(1) + .execute("INSERT INTO " + table + "(k, v) VALUES (0, 0) IF NOT EXISTS", ConsistencyLevel.ONE); + fail("The insertion should have timed-out"); + } + catch (Exception e) + { + // We expect a write timeout. If we get one, the test can continue, otherwise, we rethrow. Note that we + // look at the root cause because the dtest framework effectively wrap the exception in a RuntimeException + // (we could just look at the immediate cause, but this feel a bit more resilient this way). + // TODO: we can't use an instanceof below because the WriteTimeoutException we get is from a different class + // loader than the one the test run under, and that's our poor-man work-around. This kind of things should + // be improved at the dtest API level. + if (!e.getCause().getClass().getSimpleName().equals("WriteTimeoutException")) + throw e; + } + finally + { + dropProposeFilter.off(); + } + + // Isolates node 3 and executes the SERIAL operation. 
As neither node 1 or 2 got the initial insert proposal, + // there is nothing to "replay" and the operation should assert the table is still empty. + IMessageFilters.Filter ignoreNode3Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop(); + IMessageFilters.Filter dropCommitFilter = null; + if (loseCommitOfOperation1) + { + dropCommitFilter = cluster.filters().verbs(PAXOS_COMMIT.ordinal()).to(1, 2).drop(); + } + try + { + postTimeoutOperation1.accept(table, cluster.coordinator(1)); + } + finally + { + ignoreNode3Filter.off(); + if (dropCommitFilter != null) + dropCommitFilter.off(); + } + + // Node 3 is now back and we isolate node 2 to ensure the next read hits node 1 and 3. + // What we want to ensure is that despite node 3 having the initial insert in its paxos state in a position of + // being replayed, that insert is _not_ replayed (it would contradict serializability since the previous + // operation asserted nothing was inserted). It is this execution that failed before CASSANDRA-12126. + IMessageFilters.Filter ignoreNode2Filter = cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop(); + try + { + postTimeoutOperation2.accept(table, cluster.coordinator(1)); + } + finally + { + ignoreNode2Filter.off(); + } + } + } + + /** + * Tests that if a write timeouts and a following serial read does not see that write, then no following reads sees + * it, even if some nodes still have the write in their paxos state. + * + * <p>This specifically test for the inconsistency described/fixed by CASSANDRA-12126. + */ + @Test + public void readConsistencyAfterWriteTimeoutTest() throws IOException + { + BiConsumer<String, ICoordinator> operation = + (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0", + ConsistencyLevel.SERIAL)); + + consistencyAfterWriteTimeoutTest(operation, operation, false); + consistencyAfterWriteTimeoutTest(operation, operation, true); + } + + /** + * Tests that if a write timeouts, then a following CAS succeed but does not apply in a way that indicate the write + * has not applied, then no following CAS can see that initial insert , even if some nodes still have the write in + * their paxos state. + * + * <p>This specifically test for the inconsistency described/fixed by CASSANDRA-12126. + */ + @Test + public void nonApplyingCasConsistencyAfterWriteTimeout() throws IOException + { + // Note: we use CL.ANY so that the operation don't timeout in the case where we "lost" the operation1 commits. + // The commit CL shouldn't have impact on this test anyway, so this doesn't diminishes the test. + BiConsumer<String, ICoordinator> operation = + (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", + ConsistencyLevel.ANY)); + consistencyAfterWriteTimeoutTest(operation, operation, false); + consistencyAfterWriteTimeoutTest(operation, operation, true); + } + + /** + * Tests that if a write timeouts and a following serial read does not see that write, then no following CAS see + * that initial insert, even if some nodes still have the write in their paxos state. + * + * <p>This specifically test for the inconsistency described/fixed by CASSANDRA-12126. 
+ */ + @Test + public void mixedReadAndNonApplyingCasConsistencyAfterWriteTimeout() throws IOException + { + BiConsumer<String, ICoordinator> operation1 = + (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0", + ConsistencyLevel.SERIAL)); + BiConsumer<String, ICoordinator> operation2 = + (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", + ConsistencyLevel.QUORUM)); + consistencyAfterWriteTimeoutTest(operation1, operation2, false); + consistencyAfterWriteTimeoutTest(operation1, operation2, true); + } + + /** + * Tests that if a write timeouts and a following CAS succeed but does not apply in a way that indicate the write + * has not applied, then following serial reads do no see that write, even if some nodes still have the write in + * their paxos state. + * + * <p>This specifically test for the inconsistency described/fixed by CASSANDRA-12126. + */ + @Test + public void mixedNonApplyingCasAndReadConsistencyAfterWriteTimeout() throws IOException + { + // Note: we use CL.ANY so that the operation don't timeout in the case where we "lost" the operation1 commits. + // The commit CL shouldn't have impact on this test anyway, so this doesn't diminishes the test. + BiConsumer<String, ICoordinator> operation1 = + (table, coordinator) -> assertCasNotApplied(coordinator.execute("UPDATE " + table + " SET v = 1 WHERE k = 0 IF v = 0", + ConsistencyLevel.ANY)); + BiConsumer<String, ICoordinator> operation2 = + (table, coordinator) -> assertRows(coordinator.execute("SELECT * FROM " + table + " WHERE k=0", + ConsistencyLevel.SERIAL)); + consistencyAfterWriteTimeoutTest(operation1, operation2, false); + consistencyAfterWriteTimeoutTest(operation1, operation2, true); + } + + // TODO: this shoud probably be moved into the dtest API. 
+ private void assertCasNotApplied(Object[][] resultSet) + { + assertFalse("Expected a CAS resultSet (with at least application result) but got an empty one.", + resultSet.length == 0); + assertFalse("Invalid empty first row in CAS resultSet.", resultSet[0].length == 0); + Object wasApplied = resultSet[0][0]; + assertTrue("Expected 1st column of CAS resultSet to be a boolean, but got a " + wasApplied.getClass(), + wasApplied instanceof Boolean); + assertFalse("Expected CAS to not be applied, but was applied.", (Boolean)wasApplied); + } + + /** + * Failed write (by node that did not yet witness a range movement via gossip) is witnessed later as successful + * conflicting with another successful write performed by a node that did witness the range movement + * Prepare, Propose and Commit A to {1, 2} + * Range moves to {2, 3, 4} + * Prepare and Propose B (=> !A) to {3, 4} + */ + @Ignore + @Test + public void testSuccessfulWriteBeforeRangeMovement() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {1} is unaware (yet) that {4} is an owner of the token + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {1} promises and accepts on !{3} => {1, 2}; commits on !{2,3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + for (int i = 1 ; i <= 3 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {4} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop(); + assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(false, pk, 1, 1, null)); + } + } + + /** + * Failed write (by node that did not yet witness a range movement via gossip) is witnessed later as successful + * conflicting with another successful write performed by a node that did witness the range movement + * - Range moves from {1, 2, 3} to {2, 3, 4}, witnessed by X (not by !X) + * - X: Prepare, Propose and Commit A to {3, 4} + * - !X: Prepare and Propose B (=>!A) to {1, 2} + */ + @Ignore + @Test + public void testConflictingWritesWithStaleRingInformation() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {1} is unaware (yet) that {4} is an owner of the token + 
cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + + // {4} promises, accepts and commits on !{2} => {3, 4} + int pk = pk(cluster, 1, 2); + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(2).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(2).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(4).to(2).drop(); + assertRows(cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // {1} promises, accepts and commmits on !{3} => {1, 2} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(false, pk, 1, 1, null)); + } + } + + /** + * Successful write during range movement, not witnessed by read after range movement. + * Very similar to {@link #testConflictingWritesWithStaleRingInformation}. + * + * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X) + * - !X: Prepare and Propose to {1, 2} + * - Range movement witnessed by !X + * - Any: Prepare and Read from {3, 4} + */ + @Ignore + @Test + public void testSucccessfulWriteDuringRangeMovementFollowedByRead() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + + // make it so {4} is bootstrapping, and this has not propagated to other nodes yet + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {1} promises and accepts on !{3} => {1, 2}; commmits on !{2, 3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // finish topology change + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {3} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop(); + assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk), + row(pk, 1, 1)); + } + } + + /** + * Successful write during range movement not witnessed by write after range movement + * + * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X) + * - !X: Prepare and Propose to {1, 2} + * - Range movement witnessed by !X + * - Any: Prepare and Propose to {3, 4} + */ + @Ignore + @Test + public void 
testSuccessfulWriteDuringRangeMovementFollowedByConflicting() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {4} is bootstrapping, and this has not propagated to other nodes yet + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // finish topology change + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {3} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop(); + assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(false, pk, 1, 1, null)); + + // TODO: repair and verify base table state + } + } + + /** + * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation + * being performed with stale ring information. + * This is a particular special case of stale ring information sequencing, which probably would be resolved + * by fixing each of the more isolated cases (but is unique, so deserving of its own test case). 
+ * See CASSANDRA-15745 + * + * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X) + * - X: Prepare to {2, 3, 4} + * - X: Propose to {4} + * - !X: Prepare and Propose to {1, 2} + * - Range move visible by !X + * - Any: Prepare and Read from {3, 4} + */ + @Ignore + @Test + public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByRead() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {4} is bootstrapping, and this has not propagated to other nodes yet + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {4} promises and accepts on !{1} => {2, 3, 4}; commits on !{1, 2, 3} => {4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop(); + try + { + cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk); + Assert.assertTrue(false); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + + // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // finish topology change + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {3} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop(); + assertRows(cluster.coordinator(3).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", ConsistencyLevel.SERIAL, pk), + row(pk, 1, null, 2)); + } + } + + /** + * During a range movement, a CAS may fail leaving side effects that are not witnessed by another operation + * being performed with stale ring information. + * This is a particular special case of stale ring information sequencing, which probably would be resolved + * by fixing each of the more isolated cases (but is unique, so deserving of its own test case). 
+ * See CASSANDRA-15745 + * + * - Range moves from {1, 2, 3} to {2, 3, 4}; witnessed by X (not by !X) + * - X: Prepare to {2, 3, 4} + * - X: Propose to {4} + * - !X: Prepare and Propose to {1, 2} + * - Range move visible by !X + * - Any: Prepare and Propose to {3, 4} + */ + @Ignore + @Test + public void testIncompleteWriteFollowedBySuccessfulWriteWithStaleRingDuringRangeMovementFollowedByWrite() throws Throwable + { + try (Cluster cluster = Cluster.create(4, config -> config + .set("write_request_timeout_in_ms", 200L) + .set("cas_contention_timeout_in_ms", 200L))) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + // make it so {4} is bootstrapping, and this has not propagated to other nodes yet + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(1).acceptsOnInstance(Instance::removeFromRing).accept(cluster.get(4)); + cluster.get(4).acceptsOnInstance(Instance::addToRingBootstrapping).accept(cluster.get(4)); + + int pk = pk(cluster, 1, 2); + + // {4} promises and accepts on !{1} => {2, 3, 4}; commits on !{1, 2, 3} => {4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(4).to(1).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(4).to(1, 2, 3).drop(); + try + { + cluster.coordinator(4).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (?, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM, pk); + Assert.assertTrue(false); + } + catch (RuntimeException wrapped) + { + Assert.assertEquals("Operation timed out - received only 1 responses.", wrapped.getCause().getMessage()); + } + + // {1} promises and accepts on !{3} => {1, 2}; commits on !{2, 3} => {1} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(1).to(3).drop(); + cluster.filters().verbs(PAXOS_COMMIT.ordinal()).from(1).to(2, 3).drop(); + assertRows(cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(true)); + + // finish topology change + for (int i = 1 ; i <= 4 ; ++i) + cluster.get(i).acceptsOnInstance(Instance::addToRingNormal).accept(cluster.get(4)); + + // {3} reads from !{2} => {3, 4} + cluster.filters().verbs(PAXOS_PREPARE.ordinal(), READ.ordinal()).from(3).to(2).drop(); + cluster.filters().verbs(PAXOS_PROPOSE.ordinal()).from(3).to(2).drop(); + assertRows(cluster.coordinator(3).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v2) VALUES (?, 1, 2) IF NOT EXISTS", ConsistencyLevel.ONE, pk), + row(false, 5, 1, null, 2)); + } + } + + private static int pk(Cluster cluster, int lb, int ub) + { + return pk(cluster.get(lb), cluster.get(ub)); + } + + private static int pk(IInstance lb, IInstance ub) + { + return pk(Murmur3Partitioner.instance.getTokenFactory().fromString(lb.config().getString("initial_token")), + Murmur3Partitioner.instance.getTokenFactory().fromString(ub.config().getString("initial_token"))); + } + + private static int pk(Token lb, Token ub) + { + int pk = 0; + Token pkt; + while (lb.compareTo(pkt = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(pk))) >= 0 || ub.compareTo(pkt) < 0) + ++pk; + return pk; + } + + private static void debugOwnership(Cluster cluster, int pk) + { + for (int i = 1 ; i <= cluster.size() ; ++i) + System.out.println(i + ": " + 
cluster.get(i).appliesOnInstance((Integer v) -> StorageService.instance.getNaturalAndPendingEndpoints(KEYSPACE, Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(v))))
+                                .apply(pk));
+    }
+
+    private static void debugPaxosState(Cluster cluster, int pk)
+    {
+        UUID cfid = cluster.get(1).callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metadata.cfId);
+        for (int i = 1 ; i <= cluster.size() ; ++i)
+            for (Object[] row : cluster.get(i).executeInternal("select in_progress_ballot, proposal_ballot, most_recent_commit_at from system.paxos where row_key = ? and cf_id = ?", Int32Type.instance.decompose(pk), cfid))
+                System.out.println(i + ": " + (row[0] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[0])) + ", " + (row[1] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[1])) + ", " + (row[2] == null ? 0L : UUIDGen.microsTimestamp((UUID)row[2])));
+    }
+
+}
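For readers who want the gist of the new coverage without reading the full diff, the sketch below condenses the scenario that CASSANDRA-12126 addresses: once a SERIAL read has decided that a timed-out write did not apply, no later SERIAL operation may resurrect it. This sketch is not part of the commit; it assumes the same class context as CASTest above (its imports, init()/KEYSPACE from TestBaseImpl, and the paxosAndReadVerbs() helper), and the table name, timeout values and node numbers are illustrative choices only.

    @Test
    public void serialReadDecidesFateOfTimedOutWrite() throws IOException
    {
        try (Cluster cluster = init(Cluster.create(3, config -> config.set("write_request_timeout_in_ms", 4000L)
                                                                       .set("cas_contention_timeout_in_ms", 200L))))
        {
            String table = KEYSPACE + ".t";
            cluster.schemaChange("CREATE TABLE " + table + " (k int PRIMARY KEY, v int)");

            // Drop PROPOSE towards nodes 1 and 2 so the CAS times out; only node 3 accepts the proposal.
            IMessageFilters.Filter dropPropose = cluster.filters()
                                                        .verbs(MessagingService.Verb.PAXOS_PROPOSE.ordinal())
                                                        .to(1, 2)
                                                        .drop();
            try
            {
                cluster.coordinator(1).execute("INSERT INTO " + table + " (k, v) VALUES (0, 0) IF NOT EXISTS",
                                               ConsistencyLevel.ONE);
                fail("The insertion should have timed out");
            }
            catch (RuntimeException wrapped)
            {
                // Expect a wrapped WriteTimeoutException (compared by name, as in the tests above, because the
                // dtest framework uses separate class loaders); anything else is a genuine failure.
                if (!wrapped.getCause().getClass().getSimpleName().equals("WriteTimeoutException"))
                    throw wrapped;
                // The write's fate is now undecided: node 3 holds an accepted but uncommitted proposal for it.
            }
            finally
            {
                dropPropose.off();
            }

            // A SERIAL read served by nodes 1 and 2 only must settle the write's fate as "not applied"...
            IMessageFilters.Filter ignoreNode3 = cluster.filters().verbs(paxosAndReadVerbs()).to(3).drop();
            try
            {
                assertRows(cluster.coordinator(1).execute("SELECT * FROM " + table + " WHERE k = 0",
                                                          ConsistencyLevel.SERIAL)); // expects no rows
            }
            finally
            {
                ignoreNode3.off();
            }

            // ...so a later SERIAL read that does include node 3 must not resurrect the timed-out write.
            // Before CASSANDRA-12126, this second read could incorrectly return the row [0, 0].
            IMessageFilters.Filter ignoreNode2 = cluster.filters().verbs(paxosAndReadVerbs()).to(2).drop();
            try
            {
                assertRows(cluster.coordinator(1).execute("SELECT * FROM " + table + " WHERE k = 0",
                                                          ConsistencyLevel.SERIAL)); // still no rows
            }
            finally
            {
                ignoreNode2.off();
            }
        }
    }

Operators who cannot absorb the extra cost of linearizable SERIAL reads can fall back to the previous behavior with the -Dcassandra.unsafe.disable-serial-reads-linearizability=true property described in the NEWS.txt entry above, at the price of losing the guarantee illustrated here.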