This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a3a37f35199b87b85e97435c1d2bae6ceaa7e9ba
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sat Feb 1 17:42:21 2025 +0000

    Refactor RedundantStatus to encode a vector of states that can be merged independently
    Also fix:
     - Truncate command on first access, without participants
     - Use Ballot.ZERO when invoking CFK.insertOutOfRange where appropriate
     - Don't supply a command's own route to ProgressLog.waiting to ensure new keys are incorporated
     - Ensure progress in CommandsForKey by setting vestigial commands to ERASED
     - Add any missing owned keys to StoreParticipants.route to ensure fetch can make progress
     - Recovery must wait for earlier not-accepted transactions if either has the privileged coordinator optimisation
     - Inclusive SyncPoint used incorrect topologies for propose phase
     - Barrier must not register local listener without up-to-date topology information
     - Stop home shard truncating a TxnId to vestigial rather than Invalidated so other shards can make progress
    Also improve:
     - Validate commands are constructed with non-empty participants
     - Remove some unnecessary synchronized keywords
     - Clear ok messages on PreAccept and Accept to free up memory
     - Introduce TxnId.Cardinality flag so we can optimise single key queries
     - Update CommandsForKey serialization to better handle larger flag space
     - Make configurable which Txn.Kind can result in a CommandStore being marked stale
     - Process DefaultProgressLog queue synchronously when relevant state is resident in memory
     - Remove defunct CollectMaxApplied version of ListStore bootstrap
     - Standardise linearizability violation reporting
     - Improve CommandStore.execute method naming to reduce chance of misuse
     - Prune and address some comments
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20282
---
 modules/accord                                     |  2 +-
 .../cassandra/db/virtual/AccordDebugKeyspace.java  |  8 +--
 .../service/accord/AccordCommandStore.java         |  4 +-
 .../service/accord/AccordMessageSink.java          | 22 ++++----
 .../service/accord/AccordObjectSizes.java          | 10 ++--
 .../cassandra/service/accord/AccordService.java    |  3 +-
 .../cassandra/service/accord/api/AccordAgent.java  |  9 ++--
 .../accord/serializers/CommandSerializers.java     | 63 ++++++++++++++++------
 .../serializers/CommandStoreSerializers.java       | 20 +++----
 .../db/virtual/AccordDebugKeyspaceTest.java        |  4 +-
 .../cassandra/index/accord/RouteIndexTest.java     |  4 +-
 .../service/accord/AccordJournalOrderTest.java     |  8 ++-
 .../service/accord/CommandChangeTest.java          |  1 -
 .../serializers/CommandsForKeySerializerTest.java  |  8 +--
 .../apache/cassandra/utils/AccordGenerators.java   | 14 ++---
 15 files changed, 109 insertions(+), 71 deletions(-)

diff --git a/modules/accord b/modules/accord
index 78ab7eef90..cd7f49564a 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 78ab7eef904ef549d0d7a34332b83d6110e0762d
+Subproject commit cd7f49564a5ad053286453d10f8cd46b8d870c4f
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index 757fb9488f..ff8159b632 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -472,11 +472,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                         ds.row(storeId, decompose(start), decompose(end))
                           .column("start_ownership_epoch", 
entry.startOwnershipEpoch)
                           .column("end_ownership_epoch", 
entry.endOwnershipEpoch)
-                          .column("locally_applied_or_invalidated_before", 
entry.locallyAppliedOrInvalidatedBefore.toString())
-                          
.column("locally_decided_and_applied_or_invalidated_before", 
entry.locallyDecidedAndAppliedOrInvalidatedBefore.toString())
-                          .column("shard_applied_or_invalidated_before", 
entry.shardAppliedOrInvalidatedBefore.toString())
+                          .column("locally_applied_before", 
entry.locallyAppliedBefore.toString())
+                          .column("locally_decided_and_applied_before", 
entry.locallyDecidedAndAppliedBefore.toString())
+                          .column("shard_applied_before", 
entry.shardAppliedBefore.toString())
                           .column("gc_before", entry.gcBefore.toString())
-                          .column("shard_only_applied_or_invalidated_before", 
entry.shardOnlyAppliedOrInvalidatedBefore.toString())
+                          .column("shard_only_applied_before", 
entry.shardOnlyAppliedBefore.toString())
                           .column("bootstrapped_at", 
entry.bootstrappedAt.toString())
                           .column("stale_until_at_least", 
entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
                         return ds;
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 7fc135da58..c87cd41baf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -309,7 +309,7 @@ public class AccordCommandStore extends CommandStore
         return lastSystemTimestampMicros;
     }
     @Override
-    public <T> AsyncChain<T> submit(PreLoadContext loadCtx, Function<? super 
SafeCommandStore, T> function)
+    public <T> AsyncChain<T> build(PreLoadContext loadCtx, Function<? super 
SafeCommandStore, T> function)
     {
         return AccordTask.create(this, loadCtx, function).chain();
     }
@@ -336,7 +336,7 @@ public class AccordCommandStore extends CommandStore
     }
 
     @Override
-    public AsyncChain<Void> execute(PreLoadContext preLoadContext, Consumer<? 
super SafeCommandStore> consumer)
+    public AsyncChain<Void> build(PreLoadContext preLoadContext, Consumer<? 
super SafeCommandStore> consumer)
     {
         return AccordTask.create(this, preLoadContext, consumer).chain();
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index e5d897ed3b..cf8f8c7515 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -71,13 +71,13 @@ public class AccordMessageSink implements MessageSink
 
     public static final class AccordMessageType extends MessageType
     {
-        public static final AccordMessageType INTEROP_READ_REQ             = 
remote("INTEROP_READ_REQ",           false);
-        public static final AccordMessageType INTEROP_READ_RSP             = 
remote("INTEROP_READ_RSP",           false);
-        public static final AccordMessageType INTEROP_STABLE_THEN_READ_REQ = 
remote("INTEROP_STABLE_THEN_READ_REQ", false);
-        public static final AccordMessageType INTEROP_READ_REPAIR_REQ      = 
remote("INTEROP_READ_REPAIR_REQ",    false);
-        public static final AccordMessageType INTEROP_READ_REPAIR_RSP      = 
remote("INTEROP_READ_REPAIR_RSP",    false);
-        public static final AccordMessageType INTEROP_APPLY_MINIMAL_REQ    = 
remote("INTEROP_APPLY_MINIMAL_REQ",  true );
-        public static final AccordMessageType INTEROP_APPLY_MAXIMAL_REQ    = 
remote("INTEROP_APPLY_MAXIMAL_REQ",  true );
+        public static final AccordMessageType INTEROP_READ_REQ             = 
remote("INTEROP_READ_REQ");
+        public static final AccordMessageType INTEROP_READ_RSP             = 
remote("INTEROP_READ_RSP");
+        public static final AccordMessageType INTEROP_STABLE_THEN_READ_REQ = 
remote("INTEROP_STABLE_THEN_READ_REQ");
+        public static final AccordMessageType INTEROP_READ_REPAIR_REQ      = 
remote("INTEROP_READ_REPAIR_REQ");
+        public static final AccordMessageType INTEROP_READ_REPAIR_RSP      = 
remote("INTEROP_READ_REPAIR_RSP");
+        public static final AccordMessageType INTEROP_APPLY_MINIMAL_REQ    = 
remote("INTEROP_APPLY_MINIMAL_REQ");
+        public static final AccordMessageType INTEROP_APPLY_MAXIMAL_REQ    = 
remote("INTEROP_APPLY_MAXIMAL_REQ");
 
         public static final List<MessageType> values;
 
@@ -101,14 +101,14 @@ public class AccordMessageSink implements MessageSink
             values = builder.build();
         }
 
-        protected static AccordMessageType remote(String name, boolean 
hasSideEffects)
+        protected static AccordMessageType remote(String name)
         {
-            return new AccordMessageType(name, REMOTE, hasSideEffects);
+            return new AccordMessageType(name, REMOTE);
         }
 
-        private AccordMessageType(String name, MessageType.Kind kind, boolean 
hasSideEffects)
+        private AccordMessageType(String name, MessageType.Kind kind)
         {
-            super(name, kind, hasSideEffects);
+            super(name, kind);
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java 
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index c218ac65d8..03e168208c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -44,6 +44,7 @@ import accord.primitives.PartialDeps;
 import accord.primitives.PartialKeyRoute;
 import accord.primitives.PartialRangeRoute;
 import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
 import accord.primitives.Range;
 import accord.primitives.RangeDeps;
 import accord.primitives.Ranges;
@@ -290,8 +291,9 @@ public class AccordObjectSizes
         private static ICommand attrs(boolean hasDeps, boolean hasTxn, boolean 
executes)
         {
             FullKeyRoute route = new FullKeyRoute(EMPTY_KEY, new RoutingKey[]{ 
EMPTY_KEY });
+            Participants<?> empty = route.slice(0, 0);
             ICommand.Builder builder = new ICommand.Builder(EMPTY_TXNID)
-                                       
.setParticipants(StoreParticipants.empty(EMPTY_TXNID, route, !executes))
+                                       
.setParticipants(StoreParticipants.create(route, empty, executes ? empty : 
null, empty, route))
                                        
.durability(Status.Durability.NotDurable)
                                        .executeAt(EMPTY_TXNID)
                                        .promised(Ballot.ZERO);
@@ -316,8 +318,8 @@ public class AccordObjectSizes
         final static long ACCEPTED = 
measure(Command.Accepted.accepted(attrs(true, false, false), 
SaveStatus.AcceptedMedium));
         final static long COMMITTED = 
measure(Command.Committed.committed(attrs(true, true, false), 
SaveStatus.Committed));
         final static long EXECUTED = 
measure(Command.Executed.executed(attrs(true, true, true), SaveStatus.Applied));
-        final static long TRUNCATED = 
measure(Command.Truncated.truncatedApply(attrs(false, false, false), 
SaveStatus.TruncatedApply,  EMPTY_TXNID, null, null));
-        final static long INVALIDATED = 
measure(Command.Truncated.invalidated(EMPTY_TXNID, 
StoreParticipants.empty(EMPTY_TXNID)));
+        final static long TRUNCATED = 
measure(Command.Truncated.truncated(attrs(false, false, false), 
SaveStatus.TruncatedApply,  EMPTY_TXNID, null, null));
+        final static long INVALIDATED = 
measure(Command.Truncated.invalidated(EMPTY_TXNID, attrs(false, false, 
false).participants()));
 
         private static long emptySize(Command command)
         {
@@ -353,7 +355,7 @@ public class AccordObjectSizes
                 case Applied:
                     return EXECUTED;
                 case TruncatedApply:
-                case TruncatedApplyWithDeps:
+                case TruncatedUnapplied:
                 case TruncatedApplyWithOutcome:
                 case Vestigial:
                 case Erased:
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index b4c3e636bf..a287a87d6f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -173,6 +173,7 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 import static accord.messages.SimpleReply.Ok;
 import static accord.primitives.Routable.Domain.Key;
 import static accord.primitives.Routable.Domain.Range;
+import static accord.primitives.TxnId.Cardinality.cardinality;
 import static accord.utils.Invariants.require;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -818,7 +819,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     @Override
     public @Nonnull AsyncTxnResult coordinateAsync(long minEpoch, @Nonnull Txn 
txn, @Nonnull ConsistencyLevel consistencyLevel, @Nonnull 
Dispatcher.RequestTime requestTime)
     {
-        TxnId txnId = node.nextTxnId(txn.kind(), txn.keys().domain());
+        TxnId txnId = node.nextTxnId(txn.kind(), txn.keys().domain(), 
cardinality(txn.keys()));
         ClientRequestMetrics sharedMetrics;
         AccordClientRequestMetrics metrics;
         if (txn.isWrite())
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index 8ee5957209..466914eae6 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.service.accord.api;
 
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nullable;
+
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +37,7 @@ import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 import accord.messages.ReplyContext;
 import accord.primitives.Keys;
+import accord.primitives.Participants;
 import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.Seekables;
@@ -59,7 +62,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
 
 import static accord.primitives.Routable.Domain.Key;
 import static accord.primitives.Txn.Kind.Write;
-import static accord.utils.Invariants.illegalState;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -259,9 +261,8 @@ public class AccordAgent implements Agent
     }
 
     @Override
-    public void onViolation(String message)
+    public void onViolation(String message, Participants<?> participants, 
@Nullable TxnId notWitnessed, @Nullable Timestamp notWitnessedExecuteAt, 
@Nullable TxnId by, @Nullable Timestamp byEexecuteAt)
     {
-        try { throw illegalState(message); }
-        catch (Throwable t) { logger.error("Consistency violation", t); }
+        logger.error(message);
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index 227e5c74f3..d5a80d7b26 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -94,7 +94,12 @@ public class CommandSerializers
 
             long epoch = txnId.epoch();
             if((flags & HAS_EPOCH) != 0)
-                epoch += in.readUnsignedVInt();
+            {
+                long delta = in.readUnsignedVInt();
+                if (delta == 0)
+                    return Timestamp.NONE;
+                epoch += delta - 1;
+            }
 
             long hlc = txnId.hlc() + in.readUnsignedVInt();
             Node.Id node = new Node.Id(in.readUnsignedVInt32());
@@ -108,8 +113,8 @@ public class CommandSerializers
             int flags = in.readUnsignedVInt32();
             if ((flags & 1) != 0)
             {
-                if ((flags & HAS_EPOCH) != 0)
-                    in.readUnsignedVInt();
+                if ((flags & HAS_EPOCH) != 0 && in.readUnsignedVInt() == 0)
+                    return;
                 in.readUnsignedVInt();
                 in.readUnsignedVInt32();
                 if ((flags & HAS_UNIQUE_HLC) != 0)
@@ -124,7 +129,14 @@ public class CommandSerializers
             if ((flags & 1) != 0)
             {
                 if ((flags & HAS_EPOCH) != 0)
-                    out.writeUnsignedVInt(executeAt.epoch() - txnId.epoch());
+                {
+                    if (executeAt.equals(Timestamp.NONE))
+                    {
+                        out.writeUnsignedVInt(0L);
+                        return;
+                    }
+                    out.writeUnsignedVInt(1 + executeAt.epoch() - 
txnId.epoch());
+                }
                 out.writeUnsignedVInt(executeAt.hlc() - txnId.hlc());
                 out.writeUnsignedVInt32(executeAt.node.id);
                 if ((flags & HAS_UNIQUE_HLC) != 0)
@@ -152,7 +164,12 @@ public class CommandSerializers
             if ((flags & 1) != 0)
             {
                 if ((flags & HAS_EPOCH) != 0)
+                {
+                    if (executeAt.equals(Timestamp.NONE))
+                        return size + TypeSizes.sizeofUnsignedVInt(0L);
+
                     size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch() - 
txnId.epoch());
+                }
                 size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc() - 
txnId.hlc());
                 size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id);
                 if ((flags & HAS_UNIQUE_HLC) != 0)
@@ -176,10 +193,13 @@ public class CommandSerializers
             int flags = in.readUnsignedVInt32();
             if (nullable)
             {
-                if ((flags & 1) != 0) return null;
-                flags >>>= 1;
+                if (flags == 0) return null;
+                flags--;
             }
             long epoch = in.readUnsignedVInt();
+            if (epoch-- == 0)
+                return Timestamp.NONE;
+
             long hlc = in.readUnsignedVInt();
             Node.Id node = new Node.Id(in.readUnsignedVInt32());
             if ((flags & HAS_UNIQUE_HLC) == 0)
@@ -206,11 +226,12 @@ public class CommandSerializers
             int flags = in.readUnsignedVInt32();
             if (nullable)
             {
-                if ((flags & 1) != 0)
+                if (flags == 0)
                     return;
-                flags >>>= 1;
+                flags--;
             }
-            in.readUnsignedVInt();
+            if (0 == in.readUnsignedVInt())
+                return;
             in.readUnsignedVInt();
             in.readUnsignedVInt32();
             if ((flags & HAS_UNIQUE_HLC) != 0)
@@ -235,9 +256,13 @@ public class CommandSerializers
             {
                 Invariants.require(nullable);
             }
+            else if (executeAt.equals(Timestamp.NONE))
+            {
+                out.writeUnsignedVInt(0L);
+            }
             else
             {
-                out.writeUnsignedVInt(executeAt.epoch());
+                out.writeUnsignedVInt(1 + executeAt.epoch());
                 out.writeUnsignedVInt(executeAt.hlc());
                 out.writeUnsignedVInt32(executeAt.node.id);
                 if (executeAt.hasDistinctHlcAndUniqueHlc())
@@ -264,11 +289,15 @@ public class CommandSerializers
                 Invariants.require(nullable);
                 return size;
             }
-            size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch());
-            size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc());
-            size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id);
-            if (executeAt.hasDistinctHlcAndUniqueHlc())
-                size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - 
executeAt.hlc());
+            if (executeAt.equals(Timestamp.NONE)) size += 
TypeSizes.sizeofUnsignedVInt(0);
+            else
+            {
+                size += TypeSizes.sizeofUnsignedVInt(1 + executeAt.epoch());
+                size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc());
+                size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id);
+                if (executeAt.hasDistinctHlcAndUniqueHlc())
+                    size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() 
- executeAt.hlc());
+            }
             return size;
         }
 
@@ -277,7 +306,7 @@ public class CommandSerializers
             if (executeAt == null)
             {
                 Invariants.require(nullable);
-                return 1;
+                return 0;
             }
 
             int flags = executeAt.flags() << 2;
@@ -286,7 +315,7 @@ public class CommandSerializers
             if (executeAt.hasDistinctHlcAndUniqueHlc())
                 flags |= HAS_UNIQUE_HLC;
             if (nullable)
-                flags <<= 1;
+                flags++;
             return flags;
         }
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
index 1a1a767176..9fa9333def 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
@@ -142,11 +142,11 @@ public class CommandStoreSerializers
             out.writeUnsignedVInt(t.startOwnershipEpoch);
             if (t.endOwnershipEpoch == Long.MAX_VALUE) 
out.writeUnsignedVInt(0L);
             else out.writeUnsignedVInt(1 + t.endOwnershipEpoch - 
t.startOwnershipEpoch);
-            
CommandSerializers.txnId.serialize(t.locallyWitnessedOrInvalidatedBefore, out, 
version);
-            
CommandSerializers.txnId.serialize(t.locallyAppliedOrInvalidatedBefore, out, 
version);
-            
CommandSerializers.txnId.serialize(t.locallyDecidedAndAppliedOrInvalidatedBefore,
 out, version);
-            
CommandSerializers.txnId.serialize(t.shardOnlyAppliedOrInvalidatedBefore, out, 
version);
-            
CommandSerializers.txnId.serialize(t.shardAppliedOrInvalidatedBefore, out, 
version);
+            CommandSerializers.txnId.serialize(t.locallyWitnessedBefore, out, 
version);
+            CommandSerializers.txnId.serialize(t.locallyAppliedBefore, out, 
version);
+            
CommandSerializers.txnId.serialize(t.locallyDecidedAndAppliedBefore, out, 
version);
+            CommandSerializers.txnId.serialize(t.shardOnlyAppliedBefore, out, 
version);
+            CommandSerializers.txnId.serialize(t.shardAppliedBefore, out, 
version);
             CommandSerializers.txnId.serialize(t.gcBefore, out, version);
             CommandSerializers.txnId.serialize(t.bootstrappedAt, out, version);
             
CommandSerializers.nullableTimestamp.serialize(t.staleUntilAtLeast, out, 
version);
@@ -177,11 +177,11 @@ public class CommandStoreSerializers
             long size = KeySerializers.range.serializedSize(t.range, version);
             size += TypeSizes.sizeofUnsignedVInt(t.startOwnershipEpoch);
             size += TypeSizes.sizeofUnsignedVInt(t.endOwnershipEpoch == 
Long.MAX_VALUE ? 0 : 1 + t.endOwnershipEpoch - t.startOwnershipEpoch);
-            size += 
CommandSerializers.txnId.serializedSize(t.locallyWitnessedOrInvalidatedBefore, 
version);
-            size += 
CommandSerializers.txnId.serializedSize(t.locallyAppliedOrInvalidatedBefore, 
version);
-            size += 
CommandSerializers.txnId.serializedSize(t.locallyDecidedAndAppliedOrInvalidatedBefore,
 version);
-            size += 
CommandSerializers.txnId.serializedSize(t.shardOnlyAppliedOrInvalidatedBefore, 
version);
-            size += 
CommandSerializers.txnId.serializedSize(t.shardAppliedOrInvalidatedBefore, 
version);
+            size += 
CommandSerializers.txnId.serializedSize(t.locallyWitnessedBefore, version);
+            size += 
CommandSerializers.txnId.serializedSize(t.locallyAppliedBefore, version);
+            size += 
CommandSerializers.txnId.serializedSize(t.locallyDecidedAndAppliedBefore, 
version);
+            size += 
CommandSerializers.txnId.serializedSize(t.shardOnlyAppliedBefore, version);
+            size += 
CommandSerializers.txnId.serializedSize(t.shardAppliedBefore, version);
             size += CommandSerializers.txnId.serializedSize(t.gcBefore, 
version);
             size += CommandSerializers.txnId.serializedSize(t.bootstrappedAt, 
version);
             size += 
CommandSerializers.nullableTimestamp.serializedSize(t.staleUntilAtLeast, 
version);
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 7698fe764e..877ba587c4 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -53,7 +53,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.concurrent.Condition;
 import org.awaitility.Awaitility;
 
-import static accord.primitives.TxnId.FastPath.UNOPTIMISED;
+import static accord.primitives.TxnId.FastPath.Unoptimised;
 import static org.apache.cassandra.Util.spinUntilSuccess;
 import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
 
@@ -136,7 +136,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
     public void blocked() throws ExecutionException, InterruptedException
     {
         ProtocolModifiers.Toggles.setPermitLocalExecution(false);
-        ProtocolModifiers.Toggles.setPermittedFastPaths(new 
TxnId.FastPaths(UNOPTIMISED));
+        ProtocolModifiers.Toggles.setPermittedFastPaths(new 
TxnId.FastPaths(Unoptimised));
         AccordMsgFilter filter = new AccordMsgFilter();
         MessagingService.instance().outboundSink.add(filter);
         try
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java 
b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index ccabbcf265..ad9adfe33d 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -634,8 +634,8 @@ public class RouteIndexTest extends CQLTester.InMemory
         Int2ObjectHashMap<DurableBefore> durableBefores = new 
Int2ObjectHashMap<>();
         Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges = new 
Int2ObjectHashMap<>();
         RedundantBefore redundantBefore = Mockito.spy(RedundantBefore.EMPTY);
-        
Mockito.doReturn(RedundantStatus.LIVE).when(redundantBefore).status(Mockito.any(),
 Mockito.any(), (Participants<?>) Mockito.any());
-        
Mockito.doReturn(RedundantStatus.LIVE).when(redundantBefore).status(Mockito.any(),
 Mockito.any(), (RoutingKey) Mockito.any());
+        
Mockito.doReturn(RedundantStatus.NONE).when(redundantBefore).status(Mockito.any(),
 Mockito.any(), (Participants<?>) Mockito.any());
+        
Mockito.doReturn(RedundantStatus.NONE).when(redundantBefore).status(Mockito.any(),
 Mockito.any(), (RoutingKey) Mockito.any());
         for (int i = 0; i < MAX_STORES; i++)
         {
             redundantBefores.put(i, redundantBefore);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
index 45ff263646..4e376e4939 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
@@ -30,6 +30,8 @@ import accord.api.Journal;
 import accord.local.Command;
 import accord.local.StoreParticipants;
 import accord.primitives.Ballot;
+import accord.primitives.Participants;
+import accord.primitives.RoutingKeys;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.primitives.TxnId;
@@ -39,13 +41,16 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.journal.TestParams;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.api.AccordAgent;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.utils.StorageCompatibilityMode;
 
@@ -82,7 +87,8 @@ public class AccordJournalOrderTest
             TxnId txnId = randomSource.nextBoolean() ? id1 : id2;
             JournalKey key = new JournalKey(txnId, 
JournalKey.Type.COMMAND_DIFF, randomSource.nextInt(5));
             res.compute(key, (k, prev) -> prev == null ? 1 : prev + 1);
-            Command command = Command.NotDefined.notDefined(txnId, 
SaveStatus.NotDefined, Status.Durability.NotDurable, 
StoreParticipants.empty(txnId), Ballot.ZERO);
+            Participants<?> participants = RoutingKeys.of(new 
AccordRoutingKey.TokenKey(TableId.generate(), new 
Murmur3Partitioner.LongToken(1)));
+            Command command = Command.NotDefined.notDefined(txnId, 
SaveStatus.NotDefined, Status.Durability.NotDurable, 
StoreParticipants.create(null, participants, null, participants, participants), 
Ballot.ZERO);
             accordJournal.saveCommand(key.commandStoreId,
                                       new Journal.CommandUpdate(null, command),
                                       () -> {});
diff --git 
a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java 
b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
index b24a60a670..facb15357f 100644
--- a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
@@ -95,7 +95,6 @@ public class CommandChangeTest
                 SoftAssertions checks = new SoftAssertions();
                 for (SaveStatus saveStatus : SaveStatus.values())
                 {
-                    if (saveStatus == SaveStatus.TruncatedApplyWithDeps) 
continue;
                     out.clear();
                     Command orig = cmdBuilder.build(saveStatus);
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index ddd5146462..f8c85ad544 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -476,7 +476,7 @@ public class CommandsForKeySerializerTest
             }
 
             // TODO (expected): we currently don't explore TruncatedApply 
statuses because we don't transition through all phases and therefore don't 
adopt the Applied status
-            Choices<SaveStatus> saveStatusChoices = 
Choices.uniform(EnumSet.complementOf(EnumSet.of(SaveStatus.TruncatedApply, 
SaveStatus.TruncatedApplyWithOutcome, 
SaveStatus.TruncatedApplyWithDeps)).toArray(SaveStatus[]::new));
+            Choices<SaveStatus> saveStatusChoices = 
Choices.uniform(EnumSet.complementOf(EnumSet.of(SaveStatus.TruncatedApply, 
SaveStatus.TruncatedUnapplied, 
SaveStatus.TruncatedApplyWithOutcome)).toArray(SaveStatus[]::new));
             Supplier<SaveStatus> saveStatusSupplier = () -> {
                 SaveStatus result = saveStatusChoices.choose(source);
                 while (result.is(Status.Truncated)) // we don't currently 
process truncations
@@ -559,7 +559,7 @@ public class CommandsForKeySerializerTest
     public void test()
     {
         var tableGen = 
AccordGenerators.fromQT(CassandraGenerators.TABLE_ID_GEN);
-        var txnIdGen = AccordGens.txnIds(rs -> rs.nextLong(0, 100), rs -> 
rs.nextLong(100), rs -> rs.nextInt(10));
+        var txnIdGen = AccordGens.txnIds((Gen.LongGen) rs -> rs.nextLong(0, 
100), rs -> rs.nextLong(100), rs -> rs.nextInt(10));
         qt().check(rs -> {
             TableId table = tableGen.next(rs);
             TokenKey pk = new TokenKey(table, new 
Murmur3Partitioner.LongToken(rs.nextLong()));
@@ -647,8 +647,8 @@ public class CommandsForKeySerializerTest
         @Override public boolean inStore() { return true; }
         @Override public Journal.Loader loader() { throw new 
UnsupportedOperationException(); }
         @Override public Agent agent() { return this; }
-        @Override public AsyncChain<Void> execute(PreLoadContext context, 
Consumer<? super SafeCommandStore> consumer) { return null; }
-        @Override public <T> AsyncChain<T> submit(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply) { throw new 
UnsupportedOperationException(); }
+        @Override public AsyncChain<Void> build(PreLoadContext context, 
Consumer<? super SafeCommandStore> consumer) { return null; }
+        @Override public <T> AsyncChain<T> build(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply) { throw new 
UnsupportedOperationException(); }
         @Override public void shutdown() { }
         @Override protected void registerTransitive(SafeCommandStore 
safeStore, RangeDeps deps){ }
         @Override public <T> AsyncChain<T> submit(Callable<T> task) { throw 
new UnsupportedOperationException(); }
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java 
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 05c731f7db..1b60d0a3b5 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
 
 import accord.local.Command;
+import accord.local.Command.Truncated;
 import accord.local.ICommand;
 import accord.local.DurableBefore;
 import accord.local.RedundantBefore;
@@ -245,8 +246,6 @@ public class AccordGenerators
             switch (saveStatus)
             {
                 default: throw new AssertionError("Unhandled saveStatus: " + 
saveStatus);
-                case TruncatedApplyWithDeps:
-                    throw new IllegalArgumentException("TruncatedApplyWithDeps 
is not a valid state for a Command to be in, its for FetchData");
                 case Uninitialised:
                 case NotDefined:
                     return 
Command.NotDefined.notDefined(attributes(saveStatus), Ballot.ZERO);
@@ -285,17 +284,18 @@ public class AccordGenerators
                     return Command.Executed.executed(attributes(saveStatus), 
saveStatus);
 
                 case TruncatedApply:
-                    if (txnId.kind().awaitsOnlyDeps()) return 
Command.Truncated.truncatedApply(attributes(saveStatus), saveStatus, executeAt, 
null, null, txnId);
-                    else return 
Command.Truncated.truncatedApply(attributes(saveStatus), saveStatus, executeAt, 
null, null);
+                case TruncatedUnapplied:
+                    if (txnId.kind().awaitsOnlyDeps()) return 
Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, null, null, 
txnId);
+                    else return Truncated.truncated(attributes(saveStatus), 
saveStatus, executeAt, null, null);
 
                 case TruncatedApplyWithOutcome:
-                    if (txnId.kind().awaitsOnlyDeps()) return 
Command.Truncated.truncatedApply(attributes(saveStatus), saveStatus, executeAt, 
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges,new 
TxnWrite(Collections.emptyList(), true)) : null, new TxnData(), txnId);
-                    else return 
Command.Truncated.truncatedApply(attributes(saveStatus), saveStatus, executeAt, 
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges, new 
TxnWrite(Collections.emptyList(), true)) : null, new TxnData());
+                    if (txnId.kind().awaitsOnlyDeps()) return 
Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, 
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges,new 
TxnWrite(Collections.emptyList(), true)) : null, new TxnData(), txnId);
+                    else return Truncated.truncated(attributes(saveStatus), 
saveStatus, executeAt, txnId.is(Write) ? new Writes(txnId, executeAt, 
keysOrRanges, new TxnWrite(Collections.emptyList(), true)) : null, new 
TxnData());
 
                 case Erased:
                 case Vestigial:
                 case Invalidated:
-                    return Command.Truncated.invalidated(txnId, 
attributes(saveStatus).participants());
+                    return Truncated.invalidated(txnId, 
attributes(saveStatus).participants());
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to