This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8960b4d8513e22c4c11c181c520380342935aee7
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Nov 30 17:00:45 2021 +0000

    [CASSANDRA-17177] List Appends Are Not Linearizable
---
 .../cassandra/cql3/statements/CQL3CasRequest.java  | 38 +++++++++++++++-------
 .../cql3/statements/ModificationStatement.java     |  2 +-
 .../org/apache/cassandra/service/CASRequest.java   |  3 +-
 .../org/apache/cassandra/service/StorageProxy.java | 16 ++++-----
 4 files changed, 37 insertions(+), 22 deletions(-)

diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 2a87592..376a65f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -232,11 +232,12 @@ public class CQL3CasRequest implements CASRequest
         return builder.build();
     }
 
-    public PartitionUpdate makeUpdates(FilteredPartition current, ClientState 
state) throws InvalidRequestException
+    public PartitionUpdate makeUpdates(FilteredPartition current, ClientState 
state, TimeUUID ballot) throws InvalidRequestException
     {
         PartitionUpdate.Builder updateBuilder = new 
PartitionUpdate.Builder(metadata, key, updatedColumns(), conditions.size());
+        long timeUuidNanos = 0;
         for (RowUpdate upd : updates)
-            upd.applyUpdates(current, updateBuilder, state);
+            timeUuidNanos = upd.applyUpdates(current, updateBuilder, state, 
ballot.msb(), timeUuidNanos);
         for (RangeDeletion upd : rangeDeletions)
             upd.applyUpdates(current, updateBuilder, state);
 
@@ -246,6 +247,24 @@ public class CQL3CasRequest implements CASRequest
         return partitionUpdate;
     }
 
+    private static class CASUpdateParameters extends UpdateParameters
+    {
+        final long timeUuidMsb;
+        long timeUuidNanos;
+
+        public CASUpdateParameters(TableMetadata metadata, 
RegularAndStaticColumns updatedColumns, ClientState state, QueryOptions 
options, long timestamp, int nowInSec, int ttl, Map<DecoratedKey, Partition> 
prefetchedRows, long timeUuidMsb, long timeUuidNanos) throws 
InvalidRequestException
+        {
+            super(metadata, updatedColumns, state, options, timestamp, 
nowInSec, ttl, prefetchedRows);
+            this.timeUuidMsb = timeUuidMsb;
+            this.timeUuidNanos = timeUuidNanos;
+        }
+
+        public byte[] nextTimeUUIDAsBytes()
+        {
+            return TimeUUID.toBytes(timeUuidMsb, 
TimeUUIDType.signedBytesToNativeLong(timeUuidNanos++));
+        }
+    }
+
     /**
      * Due to some operation on lists, we can't generate the update that a 
given Modification statement does before
      * we get the values read by the initial read of Paxos. A RowUpdate thus 
just store the relevant information
@@ -269,19 +288,14 @@ public class CQL3CasRequest implements CASRequest
             this.nowInSeconds = nowInSeconds;
         }
 
-        void applyUpdates(FilteredPartition current, PartitionUpdate.Builder 
updateBuilder, ClientState state)
+        long applyUpdates(FilteredPartition current, PartitionUpdate.Builder 
updateBuilder, ClientState state, long timeUuidMsb, long timeUuidNanos)
         {
             Map<DecoratedKey, Partition> map = stmt.requiresRead() ? 
Collections.singletonMap(key, current) : null;
-            UpdateParameters params =
-                new UpdateParameters(metadata,
-                                     updateBuilder.columns(),
-                                     state,
-                                     options,
-                                     timestamp,
-                                     nowInSeconds,
-                                     stmt.getTimeToLive(options),
-                                     map);
+            CASUpdateParameters params =
+                new CASUpdateParameters(metadata, updateBuilder.columns(), 
state, options, timestamp, nowInSeconds,
+                                     stmt.getTimeToLive(options), map, 
timeUuidMsb, timeUuidNanos);
             stmt.addUpdateForKey(updateBuilder, clustering, params);
+            return params.timeUuidNanos;
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 42b4d45..fd2f044 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -673,7 +673,7 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         if (!request.appliesTo(current))
             return current.rowIterator();
 
-        PartitionUpdate updates = request.makeUpdates(current, state);
+        PartitionUpdate updates = request.makeUpdates(current, state, ballot);
         updates = TriggerExecutor.instance.execute(updates);
 
         Commit proposal = Commit.newProposal(ballot, updates);
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java 
b/src/java/org/apache/cassandra/service/CASRequest.java
index 77d1f5d..9197ef0 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -21,6 +21,7 @@ import org.apache.cassandra.db.SinglePartitionReadQuery;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.utils.TimeUUID;
 
 /**
  * Abstract the conditions and updates for a CAS operation.
@@ -42,5 +43,5 @@ public interface CASRequest
      * The updates to perform of a CAS success. The values fetched using the 
readFilter()
      * are passed as argument.
      */
-    public PartitionUpdate makeUpdates(FilteredPartition current, ClientState 
state) throws InvalidRequestException;
+    public PartitionUpdate makeUpdates(FilteredPartition current, ClientState 
state, TimeUUID ballot) throws InvalidRequestException;
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 10785a9..267436f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -136,7 +137,6 @@ import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static com.google.common.collect.Iterables.concat;
@@ -296,7 +296,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                 
key.toString(), keyspaceName, cfName));
             }
 
-            Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer = () ->
+            Function<TimeUUID, Pair<PartitionUpdate, RowIterator>> 
updateProposer = ballot ->
             {
                 // read the current values and check they validate the 
conditions
                 Tracing.trace("Reading existing values for CAS precondition");
@@ -317,7 +317,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 // Create the desired updates
-                PartitionUpdate updates = request.makeUpdates(current, state);
+                PartitionUpdate updates = request.makeUpdates(current, state, 
ballot);
 
                 long size = updates.dataSize();
                 casWriteMetrics.mutationSize.update(size);
@@ -435,7 +435,7 @@ public class StorageProxy implements StorageProxyMBean
                                        ConsistencyLevel consistencyForCommit,
                                        long queryStartNanoTime,
                                        CASClientRequestMetrics casMetrics,
-                                       Supplier<Pair<PartitionUpdate, 
RowIterator>> createUpdateProposal)
+                                       Function<TimeUUID, 
Pair<PartitionUpdate, RowIterator>> createUpdateProposal)
     throws UnavailableException, IsBootstrappingException, 
RequestFailureException, RequestTimeoutException, InvalidRequestException
     {
         int contentions = 0;
@@ -464,7 +464,7 @@ public class StorageProxy implements StorageProxyMBean
                 final TimeUUID ballot = pair.ballot;
                 contentions += pair.contentions;
 
-                Pair<PartitionUpdate, RowIterator> proposalPair = 
createUpdateProposal.get();
+                Pair<PartitionUpdate, RowIterator> proposalPair = 
createUpdateProposal.apply(ballot);
                 // See method javadoc: null here is code for "stop here and 
return null".
                 if (proposalPair == null)
                     return null;
@@ -1833,10 +1833,10 @@ public class StorageProxy implements StorageProxyMBean
             {
                 // Commit an empty update to make sure all in-progress updates 
that should be finished first is, _and_
                 // that no other in-progress can get resurrected.
-                Supplier<Pair<PartitionUpdate, RowIterator>> updateProposer =
+                Function<TimeUUID, Pair<PartitionUpdate, RowIterator>> 
updateProposer =
                     Paxos.getPaxosVariant() == 
Config.PaxosVariant.v1_without_linearizable_reads
-                    ? () -> null
-                    : () -> Pair.create(PartitionUpdate.emptyUpdate(metadata, 
key), null);
+                    ? ballot -> null
+                    : ballot -> 
Pair.create(PartitionUpdate.emptyUpdate(metadata, key), null);
                 // When replaying, we commit at quorum/local quorum, as we 
want to be sure the following read (done at
                 // quorum/local_quorum) sees any replayed updates. Our own 
update is however empty, and those don't even
                 // get committed due to an optimiation described in 
doPaxos/beingRepairAndPaxos, so the commit

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to