This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 8960b4d8513e22c4c11c181c520380342935aee7 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Tue Nov 30 17:00:45 2021 +0000 [CASSANDRA-17177] List Appends Are Not Linearizable --- .../cassandra/cql3/statements/CQL3CasRequest.java | 38 +++++++++++++++------- .../cql3/statements/ModificationStatement.java | 2 +- .../org/apache/cassandra/service/CASRequest.java | 3 +- .../org/apache/cassandra/service/StorageProxy.java | 16 ++++----- 4 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index 2a87592..376a65f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -232,11 +232,12 @@ public class CQL3CasRequest implements CASRequest return builder.build(); } - public PartitionUpdate makeUpdates(FilteredPartition current, ClientState state) throws InvalidRequestException + public PartitionUpdate makeUpdates(FilteredPartition current, ClientState state, TimeUUID ballot) throws InvalidRequestException { PartitionUpdate.Builder updateBuilder = new PartitionUpdate.Builder(metadata, key, updatedColumns(), conditions.size()); + long timeUuidNanos = 0; for (RowUpdate upd : updates) - upd.applyUpdates(current, updateBuilder, state); + timeUuidNanos = upd.applyUpdates(current, updateBuilder, state, ballot.msb(), timeUuidNanos); for (RangeDeletion upd : rangeDeletions) upd.applyUpdates(current, updateBuilder, state); @@ -246,6 +247,24 @@ public class CQL3CasRequest implements CASRequest return partitionUpdate; } + private static class CASUpdateParameters extends UpdateParameters + { + final long timeUuidMsb; + long timeUuidNanos; + + public CASUpdateParameters(TableMetadata metadata, RegularAndStaticColumns updatedColumns, ClientState state, QueryOptions options, long timestamp, int nowInSec, int ttl, Map<DecoratedKey, Partition> prefetchedRows, long timeUuidMsb, long timeUuidNanos) throws InvalidRequestException + { + super(metadata, updatedColumns, state, options, timestamp, nowInSec, ttl, prefetchedRows); + this.timeUuidMsb = timeUuidMsb; + this.timeUuidNanos = timeUuidNanos; + } + + public byte[] nextTimeUUIDAsBytes() + { + return TimeUUID.toBytes(timeUuidMsb, TimeUUIDType.signedBytesToNativeLong(timeUuidNanos++)); + } + } + /** * Due to some operation on lists, we can't generate the update that a given Modification statement does before * we get the values read by the initial read of Paxos. A RowUpdate thus just store the relevant information @@ -269,19 +288,14 @@ public class CQL3CasRequest implements CASRequest this.nowInSeconds = nowInSeconds; } - void applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder, ClientState state) + long applyUpdates(FilteredPartition current, PartitionUpdate.Builder updateBuilder, ClientState state, long timeUuidMsb, long timeUuidNanos) { Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.singletonMap(key, current) : null; - UpdateParameters params = - new UpdateParameters(metadata, - updateBuilder.columns(), - state, - options, - timestamp, - nowInSeconds, - stmt.getTimeToLive(options), - map); + CASUpdateParameters params = + new CASUpdateParameters(metadata, updateBuilder.columns(), state, options, timestamp, nowInSeconds, + stmt.getTimeToLive(options), map, timeUuidMsb, timeUuidNanos); stmt.addUpdateForKey(updateBuilder, clustering, params); + return params.timeUuidNanos; } } diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 42b4d45..fd2f044 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -673,7 +673,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa if (!request.appliesTo(current)) return current.rowIterator(); - PartitionUpdate updates = request.makeUpdates(current, state); + PartitionUpdate updates = request.makeUpdates(current, state, ballot); updates = TriggerExecutor.instance.execute(updates); Commit proposal = Commit.newProposal(ballot, updates); diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java index 77d1f5d..9197ef0 100644 --- a/src/java/org/apache/cassandra/service/CASRequest.java +++ b/src/java/org/apache/cassandra/service/CASRequest.java @@ -21,6 +21,7 @@ import org.apache.cassandra.db.SinglePartitionReadQuery; import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.utils.TimeUUID; /** * Abstract the conditions and updates for a CAS operation. @@ -42,5 +43,5 @@ public interface CASRequest * The updates to perform of a CAS success. The values fetched using the readFilter() * are passed as argument. */ - public PartitionUpdate makeUpdates(FilteredPartition current, ClientState state) throws InvalidRequestException; + public PartitionUpdate makeUpdates(FilteredPartition current, ClientState state, TimeUUID ballot) throws InvalidRequestException; } diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 10785a9..267436f 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -136,7 +137,6 @@ import org.apache.cassandra.utils.MBeanWrapper; import org.apache.cassandra.utils.MonotonicClock; import org.apache.cassandra.utils.NoSpamLogger; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static com.google.common.collect.Iterables.concat; @@ -296,7 +296,7 @@ public class StorageProxy implements StorageProxyMBean key.toString(), keyspaceName, cfName)); } - Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () -> + Function<TimeUUID, Pair<PartitionUpdate, RowIterator>> updateProposer = ballot -> { // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); @@ -317,7 +317,7 @@ public class StorageProxy implements StorageProxyMBean } // Create the desired updates - PartitionUpdate updates = request.makeUpdates(current, state); + PartitionUpdate updates = request.makeUpdates(current, state, ballot); long size = updates.dataSize(); casWriteMetrics.mutationSize.update(size); @@ -435,7 +435,7 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistencyForCommit, long queryStartNanoTime, CASClientRequestMetrics casMetrics, - Supplier<Pair<PartitionUpdate, RowIterator>> createUpdateProposal) + Function<TimeUUID, Pair<PartitionUpdate, RowIterator>> createUpdateProposal) throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException { int contentions = 0; @@ -464,7 +464,7 @@ public class StorageProxy implements StorageProxyMBean final TimeUUID ballot = pair.ballot; contentions += pair.contentions; - Pair<PartitionUpdate, RowIterator> proposalPair = createUpdateProposal.get(); + Pair<PartitionUpdate, RowIterator> proposalPair = createUpdateProposal.apply(ballot); // See method javadoc: null here is code for "stop here and return null". if (proposalPair == null) return null; @@ -1833,10 +1833,10 @@ public class StorageProxy implements StorageProxyMBean { // Commit an empty update to make sure all in-progress updates that should be finished first is, _and_ // that no other in-progress can get resurrected. - Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = + Function<TimeUUID, Pair<PartitionUpdate, RowIterator>> updateProposer = Paxos.getPaxosVariant() == Config.PaxosVariant.v1_without_linearizable_reads - ? () -> null - : () -> Pair.create(PartitionUpdate.emptyUpdate(metadata, key), null); + ? ballot -> null + : ballot -> Pair.create(PartitionUpdate.emptyUpdate(metadata, key), null); // When replaying, we commit at quorum/local quorum, as we want to be sure the following read (done at // quorum/local_quorum) sees any replayed updates. Our own update is however empty, and those don't even // get committed due to an optimiation described in doPaxos/beingRepairAndPaxos, so the commit --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
