This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9327165d407dd4ea4c9af5b568ecd2a95474fa30
Author: David Capwell <[email protected]>
AuthorDate: Fri Oct 21 16:29:28 2022 -0700

    Command table now uses a local version added to the payload rather than 
rely on a version column, and fixed a few feedback related issues
    
    patch by David Capwell; reviewed by Blake Eggleston for CASSANDRA-17103
---
 .build/include-accord.sh                           |   4 +-
 .../db/filter/AbstractClusteringIndexFilter.java   |   1 -
 .../db/filter/ClusteringIndexSliceFilter.java      |   1 -
 .../apache/cassandra/db/partitions/Partition.java  |   2 -
 .../cassandra/io/LocalVersionedSerializer.java     |  94 +++++++++++++
 .../cassandra/io/MessageVersionProvider.java       |  24 ++++
 .../apache/cassandra/net/ResponseVerbHandler.java  |   2 +-
 .../cassandra/service/accord/AccordCommand.java    |   1 +
 .../service/accord/AccordCommandStore.java         |  11 +-
 .../service/accord/AccordCommandsForKey.java       |   2 +-
 .../cassandra/service/accord/AccordKeyspace.java   | 145 ++++++++-------------
 .../service/accord/AccordMessageSink.java          |   3 +-
 .../service/accord/AccordPartialCommand.java       |  85 +++++-------
 .../service/accord/AccordSerializerVersion.java    | 114 ++++++++++++++++
 .../cassandra/service/accord/AccordStateCache.java |  24 ++--
 .../service/accord/async/AsyncLoader.java          |   6 +-
 .../service/accord/async/AsyncOperation.java       |  26 ++--
 .../service/accord/async/AsyncWriter.java          |  14 +-
 .../accord/serializers/AcceptSerializers.java      |   6 +-
 .../cassandra/simulator/asm/InterceptClasses.java  |   3 +-
 .../cassandra/simulator/SimulationRunner.java      |   1 +
 .../apache/cassandra/simulator/SimulatorUtils.java |   7 +-
 .../service/accord/async/AsyncLoaderTest.java      |  19 +--
 .../service/accord/async/AsyncWriterTest.java      |   3 +-
 24 files changed, 382 insertions(+), 216 deletions(-)

diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index 7069e22b6f..0dc01bf2ba 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -24,8 +24,8 @@ set -o nounset
 
 bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
 
-accord_repo='https://github.com/bdeggleston/cassandra-accord.git'
-accord_branch='metadata-persistence'
+accord_repo='https://github.com/apache/cassandra-accord.git'
+accord_branch='trunk'
 accord_src="$bin/cassandra-accord"
 
 checkout() {
diff --git 
a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java 
b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
index e1792d17f8..ddcaaed812 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.filter;
 
 import java.io.IOException;
-import java.util.Objects;
 
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
diff --git 
a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java 
b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
index 1f4b72c810..178c96b453 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.db.filter;
 import java.io.IOException;
 import java.util.List;
 import java.nio.ByteBuffer;
-import java.util.Objects;
 
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java 
b/src/java/org/apache/cassandra/db/partitions/Partition.java
index 1daf20cd00..601934a8e7 100644
--- a/src/java/org/apache/cassandra/db/partitions/Partition.java
+++ b/src/java/org/apache/cassandra/db/partitions/Partition.java
@@ -21,12 +21,10 @@ import java.util.NavigableSet;
 
 import javax.annotation.Nullable;
 
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.service.accord.api.AccordKey;
 
 /**
  * In-memory representation of a Partition.
diff --git a/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java 
b/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java
new file mode 100644
index 0000000000..739f007846
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/LocalVersionedSerializer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Serializer that stores the version within the buffer.  Normal usage of {@link IVersionedSerializer} is to rely on
+ * {@link org.apache.cassandra.net.MessagingService#current_version} and messaging version numbers, but that implies
+ * that a messaging version bump is required if a change is made to the serialized field; in some cases the serializer
+ * isn't dealing with messages but with blobs stored in a table, and for these cases it may be better to rely on
+ * field-specific versioning that gets stored alongside the data.
+ */
+public class LocalVersionedSerializer<I>
+{
+    private final MessageVersionProvider currentVersion;
+    private final IVersionedSerializer<MessageVersionProvider> 
versionSerializer;
+    private final IVersionedSerializer<I> serializer;
+
+    public <V extends MessageVersionProvider> LocalVersionedSerializer(V 
currentVersion,
+                                                                       
IVersionedSerializer<V> versionSerializer,
+                                                                       
IVersionedSerializer<I> serializer)
+    {
+        // V is declared on the constructor so that currentVersion and versionSerializer are type-checked against
+        // each other at construction; it is kept out of the class's type signature because that just gets verbose.
+        this.currentVersion = Objects.requireNonNull(currentVersion);
+        this.versionSerializer = 
(IVersionedSerializer<MessageVersionProvider>) 
Objects.requireNonNull(versionSerializer);
+        this.serializer = Objects.requireNonNull(serializer);
+    }
+
+    public IVersionedSerializer<I> serializer()
+    {
+        return serializer;
+    }
+
+    /**
+     * Serialize the current version, followed by the specified object, into the specified DataOutputPlus instance.
+     *
+     * @param t   object that needs to be serialized
+     * @param out DataOutputPlus into which serialization needs to happen.
+     * @throws IOException if serialization fails
+     */
+    public void serialize(I t, DataOutputPlus out) throws IOException
+    {
+        versionSerializer.serialize(currentVersion, out, 
currentVersion.messageVersion());
+        serializer.serialize(t, out, currentVersion.messageVersion());
+    }
+
+    /**
+     * Deserialize from the specified DataInputPlus instance, using the version stored ahead of the payload.
+     *
+     * @param in DataInputPlus from which deserialization needs to happen.
+     * @return the object that was deserialized
+     * @throws IOException if deserialization fails
+     */
+    public I deserialize(DataInputPlus in) throws IOException
+    {
+        MessageVersionProvider version = versionSerializer.deserialize(in, 
currentVersion.messageVersion());
+        return serializer.deserialize(in, version.messageVersion());
+    }
+
+    /**
+     * Calculate the serialized size of the object, including the version prefix, without actually serializing.
+     *
+     * @param t object to calculate serialized size
+     * @return serialized size of the version plus object t
+     */
+    public long serializedSize(I t)
+    {
+        long size = versionSerializer.serializedSize(currentVersion, 
currentVersion.messageVersion());
+        size += serializer.serializedSize(t, currentVersion.messageVersion());
+        return size;
+    }
+}
diff --git a/src/java/org/apache/cassandra/io/MessageVersionProvider.java 
b/src/java/org/apache/cassandra/io/MessageVersionProvider.java
new file mode 100644
index 0000000000..a6ad468281
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/MessageVersionProvider.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+public interface MessageVersionProvider
+{
+    int messageVersion();
+}
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 3518888591..1cee468cd3 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.tracing.Tracing;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
-public class ResponseVerbHandler implements IVerbHandler
+class ResponseVerbHandler implements IVerbHandler
 {
     public static final ResponseVerbHandler instance = new 
ResponseVerbHandler();
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 3c373e60df..00feea4c4e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -184,6 +184,7 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
 //               ", txn=" + txn +
 //               ", writes=" + writes +
 //               ", result=" + result +
+               ", txn is null?=" + (txn.get() == null) +
                ", isGloballyPersistent=" + isGloballyPersistent +
                ", waitingOnCommit=" + waitingOnCommit +
                ", waitingOnApply=" + waitingOnApply +
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index f8659e0b3c..ce6c8b0e68 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -172,7 +173,8 @@ public class AccordCommandStore extends CommandStore
     public Command command(TxnId txnId)
     {
         AccordCommand command = getCommandInternal(txnId);
-        if (command.isEmpty()) command.initialize();
+        if (command.isEmpty())
+            command.initialize();
         return command;
     }
 
@@ -190,8 +192,9 @@ public class AccordCommandStore extends CommandStore
 
     private AccordCommandsForKey getCommandsForKeyInternal(Key key)
     {
-        Preconditions.checkState(currentCtx != null);
-        Preconditions.checkArgument(key instanceof PartitionKey);
+        Objects.requireNonNull(currentCtx, "current context");
+        if (!(key instanceof PartitionKey))
+            throw new IllegalArgumentException("Attempted to use 
non-PartitionKey; given " + key.getClass());
         AccordCommandsForKey commandsForKey = 
currentCtx.commandsForKey.get((PartitionKey) key);
         if (commandsForKey == null)
             throw new IllegalArgumentException("No commandsForKey in context 
for key " + key);
@@ -263,7 +266,7 @@ public class AccordCommandStore extends CommandStore
         }
         catch (ExecutionException e)
         {
-            throw new RuntimeException(e);
+            throw new RuntimeException(e.getCause());
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
index 54ce46a3f9..d2d84899fc 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
@@ -408,7 +408,7 @@ public class AccordCommandsForKey extends CommandsForKey 
implements AccordState<
         // we use the executeAt time instead of the monotonic database 
timestamp to prevent uneven
         // ttl expiration in extreme cases, ie 1M+ writes/second to a key 
causing timestamps to overflow
         // into the next second on some keys and not others.
-        return (int) 
TimeUnit.MICROSECONDS.toSeconds(getTimestampMicros(lastExecutedTimestamp.get()));
+        return 
Math.toIntExact(TimeUnit.MICROSECONDS.toSeconds(getTimestampMicros(lastExecutedTimestamp.get())));
     }
 
     public long timestampMicrosFor(Timestamp executeAt, boolean isForWriteTxn)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index d2c3ef6928..d37f39a32c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -33,8 +33,6 @@ import java.util.function.Supplier;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +44,8 @@ import accord.primitives.Ballot;
 import accord.primitives.Deps;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.txn.Txn;
+import accord.txn.Writes;
 import accord.utils.DeterministicIdentitySet;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -80,9 +80,9 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.transform.FilteredPartitions;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.LocalVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Functions;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -115,7 +115,6 @@ public class AccordKeyspace
     private static final Logger logger = 
LoggerFactory.getLogger(AccordKeyspace.class);
 
     public static final String COMMANDS = "commands";
-    public static final String COMMAND_SERIES = "command_series";
     public static final String COMMANDS_FOR_KEY = "commands_for_key";
 
     private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint, int, 
bigint>";
@@ -134,20 +133,14 @@ public class AccordKeyspace
               + format("txn_id %s,", TIMESTAMP_TUPLE)
               + "status int,"
               + "home_key blob,"
-              + "home_key_version int,"
               + "progress_key blob,"
-              + "progress_key_version blob,"
               + "is_globally_persistent boolean,"
-              + "txn_version int,"
               + "txn blob,"
               + format("execute_at %s,", TIMESTAMP_TUPLE)
               + format("promised_ballot %s,", TIMESTAMP_TUPLE)
               + format("accepted_ballot %s,", TIMESTAMP_TUPLE)
-              + "dependencies_version int,"
               + "dependencies blob,"
-              + "writes_version int,"
               + "writes blob,"
-              + "result_version int,"
               + "result blob,"
               + format("waiting_on_commit map<%s, blob>,", TIMESTAMP_TUPLE)
               + format("waiting_on_apply map<%s, blob>,", TIMESTAMP_TUPLE)
@@ -157,6 +150,20 @@ public class AccordKeyspace
               + "PRIMARY KEY((store_generation, store_index, txn_id))"
               + ')');
 
+    private static class CommandsSerializers
+    {
+        static final LocalVersionedSerializer<AccordKey> ACCORD_KEY_SERIALIZER 
= localSerializer(AccordKey.serializer);
+        static final LocalVersionedSerializer<Txn> TXN_SERIALIZER = 
localSerializer(CommandSerializers.txn);
+        static final LocalVersionedSerializer<Deps> DEPS_SERIALIZER = 
localSerializer(CommandSerializers.deps);
+        static final LocalVersionedSerializer<Writes> WRITES_SERIALIZER = 
localSerializer(CommandSerializers.writes);
+        static final LocalVersionedSerializer<AccordData> 
RESULT_DATA_SERIALIZER = localSerializer(AccordData.serializer);
+
+        private static <T> LocalVersionedSerializer<T> 
localSerializer(IVersionedSerializer<T> serializer)
+        {
+            return new 
LocalVersionedSerializer<>(AccordSerializerVersion.CURRENT, 
AccordSerializerVersion.serializer, serializer);
+        }
+    }
+
     private static ColumnMetadata getColumn(TableMetadata metadata, String 
name)
     {
         ColumnMetadata column = metadata.getColumn(new ColumnIdentifier(name, 
true));
@@ -170,20 +177,14 @@ public class AccordKeyspace
         static final ClusteringComparator keyComparator = 
Commands.partitionKeyAsClusteringComparator();
         static final ColumnMetadata status = getColumn(Commands, "status");
         static final ColumnMetadata home_key = getColumn(Commands, "home_key");
-        static final ColumnMetadata home_key_version = getColumn(Commands, 
"home_key_version");
         static final ColumnMetadata progress_key = getColumn(Commands, 
"progress_key");
-        static final ColumnMetadata progress_key_version = getColumn(Commands, 
"progress_key_version");
         static final ColumnMetadata is_globally_persistent = 
getColumn(Commands, "is_globally_persistent");
-        static final ColumnMetadata txn_version = getColumn(Commands, 
"txn_version");
         static final ColumnMetadata txn = getColumn(Commands, "txn");
         static final ColumnMetadata execute_at = getColumn(Commands, 
"execute_at");
         static final ColumnMetadata promised_ballot = getColumn(Commands, 
"promised_ballot");
         static final ColumnMetadata accepted_ballot = getColumn(Commands, 
"accepted_ballot");
-        static final ColumnMetadata dependencies_version = getColumn(Commands, 
"dependencies_version");
         static final ColumnMetadata dependencies = getColumn(Commands, 
"dependencies");
-        static final ColumnMetadata writes_version = getColumn(Commands, 
"writes_version");
         static final ColumnMetadata writes = getColumn(Commands, "writes");
-        static final ColumnMetadata result_version = getColumn(Commands, 
"result_version");
         static final ColumnMetadata result = getColumn(Commands, "result");
         static final ColumnMetadata waiting_on_commit = getColumn(Commands, 
"waiting_on_commit");
         static final ColumnMetadata waiting_on_apply = getColumn(Commands, 
"waiting_on_apply");
@@ -235,15 +236,21 @@ public class AccordKeyspace
             return commandsForKey.maxTimestamp.hasModifications()
                    || commandsForKey.lastExecutedTimestamp.hasModifications()
                    || commandsForKey.lastExecutedMicros.hasModifications()
+                   || commandsForKey.lastWriteTimestamp.hasModifications()
                    || commandsForKey.blindWitnessed.hasModifications();
         }
 
+        private static boolean hasRegularChanges(AccordCommandsForKey 
commandsForKey)
+        {
+            return commandsForKey.uncommitted.map.hasModifications()
+                   || commandsForKey.committedById.map.hasModifications()
+                   || 
commandsForKey.committedByExecuteAt.map.hasModifications();
+        }
+
         static RegularAndStaticColumns columnsFor(AccordCommandsForKey 
commandsForKey)
         {
             boolean hasStaticChanges = hasStaticChanges(commandsForKey);
-            boolean hasRegularChanges = 
commandsForKey.uncommitted.map.hasAdditions()
-                                        || 
commandsForKey.committedById.map.hasAdditions()
-                                        || 
commandsForKey.committedByExecuteAt.map.hasAdditions();
+            boolean hasRegularChanges = hasRegularChanges(commandsForKey);
 
             if (hasStaticChanges && hasRegularChanges)
                 return all;
@@ -252,7 +259,7 @@ public class AccordKeyspace
             else if (hasRegularChanges)
                 return justRegular;
             else
-                throw new IllegalArgumentException();
+                throw new IllegalArgumentException("CommandsForKey 
has_modifications=" + commandsForKey.hasModifications() + ", but no Static or 
Regular columns changed!");
         }
     }
 
@@ -275,41 +282,34 @@ public class AccordKeyspace
         return Tables.of(Commands, CommandsForKey);
     }
 
-    private static <T> ByteBuffer serialize(T obj, IVersionedSerializer<T> 
serializer, int version) throws IOException
+    private static <T> ByteBuffer serialize(T obj, LocalVersionedSerializer<T> 
serializer) throws IOException
     {
-        int size = (int) serializer.serializedSize(obj, version);
+        int size = (int) serializer.serializedSize(obj);
         try (DataOutputBuffer out = new DataOutputBuffer(size))
         {
-            serializer.serialize(obj, out, version);
-            assert size == out.buffer().limit();
-            return out.buffer();
+            serializer.serialize(obj, out);
+            ByteBuffer bb = out.buffer();
+            assert size == bb.limit() : String.format("Expected to write %d 
but wrote %d", size, bb.limit());
+            return bb;
         }
     }
 
-    private static <T> ByteBuffer serializeOrNull(T obj, 
IVersionedSerializer<T> serializer, int version) throws IOException
+    private static <T> ByteBuffer serializeOrNull(T obj, 
LocalVersionedSerializer<T> serializer) throws IOException
     {
-        return obj != null ? serialize(obj, serializer, version) : 
EMPTY_BYTE_BUFFER;
+        return obj != null ? serialize(obj, serializer) : EMPTY_BYTE_BUFFER;
     }
 
-    private static <T> T deserialize(ByteBuffer bytes, IVersionedSerializer<T> 
serializer, int version) throws IOException
+    private static <T> T deserialize(ByteBuffer bytes, 
LocalVersionedSerializer<T> serializer) throws IOException
     {
         try (DataInputBuffer in = new DataInputBuffer(bytes, true))
         {
-            return serializer.deserialize(in, version);
+            return serializer.deserialize(in);
         }
     }
 
-    private static <T> T deserializeOrNull(ByteBuffer bytes, 
IVersionedSerializer<T> serializer, int version) throws IOException
+    private static <T> T deserializeOrNull(ByteBuffer bytes, 
LocalVersionedSerializer<T> serializer) throws IOException
     {
-        return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? 
deserialize(bytes, serializer, version) : null;
-    }
-
-    private static Map<ByteBuffer, ByteBuffer> serializeWaitingOn(Map<TxnId, 
ByteBuffer> waitingOn)
-    {
-        Map<ByteBuffer, ByteBuffer> result = 
Maps.newHashMapWithExpectedSize(waitingOn.size());
-        for (Map.Entry<TxnId, ByteBuffer> entry : waitingOn.entrySet())
-            result.put(serializeTimestamp(entry.getKey()), entry.getValue());
-        return result;
+        return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? 
deserialize(bytes, serializer) : null;
     }
 
     private static NavigableMap<TxnId, ByteBuffer> 
deserializeWaitingOn(Map<ByteBuffer, ByteBuffer> serialized)
@@ -345,16 +345,6 @@ public class AccordKeyspace
         return deserializeTimestampSet(row.getSet(name, BytesType.instance), 
TreeSet::new, TxnId::new);
     }
 
-    public static Set<ByteBuffer> serializeListeners(Set<ListenerProxy> 
listeners)
-    {
-        Set<ByteBuffer> result = 
Sets.newHashSetWithExpectedSize(listeners.size());
-        for (ListenerProxy listener : listeners)
-        {
-            result.add(listener.identifier());
-        }
-        return result;
-    }
-
     private static DeterministicIdentitySet<ListenerProxy> 
deserializeListeners(CommandStore commandStore, Set<ByteBuffer> serialized) 
throws IOException
     {
         if (serialized == null || serialized.isEmpty())
@@ -433,32 +423,22 @@ public class AccordKeyspace
             Row.Builder builder = BTreeRow.unsortedBuilder();
             builder.newRow(Clustering.EMPTY);
             int nowInSeconds = (int) 
TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
-            int version = MessagingService.current_version;
-            ByteBuffer versionBytes = accessor.valueOf(version);
+
 
             if (command.status.hasModifications())
                 builder.addCell(live(CommandsColumns.status, timestampMicros, 
accessor.valueOf(command.status.get().ordinal())));
 
             if (command.homeKey.hasModifications())
-            {
-                builder.addCell(live(CommandsColumns.home_key_version, 
timestampMicros, versionBytes));
-                builder.addCell(live(CommandsColumns.home_key, 
timestampMicros, serializeOrNull((AccordKey) command.homeKey.get(), 
AccordKey.serializer, version)));
-            }
+                builder.addCell(live(CommandsColumns.home_key, 
timestampMicros, serializeOrNull((AccordKey) command.homeKey.get(), 
CommandsSerializers.ACCORD_KEY_SERIALIZER)));
 
             if (command.progressKey.hasModifications())
-            {
-                builder.addCell(live(CommandsColumns.progress_key_version, 
timestampMicros, versionBytes));
-                builder.addCell(live(CommandsColumns.progress_key, 
timestampMicros, serializeOrNull((AccordKey) command.progressKey.get(), 
AccordKey.serializer, version)));
-            }
+                builder.addCell(live(CommandsColumns.progress_key, 
timestampMicros, serializeOrNull((AccordKey) command.progressKey.get(), 
CommandsSerializers.ACCORD_KEY_SERIALIZER)));
 
             if (command.isGloballyPersistent.hasModifications())
                 builder.addCell(live(CommandsColumns.is_globally_persistent, 
timestampMicros, accessor.valueOf(command.isGloballyPersistent.get())));
 
             if (command.txn.hasModifications())
-            {
-                builder.addCell(live(CommandsColumns.txn_version, 
timestampMicros, versionBytes));
-                builder.addCell(live(CommandsColumns.txn, timestampMicros, 
serializeOrNull(command.txn.get(), CommandSerializers.txn, version)));
-            }
+                builder.addCell(live(CommandsColumns.txn, timestampMicros, 
serializeOrNull(command.txn.get(), CommandsSerializers.TXN_SERIALIZER)));
 
             if (command.executeAt.hasModifications())
                 builder.addCell(live(CommandsColumns.execute_at, 
timestampMicros, serializeTimestamp(command.executeAt.get())));
@@ -470,22 +450,13 @@ public class AccordKeyspace
                 builder.addCell(live(CommandsColumns.accepted_ballot, 
timestampMicros, serializeTimestamp(command.accepted.get())));
 
             if (command.deps.hasModifications())
-            {
-                builder.addCell(live(CommandsColumns.dependencies_version, 
timestampMicros, versionBytes));
-                builder.addCell(live(CommandsColumns.dependencies, 
timestampMicros, serialize(command.deps.get(), CommandSerializers.deps, 
version)));
-            }
+                builder.addCell(live(CommandsColumns.dependencies, 
timestampMicros, serialize(command.deps.get(), 
CommandsSerializers.DEPS_SERIALIZER)));
 
             if (command.writes.hasModifications())
-            {
-                builder.addCell(live(CommandsColumns.writes_version, 
timestampMicros, versionBytes));
-                builder.addCell(live(CommandsColumns.writes, timestampMicros, 
serialize(command.writes.get(), CommandSerializers.writes, version)));
-            }
+                builder.addCell(live(CommandsColumns.writes, timestampMicros, 
serialize(command.writes.get(), CommandsSerializers.WRITES_SERIALIZER)));
 
             if (command.result.hasModifications())
-            {
-                builder.addCell(live(CommandsColumns.result_version, 
timestampMicros, versionBytes));
-                builder.addCell(live(CommandsColumns.result, timestampMicros, 
serialize((AccordData) command.result.get(), AccordData.serializer, version)));
-            }
+                builder.addCell(live(CommandsColumns.result, timestampMicros, 
serialize((AccordData) command.result.get(), 
CommandsSerializers.RESULT_DATA_SERIALIZER)));
 
             if (command.waitingOnCommit.hasModifications())
             {
@@ -535,7 +506,6 @@ public class AccordKeyspace
 
     private static ByteBuffer serializeKey(PartitionKey key)
     {
-        UUIDSerializer.instance.serialize(key.tableId().asUUID());
         return TupleType.buildValue(new 
ByteBuffer[]{UUIDSerializer.instance.serialize(key.tableId().asUUID()),
                                                      
key.partitionKey().getKey()});
     }
@@ -570,12 +540,12 @@ public class AccordKeyspace
         return command;
     }
 
-    private static <T> T deserializeWithVersionOr(UntypedResultSet.Row row, 
String dataColumn, String versionColumn, IVersionedSerializer<T> serializer, 
Supplier<T> defaultSupplier) throws IOException
+    private static <T> T deserializeWithVersionOr(UntypedResultSet.Row row, 
String dataColumn, LocalVersionedSerializer<T> serializer, Supplier<T> 
defaultSupplier) throws IOException
     {
-        if (!row.has(versionColumn))
+        if (!row.has(dataColumn))
             return defaultSupplier.get();
 
-        return deserialize(row.getBlob(dataColumn), serializer, 
row.getInt(versionColumn));
+        return deserialize(row.getBlob(dataColumn), serializer);
     }
 
     public static UntypedResultSet loadCommandRow(CommandStore commandStore, 
TxnId txnId)
@@ -598,11 +568,6 @@ public class AccordKeyspace
         AccordCommandStore commandStore = command.commandStore();
         commandStore.checkNotInStoreThread();
 
-        String cql = "SELECT * FROM %s.%s " +
-                     "WHERE store_generation=? " +
-                     "AND store_index=? " +
-                     "AND txn_id=(?, ?, ?, ?)";
-
         UntypedResultSet result = loadCommandRow(commandStore, 
command.txnId());
 
         if (result.isEmpty())
@@ -616,16 +581,16 @@ public class AccordKeyspace
             UntypedResultSet.Row row = result.one();
             Preconditions.checkState(deserializeTimestampOrNull(row, "txn_id", 
TxnId::new).equals(txnId));
             command.status.load(Status.values()[row.getInt("status")]);
-            command.homeKey.load(deserializeOrNull(row.getBlob("home_key"), 
AccordKey.serializer, row.getInt("home_key_version")));
-            
command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"), 
AccordKey.serializer, row.getInt("progress_key_version")));
+            command.homeKey.load(deserializeOrNull(row.getBlob("home_key"), 
CommandsSerializers.ACCORD_KEY_SERIALIZER));
+            
command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"), 
CommandsSerializers.ACCORD_KEY_SERIALIZER));
             
command.isGloballyPersistent.load(row.getBoolean("is_globally_persistent"));
-            command.txn.load(deserializeOrNull(row.getBlob("txn"), 
CommandSerializers.txn, row.getInt("txn_version")));
+            command.txn.load(deserializeOrNull(row.getBlob("txn"), 
CommandsSerializers.TXN_SERIALIZER));
             command.executeAt.load(deserializeTimestampOrNull(row, 
"execute_at", Timestamp::new));
             command.promised.load(deserializeTimestampOrNull(row, 
"promised_ballot", Ballot::new));
             command.accepted.load(deserializeTimestampOrNull(row, 
"accepted_ballot", Ballot::new));
-            command.deps.load(deserializeWithVersionOr(row, "dependencies", 
"dependencies_version", CommandSerializers.deps, () -> Deps.NONE));
-            command.writes.load(deserializeWithVersionOr(row, "writes", 
"writes_version", CommandSerializers.writes, () -> null));
-            command.result.load(deserializeWithVersionOr(row, "result", 
"result_version", AccordData.serializer, () -> null));
+            command.deps.load(deserializeWithVersionOr(row, "dependencies", 
CommandsSerializers.DEPS_SERIALIZER, () -> Deps.NONE));
+            command.writes.load(deserializeWithVersionOr(row, "writes", 
CommandsSerializers.WRITES_SERIALIZER, () -> null));
+            command.result.load(deserializeWithVersionOr(row, "result", 
CommandsSerializers.RESULT_DATA_SERIALIZER, () -> null));
             command.waitingOnCommit.load(deserializeWaitingOn(row, 
"waiting_on_commit"));
             command.blockingCommitOn.load(deserializeBlocking(row, 
"blocking_commit_on"));
             command.waitingOnApply.load(deserializeWaitingOn(row, 
"waiting_on_apply"));
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index b723b27e6e..32c136b1b7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service.accord;
 
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.Objects;
 
 import com.google.common.base.Preconditions;
 
@@ -89,7 +90,7 @@ public class AccordMessageSink implements MessageSink
     public void send(Node.Id to, Request request)
     {
         Verb verb = getVerb(request.type());
-        Preconditions.checkArgument(verb != null);
+        Objects.requireNonNull(verb, "verb");
         Message<Request> message = Message.out(verb, request);
         InetAddressAndPort endpoint = getEndpoint(to);
         logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java 
b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index 1419a7595f..c2c3010e43 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -32,12 +32,10 @@ import accord.primitives.Deps;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.txn.Txn;
-import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.accord.async.AsyncContext;
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 
@@ -62,7 +60,7 @@ public class AccordPartialCommand implements PartialCommand
         }
 
         @Override
-        public PartialCommand deserializeBody(TxnId txnId, Txn txn, Timestamp 
executeAt, Status status, DataInputPlus in, Version version) throws IOException
+        public PartialCommand deserializeBody(TxnId txnId, Txn txn, Timestamp 
executeAt, Status status, DataInputPlus in, AccordSerializerVersion version) 
throws IOException
         {
             return new AccordPartialCommand(txnId, txn, executeAt, status);
         }
@@ -121,7 +119,8 @@ public class AccordPartialCommand implements PartialCommand
 
     public void forEachRemovedListener(Consumer<Listener> consumer)
     {
-        removedListeners.forEach(consumer);
+        if (removedListeners != null)
+            removedListeners.forEach(consumer);
     }
 
     @Override
@@ -168,24 +167,24 @@ public class AccordPartialCommand implements 
PartialCommand
             }
 
             @Override
-            public PartialCommand.WithDeps deserializeBody(TxnId txnId, Txn 
txn, Timestamp executeAt, Status status, DataInputPlus in, Version version) 
throws IOException
+            public PartialCommand.WithDeps deserializeBody(TxnId txnId, Txn 
txn, Timestamp executeAt, Status status, DataInputPlus in, 
AccordSerializerVersion version) throws IOException
             {
-                Deps deps = deserializeNullable(in, version.msg_version, 
CommandSerializers.deps);
+                Deps deps = deserializeNullable(in, version.msgVersion, 
CommandSerializers.deps);
                 return new AccordPartialCommand.WithDeps(txnId, txn, 
executeAt, status, deps);
             }
 
             @Override
-            public void serialize(PartialCommand.WithDeps command, 
DataOutputPlus out, Version version) throws IOException
+            public void serialize(PartialCommand.WithDeps command, 
DataOutputPlus out, AccordSerializerVersion version) throws IOException
             {
                 super.serialize(command, out, version);
-                serializeNullable(command.savedDeps(), out, 
version.msg_version, CommandSerializers.deps);
+                serializeNullable(command.savedDeps(), out, 
version.msgVersion, CommandSerializers.deps);
             }
 
             @Override
-            public int serializedSize(PartialCommand.WithDeps command, Version 
version)
+            public int serializedSize(PartialCommand.WithDeps command, 
AccordSerializerVersion version)
             {
                 int size = super.serializedSize(command, version) ;
-                size += serializedSizeNullable(command.savedDeps(), 
version.msg_version, CommandSerializers.deps);
+                size += serializedSizeNullable(command.savedDeps(), 
version.msgVersion, CommandSerializers.deps);
                 return size;
             }
 
@@ -228,44 +227,18 @@ public class AccordPartialCommand implements 
PartialCommand
 
     public static abstract class PartialCommandSerializer<T extends 
PartialCommand>
     {
-        public enum Version
+        public void serialize(T command, DataOutputPlus out, 
AccordSerializerVersion version) throws IOException
         {
-            VERSION_0(0, MessagingService.current_version);
-            final byte version;
-            final int msg_version;
-
-            Version(int version, int msg_version)
-            {
-                this.version = (byte) version;
-                this.msg_version = msg_version;
-            }
-
-            public static final Version current = VERSION_0;
-
-            public static Version fromByte(byte b)
-            {
-                switch (b)
-                {
-                    case 0:
-                        return VERSION_0;
-                    default:
-                        throw new IllegalArgumentException();
-                }
-            }
-        }
-
-        public void serialize(T command, DataOutputPlus out, Version version) 
throws IOException
-        {
-            out.write(version.version);
-            CommandSerializers.txnId.serialize(command.txnId(), out, 
version.msg_version);
-            CommandSerializers.status.serialize(command.status(), out, 
version.msg_version);
-            serializeNullable(command.txn(), out, version.msg_version, 
CommandSerializers.txn);
-            serializeNullable(command.executeAt(), out, version.msg_version, 
CommandSerializers.timestamp);
+            AccordSerializerVersion.serializer.serialize(version, out);
+            CommandSerializers.txnId.serialize(command.txnId(), out, 
version.msgVersion);
+            CommandSerializers.status.serialize(command.status(), out, 
version.msgVersion);
+            serializeNullable(command.txn(), out, version.msgVersion, 
CommandSerializers.txn);
+            serializeNullable(command.executeAt(), out, version.msgVersion, 
CommandSerializers.timestamp);
         }
 
         public ByteBuffer serialize(T command)
         {
-            Version version = Version.current;
+            AccordSerializerVersion version = AccordSerializerVersion.CURRENT;
             int size = serializedSize(command, version);
             try (DataOutputBuffer out = new DataOutputBuffer(size))
             {
@@ -278,24 +251,24 @@ public class AccordPartialCommand implements 
PartialCommand
             }
         }
 
-        public Version deserializeVersion(DataInputPlus in) throws IOException
+        public AccordSerializerVersion deserializeVersion(DataInputPlus in) 
throws IOException
         {
-            return Version.fromByte(in.readByte());
+            return AccordSerializerVersion.serializer.deserialize(in);
         }
 
         // check for cached command first, otherwise deserialize
         public T deserialize(AccordCommandStore commandStore, DataInputPlus 
in) throws IOException
         {
-            Version version = deserializeVersion(in);
-            TxnId txnId = CommandSerializers.txnId.deserialize(in, 
version.msg_version);
+            AccordSerializerVersion version = deserializeVersion(in);
+            TxnId txnId = CommandSerializers.txnId.deserialize(in, 
version.msgVersion);
             AsyncContext context = commandStore.getContext();
             T command = getCachedFull(txnId, context);
             if (command != null)
                 return command;
 
-            Status status = CommandSerializers.status.deserialize(in, 
version.msg_version);
-            Txn txn = deserializeNullable(in, version.msg_version, 
CommandSerializers.txn);
-            Timestamp executeAt = deserializeNullable(in, version.msg_version, 
CommandSerializers.timestamp);
+            Status status = CommandSerializers.status.deserialize(in, 
version.msgVersion);
+            Txn txn = deserializeNullable(in, version.msgVersion, 
CommandSerializers.txn);
+            Timestamp executeAt = deserializeNullable(in, version.msgVersion, 
CommandSerializers.timestamp);
             T partial = deserializeBody(txnId, txn, executeAt, status, in, 
version);
             addToContext(partial, context);
             return partial;
@@ -313,19 +286,19 @@ public class AccordPartialCommand implements 
PartialCommand
             }
         }
 
-        public int serializedSize(T command, Version version)
+        public int serializedSize(T command, AccordSerializerVersion version)
         {
-            int size = TypeSizes.sizeof(version.version);
+            int size = 
Math.toIntExact(AccordSerializerVersion.serializer.serializedSize(version));
             size += CommandSerializers.txnId.serializedSize();
-            size += CommandSerializers.status.serializedSize(command.status(), 
version.msg_version);
-            size += serializedSizeNullable(command.txn(), version.msg_version, 
CommandSerializers.txn);
-            size += serializedSizeNullable(command.executeAt(), 
version.msg_version, CommandSerializers.timestamp);
+            size += CommandSerializers.status.serializedSize(command.status(), 
version.msgVersion);
+            size += serializedSizeNullable(command.txn(), version.msgVersion, 
CommandSerializers.txn);
+            size += serializedSizeNullable(command.executeAt(), 
version.msgVersion, CommandSerializers.timestamp);
             return size;
         }
 
         public abstract T getCachedFull(TxnId txnId, AsyncContext context);
         public abstract void addToContext(T command, AsyncContext context);
-        public abstract T deserializeBody(TxnId txnId, Txn txn, Timestamp 
executeAt, Status status, DataInputPlus in, Version version) throws IOException;
+        public abstract T deserializeBody(TxnId txnId, Txn txn, Timestamp 
executeAt, Status status, DataInputPlus in, AccordSerializerVersion version) 
throws IOException;
 
         /**
          * Determines if current modifications require updating command data 
duplicated elsewhere
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java 
b/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java
new file mode 100644
index 0000000000..cb6f8000f1
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/AccordSerializerVersion.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.MessageVersionProvider;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+public enum AccordSerializerVersion implements MessageVersionProvider
+{
+    // If MessagingService version bumps, this mapping does not need to be 
updated; only updates needed are those that
+    // include accord serializer changes.
+    V1(1, MessagingService.VERSION_40);
+
+    public static final AccordSerializerVersion CURRENT = V1;
+    public static final Serializer serializer = new Serializer();
+
+    public final int version;
+    public final int msgVersion;
+
+    AccordSerializerVersion(int version, int msgVersion)
+    {
+        this.version = version;
+        this.msgVersion = msgVersion;
+    }
+
+    public static AccordSerializerVersion fromVersion(int version)
+    {
+        switch (version)
+        {
+            case 1:
+                return V1;
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    public static AccordSerializerVersion fromMessageVersion(int version)
+    {
+        AccordSerializerVersion[] versions = values();
+        for (int i = versions.length - 1; i >= 0; i--)
+        {
+            AccordSerializerVersion v = versions[i];
+            // If network version bumped (12 to 13), the accord serializers 
may not have been changed; use the largest
+            // version smaller than or equal to this version
+            if (v.msgVersion <= version)
+                return v;
+        }
+        throw new IllegalArgumentException("Attempted to use message version " 
+ version + " which is smaller than " + versions[0] + " can handle (" + 
versions[0].msgVersion + ")");
+    }
+
+    @Override
+    public int messageVersion()
+    {
+        return msgVersion;
+    }
+
+    public static class Serializer implements 
IVersionedSerializer<AccordSerializerVersion>
+    {
+        @Override
+        public void serialize(AccordSerializerVersion t, DataOutputPlus out, 
int version) throws IOException
+        {
+            serialize(t, out);
+        }
+
+        public void serialize(AccordSerializerVersion t, DataOutputPlus out) 
throws IOException
+        {
+            out.writeUnsignedVInt(t.version);
+        }
+
+        @Override
+        public AccordSerializerVersion deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            return deserialize(in);
+        }
+
+        public AccordSerializerVersion deserialize(DataInputPlus in) throws 
IOException
+        {
+            return fromVersion(Math.toIntExact(in.readUnsignedVInt()));
+        }
+
+        @Override
+        public long serializedSize(AccordSerializerVersion t, int version)
+        {
+            return serializedSize(t);
+        }
+
+        public long serializedSize(AccordSerializerVersion t)
+        {
+            return TypeSizes.sizeofUnsignedVInt(t.version);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java 
b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index 0a04a6e550..6b3f2b8f02 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -34,7 +34,6 @@ import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.txn.Txn;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
@@ -106,24 +105,24 @@ public class AccordStateCache
         private Node<?, ?> prev;
         private Node<?, ?> next;
         private int references = 0;
-        private long lastQueriedSize = 0;
+        private long lastQueriedEstimatedSizeOnHeap = 0;
 
         Node(V value)
         {
             this.value = value;
         }
 
-        long size()
+        long estimatedSizeOnHeap()
         {
             long result = EMPTY_SIZE + value.estimatedSizeOnHeap();
-            lastQueriedSize = result;
+            lastQueriedEstimatedSizeOnHeap = result;
             return result;
         }
 
-        long sizeDelta()
+        long estimatedSizeOnHeapDelta()
         {
-            long prevSize = lastQueriedSize;
-            return size() - prevSize;
+            long prevSize = lastQueriedEstimatedSizeOnHeap;
+            return estimatedSizeOnHeap() - prevSize;
         }
 
         K key()
@@ -184,7 +183,7 @@ public class AccordStateCache
 
         if (prev == null)
         {
-            Preconditions.checkState(head == node);
+            Preconditions.checkState(head == node, "previous is null but the 
head isnt the provided node!");
             head = next;
         }
         else
@@ -194,7 +193,7 @@ public class AccordStateCache
 
         if (next == null)
         {
-            Preconditions.checkState(tail == node);
+            Preconditions.checkState(tail == node, "next is null but the tail 
isnt the provided node!");
             tail = prev;
         }
         else
@@ -224,15 +223,16 @@ public class AccordStateCache
 
     private void updateSize(Node<?, ?> node)
     {
-        bytesCached += node.sizeDelta();
+        bytesCached += node.estimatedSizeOnHeapDelta();
     }
 
     // don't evict if there's an outstanding save future. If an item is 
evicted then reloaded
     // before it's mutation is applied, out of date info will be loaded
     private boolean canEvict(Object key)
     {
+        // getFuture only returns a future if it is running, so there is no need to 
check if it's still running
         Future<?> future = getFuture(saveFutures, key);
-        return future == null || future.isDone();
+        return future == null;
     }
 
     private void maybeEvict()
@@ -255,7 +255,7 @@ public class AccordStateCache
             logger.trace("Evicting {} {}", 
evict.value.getClass().getSimpleName(), evict.key());
             unlink(evict);
             cache.remove(evict.key());
-            bytesCached -= evict.size();
+            bytesCached -= evict.estimatedSizeOnHeap();
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 2cdb099394..cc824b3890 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -139,7 +139,7 @@ public class AsyncLoader
             }
             catch (Throwable t)
             {
-                logger.error(String.format("Exception loading %s for %s", 
command.txnId(), callback), t);
+                logger.error("Exception loading {} for {}", command.txnId(), 
callback, t);
                 throw t;
             }
         });
@@ -157,7 +157,7 @@ public class AsyncLoader
             }
             catch (Throwable t)
             {
-                logger.error(String.format("Exception loading %s for %s", 
cfk.key(), callback), t);
+                logger.error("Exception loading {} for {}", cfk.key(), 
callback, t);
                 throw t;
             }
         });
@@ -231,7 +231,7 @@ public class AsyncLoader
             case FINISHED:
                 break;
             default:
-                throw new IllegalStateException();
+                throw new IllegalStateException("Unexpected state: " + state);
         }
 
         logger.trace("Exiting load for {} with state {}: {} {}", callback, 
state, txnIds, keys);
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index ca2ea8617d..5fa51425a5 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -96,7 +96,7 @@ public abstract class AsyncOperation<R> extends 
AsyncPromise<R> implements Runna
     @Override
     public String toString()
     {
-        return "AsyncOperation{" + state + "}-0x" + 
Integer.toHexString(System.identityHashCode(this));
+        return "AsyncOperation{" + state + "}-" + loggingId;
     }
 
     AsyncWriter createAsyncWriter(AccordCommandStore commandStore)
@@ -182,20 +182,26 @@ public abstract class AsyncOperation<R> extends 
AsyncPromise<R> implements Runna
     {
         setLoggingIds();
         logger.trace("Running {} with state {}", this, state);
-        commandStore.checkInStoreThread();
-        commandStore.setContext(context);
         try
         {
-            runInternal();
-        }
-        catch (Throwable t)
-        {
-            logger.error(String.format("Operation %s failed", this), t);
-            tryFailure(t);
+            commandStore.checkInStoreThread();
+            commandStore.setContext(context);
+            try
+            {
+                runInternal();
+            }
+            catch (Throwable t)
+            {
+                logger.error(String.format("Operation %s failed", this), t);
+                tryFailure(t);
+            }
+            finally
+            {
+                commandStore.unsetContext(context);
+            }
         }
         finally
         {
-            commandStore.unsetContext(context);
             logger.trace("Exiting {}", this);
             clearLoggingIds();
         }
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
index 750aa93e12..04a63a1670 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
@@ -65,8 +65,8 @@ public class AsyncWriter
     private State state = State.INITIALIZED;
     protected Future<?> writeFuture;
     private final AccordCommandStore commandStore;
-    AccordStateCache.Instance<TxnId, AccordCommand> commandCache;
-    AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> cfkCache;
+    final AccordStateCache.Instance<TxnId, AccordCommand> commandCache;
+    final AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> 
cfkCache;
 
     public AsyncWriter(AccordCommandStore commandStore)
     {
@@ -97,7 +97,8 @@ public class AsyncWriter
                 continue;
             }
 
-            if (futures == null) futures = new ArrayList<>();
+            if (futures == null)
+                futures = new ArrayList<>();
             K key = item.key();
             Mutation mutation = mutationFunction.apply(item, timestamp);
             if (logger.isTraceEnabled())
@@ -124,7 +125,8 @@ public class AsyncWriter
         for (AccordState.WriteOnly<K, V> item : ctxGroup.writeOnly.values())
         {
             Preconditions.checkState(item.hasModifications());
-            if (futures == null) futures = new ArrayList<>();
+            if (futures == null)
+                futures = new ArrayList<>();
             Mutation mutation = mutationFunction.apply((V) item, timestamp);
             Future<?> future = Stage.MUTATION.submit((Runnable) 
mutation::apply);
             future.addListener(() -> cache.purgeWriteOnly(item.key()), 
commandStore.executor());
@@ -238,7 +240,7 @@ public class AsyncWriter
         }
 
         // There won't be a txn to denormalize against until the command has 
been preaccepted
-        if (command.status().hasBeen(Status.PreAccepted) && 
AccordPartialCommand.WithDeps.serializer.needsUpdate(command))
+        if (command.status().hasBeen(Status.PreAccepted) && 
AccordPartialCommand.WithDeps.serializer.needsUpdate(command) && 
!(command.txn() == null && command.status().isInvalidated()))
         {
             for (Key key : command.txn().keys())
             {
@@ -306,7 +308,7 @@ public class AsyncWriter
                 case FINISHED:
                     break;
                 default:
-                    throw new IllegalStateException();
+                    throw new IllegalStateException("Unexpected state: " + 
state);
             }
         }
         catch (IOException e)
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index 26a27ed69e..7760c81e7d 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -104,7 +104,7 @@ public class AcceptSerializers
         public void serialize(AcceptOk acceptOk, DataOutputPlus out, int 
version) throws IOException
         {
             CommandSerializers.txnId.serialize(acceptOk.txnId, out, version);
-            CommandSerializers.deps.serialize(acceptOk.deps, out, version);
+            NullableSerializer.serializeNullable(acceptOk.deps, out, version, 
CommandSerializers.deps);
 
         }
 
@@ -112,14 +112,14 @@ public class AcceptSerializers
         public AcceptOk deserialize(DataInputPlus in, int version) throws 
IOException
         {
             return new AcceptOk(CommandSerializers.txnId.deserialize(in, 
version),
-                                CommandSerializers.deps.deserialize(in, 
version));
+                                NullableSerializer.deserializeNullable(in, 
version, CommandSerializers.deps));
         }
 
         @Override
         public long serializedSize(AcceptOk acceptOk, int version)
         {
             return CommandSerializers.txnId.serializedSize(acceptOk.txnId, 
version)
-                 + CommandSerializers.deps.serializedSize(acceptOk.deps, 
version);
+                   + NullableSerializer.serializedSizeNullable(acceptOk.deps, 
version, CommandSerializers.deps);
         }
     };
 
diff --git 
a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java 
b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java
index 473cc27032..17a0857198 100644
--- 
a/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java
+++ 
b/test/simulator/asm/org/apache/cassandra/simulator/asm/InterceptClasses.java
@@ -61,7 +61,8 @@ public class InterceptClasses implements BiFunction<String, 
byte[], byte[]>
                                                             
"|org[/.]apache[/.]cassandra[/.]db.streaming[/.].*" +
                                                             
"|org[/.]apache[/.]cassandra[/.]distributed[/.]impl[/.]DirectStreamingConnectionFactory.*"
 +
                                                             
"|org[/.]apache[/.]cassandra[/.]db[/.]commitlog[/.].*" +
-                                                            
"|org[/.]apache[/.]cassandra[/.]service[/.]paxos[/.].*");
+                                                            
"|org[/.]apache[/.]cassandra[/.]service[/.]paxos[/.].*" +
+                                                            "|accord[/.].*");
 
     private static final Pattern GLOBAL_METHODS = 
Pattern.compile("org[/.]apache[/.]cassandra[/.](?!simulator[/.]).*" +
                                                                   
"|org[/.]apache[/.]cassandra[/.]simulator[/.]test[/.].*" +
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java 
b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
index 341ed63ed1..eb3df1734d 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/SimulationRunner.java
@@ -376,6 +376,7 @@ public class SimulationRunner
                 catch (Throwable t)
                 {
                     logger().error("Failed on seed 0x{}", 
Long.toHexString(seed), t);
+                    throw t;
                 }
             }
         }
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java 
b/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java
index 5be3384eeb..23fe5eaed6 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/SimulatorUtils.java
@@ -40,9 +40,10 @@ public class SimulatorUtils
     public static void dumpStackTraces(Logger logger)
     {
         Map<Thread, StackTraceElement[]> threadMap = 
Thread.getAllStackTraces();
-        threadMap.forEach((thread, ste) -> {
-            logger.error("{}:\n   {}", thread, Threads.prettyPrint(ste, false, 
"   ", "\n", ""));
-        });
+        String prefix = "   ";
+        String delimiter = "\n" + prefix;
+        threadMap.forEach((thread, ste) ->
+                          logger.error("{}:\n{}", thread, 
Threads.prettyPrint(ste, false, prefix, delimiter, "")));
         FastThreadLocal.destroy();
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 503afafc38..1351117999 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.service.accord.async;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,20 +65,6 @@ public class AsyncLoaderTest
         StorageService.instance.initServer();
     }
 
-    private static void load(AccordCommandStore commandStore, AsyncContext 
context, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys)
-    {
-        execute(commandStore, () ->
-        {
-            AsyncLoader loader = new AsyncLoader(commandStore, txnIds, keys);
-            while (!loader.load(context, (o, t) -> Assert.assertNull(t)));
-        });
-    }
-
-    private static void load(AccordCommandStore commandStore, AsyncContext 
context, TxnId... txnIds)
-    {
-        load(commandStore, context, ImmutableList.copyOf(txnIds), 
Collections.emptyList());
-    }
-
     /**
      * Loading a cached resource shoudln't block
      */
@@ -118,7 +103,7 @@ public class AsyncLoaderTest
      * Loading a cached resource should block
      */
     @Test
-    public void loadTest() throws Throwable
+    public void loadTest()
     {
         AtomicLong clock = new AtomicLong(0);
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
@@ -160,7 +145,7 @@ public class AsyncLoaderTest
      * Test when some resources are cached and others need to be loaded
      */
     @Test
-    public void partialLoadTest() throws Throwable
+    public void partialLoadTest()
     {
         AtomicLong clock = new AtomicLong(0);
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 6ec036aa7b..4d788b6865 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -27,7 +27,6 @@ import org.junit.Test;
 
 import accord.local.Command;
 import accord.local.PartialCommand;
-import accord.local.PreLoadContext;
 import accord.local.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -77,7 +76,7 @@ public class AsyncWriterTest
     }
 
     @Test
-    public void waitingOnDenormalization() throws Throwable
+    public void waitingOnDenormalization()
     {
         AtomicLong clock = new AtomicLong(0);
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to