This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 4c8c0c8b3e Accord: Fix unit tests and improve burn test stability
4c8c0c8b3e is described below

commit 4c8c0c8b3e008c04d5c5f9eef1e8e0b321596c5f
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Nov 19 20:04:11 2024 +0000

    Accord: Fix unit tests and improve burn test stability
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20113
---
 modules/accord                                     |   2 +-
 .../service/accord/AccordCommandStore.java         |  22 ++-
 .../cassandra/service/accord/AccordExecutor.java   |   3 +-
 .../service/accord/AccordSafeCommandStore.java     | 162 ++-------------------
 .../cassandra/service/accord/AccordService.java    |   6 +-
 .../service/accord/CommandsForRanges.java          |   7 +-
 .../accord/interop/AccordInteropAdapter.java       |  13 +-
 .../service/accord/interop/AccordInteropApply.java |  10 +-
 .../accord/interop/AccordInteropPersist.java       |   5 +-
 .../accord/serializers/AwaitSerializer.java        |  11 +-
 .../accord/serializers/ReadDataSerializers.java    |  14 +-
 .../compaction/CompactionAccordIteratorsTest.java  |  15 +-
 .../cassandra/service/accord/AccordTaskTest.java   |   2 +-
 .../serializers/CommandsForKeySerializerTest.java  |  13 +-
 .../apache/cassandra/utils/AccordGenerators.java   |  16 +-
 15 files changed, 98 insertions(+), 203 deletions(-)

diff --git a/modules/accord b/modules/accord
index a271897790..61f4ebd21f 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit a271897790aa3816c3dea2125b1e374b091bc090
+Subproject commit 61f4ebd21ff34b0807081d9df442236d8c334b52
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index d3c59fbd7f..f0b4eb12b5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -38,6 +38,7 @@ import accord.api.DataStore;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
+import accord.impl.AbstractSafeCommandStore.CommandStoreCaches;
 import accord.impl.TimestampsForKey;
 import accord.local.Cleanup;
 import accord.local.Command;
@@ -119,7 +120,7 @@ public class AccordCommandStore extends CommandStore
         }
     }
 
-    public static final class ExclusiveCaches extends Caches implements 
AutoCloseable
+    public static final class ExclusiveCaches extends Caches implements 
CommandStoreCaches<AccordSafeCommand, AccordSafeTimestampsForKey, 
AccordSafeCommandsForKey>
     {
         private final Lock lock;
 
@@ -129,6 +130,25 @@ public class AccordCommandStore extends CommandStore
             this.lock = lock;
         }
 
+
+        @Override
+        public AccordSafeCommand acquireIfLoaded(TxnId txnId)
+        {
+            return commands().acquireIfLoaded(txnId);
+        }
+
+        @Override
+        public AccordSafeCommandsForKey acquireIfLoaded(RoutingKey key)
+        {
+            return commandsForKeys().acquireIfLoaded(key);
+        }
+
+        @Override
+        public AccordSafeTimestampsForKey acquireTfkIfLoaded(RoutingKey key)
+        {
+            return timestampsForKeys().acquireIfLoaded(key);
+        }
+
         @Override
         public void close()
         {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java 
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index ce778dd76a..cc6e96364d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -561,7 +561,8 @@ public abstract class AccordExecutor implements CacheSize, 
AccordCacheEntry.OnLo
     private <K, V> void onLoadedExclusive(AccordCacheEntry<K, V> loaded, V 
value, Throwable fail, boolean isForRange)
     {
         --activeLoads;
-        --activeRangeLoads;
+        if (isForRange)
+            --activeRangeLoads;
 
         if (loaded.status() != EVICTED)
         {
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index a1fe99b47c..75afc32625 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -18,11 +18,7 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.AbstractCollection;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
@@ -36,23 +32,16 @@ import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
+import accord.impl.AbstractSafeCommandStore;
 import accord.impl.CommandsSummary;
-import accord.impl.SafeTimestampsForKey;
 import accord.local.CommandStores;
 import accord.local.CommandStores.RangesForEpoch;
-import accord.local.KeyHistory;
 import accord.local.NodeCommandStoreService;
-import accord.local.PreLoadContext;
 import accord.local.RedundantBefore;
-import accord.local.SafeCommandStore;
 import accord.local.cfk.CommandsForKey;
 import accord.primitives.AbstractKeys;
-import accord.primitives.AbstractUnseekableKeys;
-import accord.primitives.Participants;
 import accord.primitives.Ranges;
-import accord.primitives.Routable;
 import accord.primitives.Routables;
-import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -60,14 +49,11 @@ import accord.primitives.Unseekables;
 import accord.utils.Invariants;
 import org.apache.cassandra.service.accord.AccordCommandStore.ExclusiveCaches;
 
-import static accord.local.KeyHistory.TIMESTAMPS;
-import static accord.utils.Invariants.illegalArgument;
 import static accord.utils.Invariants.illegalState;
 
-public class AccordSafeCommandStore extends SafeCommandStore
+public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey, 
AccordSafeCommandsForKey, AccordCommandStore.ExclusiveCaches>
 {
     final AccordTask<?> task;
-    final PreLoadContext context;
     private final @Nullable CommandsForRanges commandsForRanges;
     private final AccordCommandStore commandStore;
     private RangesForEpoch ranges;
@@ -77,7 +63,7 @@ public class AccordSafeCommandStore extends SafeCommandStore
                                    @Nullable CommandsForRanges 
commandsForRanges,
                                    AccordCommandStore commandStore)
     {
-        this.context = task.preLoadContext();
+        super(task.preLoadContext());
         this.task = task;
         this.commandsForRanges = commandsForRanges;
         this.commandStore = commandStore;
@@ -93,91 +79,6 @@ public class AccordSafeCommandStore extends SafeCommandStore
         return new AccordSafeCommandStore(operation, commandsForRanges, 
commandStore);
     }
 
-    @Override
-    public PreLoadContext canExecute(PreLoadContext with)
-    {
-        if (with.isEmpty()) return with;
-        if (with.keys().domain() == Routable.Domain.Range)
-            return with.isSubsetOf(this.context) ? with : null;
-
-        if (!context().keyHistory().satisfies(with.keyHistory()))
-            return null;
-
-        try (ExclusiveCaches caches = commandStore.tryLockCaches())
-        {
-            if (caches == null)
-                return with.isSubsetOf(this.context) ? with : null;
-
-            for (TxnId txnId : with.txnIds())
-            {
-                if (null != getInternal(txnId))
-                    continue;
-
-                AccordSafeCommand safeCommand = 
caches.commands().acquireIfLoaded(txnId);
-                if (safeCommand == null)
-                    return null;
-
-                add(safeCommand, caches);
-            }
-
-            KeyHistory keyHistory = with.keyHistory();
-            if (keyHistory == KeyHistory.NONE)
-                return with;
-
-            List<RoutingKey> unavailable = null;
-            AbstractUnseekableKeys keys = (AbstractUnseekableKeys) 
context.keys();
-            if (keys.size() == 0)
-                return with;
-
-            for (int i = 0 ; i < keys.size() ; ++i)
-            {
-                RoutingKey key = keys.get(i);
-                if (keyHistory == TIMESTAMPS)
-                {
-                    if (null != timestampsForKeyInternal(key))
-                        continue; // already in working set
-
-                    AccordSafeTimestampsForKey safeTfk = 
caches.timestampsForKeys().acquireIfLoaded(key);
-                    if (safeTfk != null)
-                    {
-                        add(safeTfk, caches);
-                        continue;
-                    }
-                }
-                else
-                {
-                    if (null != getInternal(key))
-                        continue; // already in working set
-
-                    AccordSafeCommandsForKey safeCfk = 
caches.commandsForKeys().acquireIfLoaded(key);
-                    if (safeCfk != null)
-                    {
-                        add(safeCfk, caches);
-                        continue;
-                    }
-                }
-                if (unavailable == null)
-                    unavailable = new ArrayList<>();
-                unavailable.add(key);
-            }
-
-            if (unavailable == null)
-                return with;
-
-            if (unavailable.size() == keys.size())
-                return null;
-
-            Participants<RoutingKey> available = 
keys.without(RoutingKeys.ofSortedUnique(unavailable));
-            return PreLoadContext.contextFor(context.primaryTxnId(), 
context.additionalTxnId(), available, keyHistory);
-        }
-    }
-
-    @Override
-    public PreLoadContext context()
-    {
-        return context;
-    }
-
     @VisibleForTesting
     public Set<RoutingKey> commandsForKeysKeys()
     {
@@ -196,22 +97,12 @@ public class AccordSafeCommandStore extends 
SafeCommandStore
     }
 
     @Override
-    protected AccordSafeCommand 
ifLoadedAndInitialisedAndNotErasedInternal(TxnId txnId)
+    protected ExclusiveCaches tryGetCaches()
     {
-        try (ExclusiveCaches caches = commandStore.tryLockCaches())
-        {
-            if (caches == null)
-                return null;
-
-            AccordSafeCommand command = 
caches.commands().acquireIfLoaded(txnId);
-            if (command == null)
-                return null;
-
-            return add(command, caches);
-        }
+        return commandStore.tryLockCaches();
     }
 
-    private AccordSafeCommand add(AccordSafeCommand safeCommand, 
ExclusiveCaches caches)
+    protected AccordSafeCommand add(AccordSafeCommand safeCommand, 
ExclusiveCaches caches)
     {
         Object check = task.ensureCommands().putIfAbsent(safeCommand.txnId(), 
safeCommand);
         if (check == null)
@@ -226,7 +117,7 @@ public class AccordSafeCommandStore extends SafeCommandStore
         }
     }
 
-    private AccordSafeCommandsForKey add(AccordSafeCommandsForKey safeCfk, 
ExclusiveCaches caches)
+    protected AccordSafeCommandsForKey add(AccordSafeCommandsForKey safeCfk, 
ExclusiveCaches caches)
     {
         Object check = task.ensureCommandsForKey().putIfAbsent(safeCfk.key(), 
safeCfk);
         if (check == null)
@@ -241,7 +132,7 @@ public class AccordSafeCommandStore extends SafeCommandStore
         }
     }
 
-    private AccordSafeTimestampsForKey add(AccordSafeTimestampsForKey safeTfk, 
ExclusiveCaches caches)
+    protected AccordSafeTimestampsForKey add(AccordSafeTimestampsForKey 
safeTfk, ExclusiveCaches caches)
     {
         Object check = 
task.ensureTimestampsForKey().putIfAbsent(safeTfk.key(), safeTfk);
         if (check == null)
@@ -265,42 +156,7 @@ public class AccordSafeCommandStore extends 
SafeCommandStore
         return commandsForKey.get(key);
     }
 
-    @Override
-    protected AccordSafeCommandsForKey ifLoadedInternal(RoutingKey key)
-    {
-        try (ExclusiveCaches caches = commandStore.tryLockCaches())
-        {
-            if (caches == null)
-                return null;
-
-            AccordSafeCommandsForKey safeCfk = 
caches.commandsForKeys().acquireIfLoaded(key);
-            if (safeCfk == null)
-                return null;
-
-            Object check = 
task.ensureCommandsForKey().putIfAbsent(safeCfk.key(), safeCfk);
-            if (check == null)
-            {
-                safeCfk.preExecute();
-                return safeCfk;
-            }
-            else
-            {
-                caches.commandsForKeys().release(safeCfk, task);
-                throw illegalState("Attempted to take a duplicate reference to 
CFK for %s", key);
-            }
-        }
-    }
-
-    @Override
-    public SafeTimestampsForKey timestampsForKey(RoutingKey key)
-    {
-        AccordSafeTimestampsForKey safeTfk = timestampsForKeyInternal(key);
-        if (safeTfk == null)
-            throw illegalArgument("%s not referenced in %s", key, context);
-        return safeTfk;
-    }
-
-    private AccordSafeTimestampsForKey timestampsForKeyInternal(RoutingKey key)
+    protected AccordSafeTimestampsForKey timestampsForKeyInternal(RoutingKey 
key)
     {
         Map<RoutingKey, AccordSafeTimestampsForKey> timestampsForKey = 
task.timestampsForKey();
         if (timestampsForKey == null)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index b3bd311c92..ae9534ca3b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -1037,7 +1037,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         {
             if (state.knows(blockedBy)) continue;
             // need to fetch the state
-            if (safeStore.ifLoadedAndInitialisedAndNotErased(blockedBy) != 
null)
+            if (safeStore.ifLoadedAndInitialised(blockedBy) != null)
             {
                 AsyncChain<Void> chain = populate(state, safeStore, blockedBy);
                 if (chain != null)
@@ -1077,7 +1077,7 @@ public class AccordService implements IAccordService, 
Shutdownable
             blocking = ((CommandsForKey.TxnInfo) blocking).plainTxnId();
         state.keys.put(pk, blocking);
         if (state.txns.containsKey(blocking)) return null;
-        if (safeStore.ifLoadedAndInitialisedAndNotErased(blocking) != null) 
return populate(state, safeStore, blocking);
+        if (safeStore.ifLoadedAndInitialised(blocking) != null) return 
populate(state, safeStore, blocking);
         return populate(state, safeStore.commandStore(), blocking);
     }
 
@@ -1437,7 +1437,7 @@ public class AccordService implements IAccordService, 
Shutdownable
             {
                 if (tracker.recordSuccess(from) == RequestStatus.Success)
                 {
-                    
node.configService().reportEpochRedundant(exclusiveSyncPoint.route.toRanges(), 
exclusiveSyncPoint.syncId.epoch());
+                    
node.configService().reportEpochRedundant(exclusiveSyncPoint.route.toRanges(), 
exclusiveSyncPoint.syncId.epoch() - 1);
                     trySuccess(exclusiveSyncPoint);
                 }
             }
diff --git 
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java 
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index 66f7daccab..13b21b5da8 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -57,7 +57,7 @@ import org.apache.cassandra.index.accord.RoutesSearcher;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey;
 
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
-import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.local.SafeCommandStore.TestDep.WITH_OR_INVALIDATED;
 import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE;
 import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS;
 import static accord.primitives.Routables.Slice.Minimal;
@@ -129,6 +129,9 @@ public class CommandsForRanges extends TreeMap<Timestamp, 
CommandsForRanges.Summ
                 case IS_STABLE:
                     if (!summary.saveStatus.hasBeen(Stable) || 
summary.saveStatus.hasBeen(Truncated))
                         return;
+                case IS_STABLE_OR_INVALIDATED:
+                    if (!summary.saveStatus.hasBeen(Stable) || 
summary.saveStatus.status == Truncated)
+                        return;
             }
 
             if (testDep != ANY_DEPS)
@@ -152,7 +155,7 @@ public class CommandsForRanges extends TreeMap<Timestamp, 
CommandsForRanges.Summ
                 // and so it is safe to execute, when in fact it is only a 
dependency on a different shard
                 // (and that other shard, perhaps, does not know that it is a 
dependency - and so it is not durably known)
                 // TODO (required): consider this some more
-                if ((testDep == WITH) == !summary.hasAsDep)
+                if ((testDep == WITH_OR_INVALIDATED) == (!summary.hasAsDep || 
summary.saveStatus == SaveStatus.Invalidated))
                     return;
 
                 Invariants.checkState(testTxnId.equals(summary.findAsDep));
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
index e4464c44f4..820582d930 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropAdapter.java
@@ -33,7 +33,7 @@ import accord.local.Node;
 import accord.messages.Apply;
 import accord.primitives.Deps;
 import accord.primitives.FullRoute;
-import accord.primitives.Participants;
+import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -89,14 +89,13 @@ public class AccordInteropAdapter extends AbstractTxnAdapter
     }
 
     @Override
-    public void persist(Node node, Topologies all, FullRoute<?> route, 
Participants<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result, BiConsumer<? super Result, Throwable> callback)
+    public void persist(Node node, Topologies all, FullRoute<?> fullRoute, 
Route<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result, BiConsumer<? super Result, Throwable> callback)
     {
-        // TODO (required): we aren't using sendTo
-        if (applyKind == Minimal && doInteropPersist(node, all, route, txnId, 
txn, executeAt, deps, writes, result, callback))
+        if (applyKind == Minimal && doInteropPersist(node, all, sendTo, txnId, 
txn, executeAt, deps, writes, result, fullRoute, callback))
             return;
 
         if (callback != null) callback.accept(result, null);
-        new PersistTxn(node, all, txnId, route, txn, executeAt, deps, writes, 
result)
+        new PersistTxn(node, all, txnId, sendTo, txn, executeAt, deps, writes, 
result, fullRoute)
             .start(Apply.FACTORY, applyKind, all, writes, result);
     }
 
@@ -113,14 +112,14 @@ public class AccordInteropAdapter extends 
AbstractTxnAdapter
         return true;
     }
 
-    private static boolean doInteropPersist(Node node, Topologies all, 
FullRoute<?> route, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result, BiConsumer<? super Result, Throwable> callback)
+    private static boolean doInteropPersist(Node node, Topologies all, 
Route<?> sendTo, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result, FullRoute<?> fullRoute, BiConsumer<? super Result, 
Throwable> callback)
     {
         Update update = txn.update();
         ConsistencyLevel consistencyLevel = update instanceof AccordUpdate ? 
((AccordUpdate) update).cassandraCommitCL() : null;
         if (consistencyLevel == null || consistencyLevel == 
ConsistencyLevel.ANY || writes.isEmpty())
             return false;
 
-        new AccordInteropPersist(node, all, txnId, route, txn, executeAt, 
deps, writes, result, consistencyLevel, callback)
+        new AccordInteropPersist(node, all, txnId, sendTo, txn, executeAt, 
deps, writes, result, fullRoute, consistencyLevel, callback)
             .start(AccordInteropApply.FACTORY, Minimal, all, writes, result);
         return true;
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index 1cd3a05695..44f54ab51d 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -62,14 +62,14 @@ public class AccordInteropApply extends Apply implements 
LocalListeners.ComplexL
     public static final Apply.Factory FACTORY = new Apply.Factory()
     {
         @Override
-        public Apply create(Kind kind, Id to, Topologies participates, TxnId 
txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result)
+        public Apply create(Kind kind, Id to, Topologies participates, TxnId 
txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result, FullRoute<?> fullRoute)
         {
             checkArgument(kind != Kind.Maximal, "Shouldn't need to send a 
maximal commit with interop support");
             ConsistencyLevel commitCL = txn.update() instanceof AccordUpdate ? 
((AccordUpdate) txn.update()).cassandraCommitCL() : null;
             // Any asynchronous apply option should use the regular Apply that 
doesn't wait for writes to complete
             if (commitCL == null || commitCL == ConsistencyLevel.ANY)
-                return Apply.FACTORY.create(kind, to, participates, txnId, 
route, txn, executeAt, deps, writes, result);
-            return new AccordInteropApply(kind, to, participates, txnId, 
route, txn, executeAt, deps, writes, result);
+                return Apply.FACTORY.create(kind, to, participates, txnId, 
route, txn, executeAt, deps, writes, result, fullRoute);
+            return new AccordInteropApply(kind, to, participates, txnId, 
route, txn, executeAt, deps, writes, result, fullRoute);
         }
     };
 
@@ -91,9 +91,9 @@ public class AccordInteropApply extends Apply implements 
LocalListeners.ComplexL
         super(kind, txnId, route, waitForEpoch, executeAt, deps, txn, 
fullRoute, writes, result);
     }
 
-    private AccordInteropApply(Kind kind, Id to, Topologies participates, 
TxnId txnId, FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, 
Writes writes, Result result)
+    private AccordInteropApply(Kind kind, Id to, Topologies participates, 
TxnId txnId, Route<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes 
writes, Result result, FullRoute<?> fullRoute)
     {
-        super(kind, to, participates, txnId, route, txn, executeAt, deps, 
writes, result);
+        super(kind, to, participates, txnId, route, txn, executeAt, deps, 
writes, result, fullRoute);
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
index a0fd42229f..342dc40140 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropPersist.java
@@ -30,6 +30,7 @@ import accord.local.Node;
 import accord.messages.Apply;
 import accord.primitives.Deps;
 import accord.primitives.FullRoute;
+import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
@@ -91,9 +92,9 @@ public class AccordInteropPersist extends Persist
     private final ConsistencyLevel consistencyLevel;
     private CallbackHolder callback;
 
-    public AccordInteropPersist(Node node, Topologies topologies, TxnId txnId, 
FullRoute<?> route, Txn txn, Timestamp executeAt, Deps deps, Writes writes, 
Result result, ConsistencyLevel consistencyLevel, BiConsumer<? super Result, 
Throwable> clientCallback)
+    public AccordInteropPersist(Node node, Topologies topologies, TxnId txnId, 
Route<?> sendTo, Txn txn, Timestamp executeAt, Deps deps, Writes writes, Result 
result, FullRoute<?> fullRoute, ConsistencyLevel consistencyLevel, BiConsumer<? 
super Result, Throwable> clientCallback)
     {
-        super(node, topologies, txnId, route, txn, executeAt, deps, writes, 
result);
+        super(node, topologies, txnId, sendTo, txn, executeAt, deps, writes, 
result, fullRoute);
         Invariants.checkArgument(consistencyLevel == ConsistencyLevel.QUORUM 
|| consistencyLevel == ConsistencyLevel.ALL || consistencyLevel == 
ConsistencyLevel.SERIAL || consistencyLevel == ConsistencyLevel.ONE);
         this.consistencyLevel = consistencyLevel;
         registerClientCallback(result, clientCallback);
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java 
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
index 54b25a1c87..0006a068ec 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
@@ -45,7 +45,8 @@ public class AwaitSerializer
             CommandSerializers.txnId.serialize(await.txnId, out, version);
             KeySerializers.participants.serialize(await.scope, out, version);
             out.writeByte(await.blockedUntil.ordinal());
-            out.writeUnsignedVInt(await.awaitEpoch - await.txnId.epoch());
+            out.writeUnsignedVInt(await.maxAwaitEpoch - await.txnId.epoch());
+            out.writeUnsignedVInt(await.maxAwaitEpoch - await.minAwaitEpoch);
             out.writeUnsignedVInt32(await.callbackId + 1);
             Invariants.checkState(await.callbackId >= -1);
         }
@@ -56,10 +57,11 @@ public class AwaitSerializer
             TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
             Participants<?> scope = 
KeySerializers.participants.deserialize(in, version);
             BlockedUntil blockedUntil = BlockedUntil.forOrdinal(in.readByte());
-            long awaitEpoch = in.readUnsignedVInt() + txnId.epoch();
+            long maxAwaitEpoch = in.readUnsignedVInt() + txnId.epoch();
+            long minAwaitEpoch = maxAwaitEpoch - in.readUnsignedVInt();
             int callbackId = in.readUnsignedVInt32() - 1;
             Invariants.checkState(callbackId >= -1);
-            return Await.SerializerSupport.create(txnId, scope, blockedUntil, 
awaitEpoch, callbackId);
+            return Await.SerializerSupport.create(txnId, scope, blockedUntil, 
minAwaitEpoch, maxAwaitEpoch, callbackId);
         }
 
         @Override
@@ -68,7 +70,8 @@ public class AwaitSerializer
             return CommandSerializers.txnId.serializedSize(await.txnId, 
version)
                    + KeySerializers.participants.serializedSize(await.scope, 
version)
                    + TypeSizes.BYTE_SIZE
-                   + VIntCoding.computeUnsignedVIntSize(await.awaitEpoch - 
await.txnId.epoch())
+                   + VIntCoding.computeUnsignedVIntSize(await.maxAwaitEpoch - 
await.txnId.epoch())
+                   + VIntCoding.computeUnsignedVIntSize(await.maxAwaitEpoch - 
await.minAwaitEpoch)
                    + VIntCoding.computeUnsignedVIntSize(await.callbackId + 1);
         }
     };
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 0c86ee7747..947b166619 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -80,6 +80,7 @@ public class ReadDataSerializers
         {
             CommandSerializers.txnId.serialize(msg.txnId, out, version);
             KeySerializers.participants.serialize(msg.readScope, out, version);
+            out.writeUnsignedVInt(msg.minEpoch());
             CommandSerializers.timestamp.serialize(msg.executeAt, out, 
version);
             KeySerializers.fullRoute.serialize(msg.route, out, version);
             CommandSerializers.partialTxn.serialize(msg.txn, out, version);
@@ -93,6 +94,7 @@ public class ReadDataSerializers
             return ApplyThenWaitUntilApplied.SerializerSupport.create(
             CommandSerializers.txnId.deserialize(in, version),
             KeySerializers.participants.deserialize(in, version),
+            in.readUnsignedVInt(),
             CommandSerializers.timestamp.deserialize(in, version),
             KeySerializers.fullRoute.deserialize(in, version),
             CommandSerializers.partialTxn.deserialize(in, version),
@@ -106,6 +108,7 @@ public class ReadDataSerializers
         {
             return CommandSerializers.txnId.serializedSize(msg.txnId, version)
                    + KeySerializers.participants.serializedSize(msg.readScope, 
version)
+                   + TypeSizes.sizeofUnsignedVInt(msg.minEpoch())
                    + 
CommandSerializers.timestamp.serializedSize(msg.executeAt, version)
                    + KeySerializers.fullRoute.serializedSize(msg.route, 
version)
                    + CommandSerializers.partialTxn.serializedSize(msg.txn, 
version)
@@ -279,7 +282,8 @@ public class ReadDataSerializers
         {
             CommandSerializers.txnId.serialize(waitUntilApplied.txnId, out, 
version);
             KeySerializers.participants.serialize(waitUntilApplied.readScope, 
out, version);
-            out.writeUnsignedVInt(waitUntilApplied.executeAtEpoch);
+            out.writeUnsignedVInt(waitUntilApplied.minEpoch());
+            out.writeUnsignedVInt(waitUntilApplied.executeAtEpoch - 
waitUntilApplied.minEpoch());
         }
 
         @Override
@@ -287,8 +291,9 @@ public class ReadDataSerializers
         {
             TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
             Participants<?> readScope = 
KeySerializers.participants.deserialize(in, version);
-            long executeAtEpoch = in.readUnsignedVInt();
-            return WaitUntilApplied.SerializerSupport.create(txnId, readScope, 
executeAtEpoch);
+            long minEpoch = in.readUnsignedVInt();
+            long executeAtEpoch = minEpoch + in.readUnsignedVInt();
+            return WaitUntilApplied.SerializerSupport.create(txnId, readScope, 
minEpoch, executeAtEpoch);
         }
 
         @Override
@@ -296,7 +301,8 @@ public class ReadDataSerializers
         {
             return 
CommandSerializers.txnId.serializedSize(waitUntilApplied.txnId, version)
                    + 
KeySerializers.participants.serializedSize(waitUntilApplied.readScope, version)
-                   + 
TypeSizes.sizeofUnsignedVInt(waitUntilApplied.executeAtEpoch);
+                   + TypeSizes.sizeofUnsignedVInt(waitUntilApplied.minEpoch())
+                   + 
TypeSizes.sizeofUnsignedVInt(waitUntilApplied.executeAtEpoch - 
waitUntilApplied.minEpoch());
         }
     };
 }
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 6bf6849e42..fbcfee12f3 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -207,8 +207,8 @@ public class CompactionAccordIteratorsTest
         testAccordCommandsPurger(redundantBefore(GT_TXN_ID), 
durableBefore(UNIVERSAL), expectAccordCommandsErase());
         // Universally durable but not locally; we're stale, but shouldn't 
erase
         testAccordCommandsPurger(redundantBefore(LT_TXN_ID), 
durableBefore(UNIVERSAL), expectAccordCommandsNoChange());
-        // With redundantBefore at the txnId there should be no change because 
it is < not <=
-        testAccordCommandsPurger(redundantBefore(TXN_ID), 
durableBefore(MAJORITY), expectAccordCommandsNoChange());
+        // redundantBefore will be > txnId as must be exclusive sync point, so 
will truncate
+        testAccordCommandsPurger(redundantBefore(TXN_ID), 
durableBefore(MAJORITY), expectAccordCommandsTruncated());
         testAccordCommandsPurger(redundantBefore(LT_TXN_ID), 
durableBefore(MAJORITY), expectAccordCommandsNoChange());
         // Durable at a majority can be truncated with minimal data preserved, 
it must be redundant for this to occur
         testAccordCommandsPurger(redundantBefore(GT_TXN_ID), 
durableBefore(MAJORITY), expectAccordCommandsTruncated());
@@ -249,10 +249,11 @@ public class CompactionAccordIteratorsTest
         testAccordCommandsForKeyPurger(null, 
expectedAccordCommandsForKeyNoChange());
         testAccordTimestampsForKeyPurger(redundantBefore(LT_TXN_ID), 
expectedAccordTimestampsForKeyNoChange());
         testAccordCommandsForKeyPurger(redundantBefore(LT_TXN_ID), 
expectedAccordCommandsForKeyNoChange());
-        testAccordTimestampsForKeyPurger(redundantBefore(TXN_ID), 
expectedAccordTimestampsForKeyNoChange());
-        testAccordCommandsForKeyPurger(redundantBefore(TXN_ID), 
expectedAccordCommandsForKeyNoChange());
-        testAccordTimestampsForKeyPurger(redundantBefore(GT_TXN_ID), 
expectedAccordTimestampsForKeyEraseOne());
-        testAccordCommandsForKeyPurger(redundantBefore(GT_TXN_ID), 
expectedAccordCommandsForKeyEraseOne());
+        // will erase one more than expected as converted to 
ExclusiveSyncPoint id which is > base id
+        testAccordTimestampsForKeyPurger(redundantBefore(TXN_ID), 
expectedAccordTimestampsForKeyEraseOne());
+        testAccordCommandsForKeyPurger(redundantBefore(TXN_ID), 
expectedAccordCommandsForKeyEraseOne());
+        testAccordTimestampsForKeyPurger(redundantBefore(GT_TXN_ID), 
expectedAccordTimestampsForKeyEraseAll());
+        testAccordCommandsForKeyPurger(redundantBefore(GT_TXN_ID), 
expectedAccordCommandsForKeyEraseAll());
         testAccordTimestampsForKeyPurger(redundantBefore(GT_SECOND_TXN_ID), 
expectedAccordTimestampsForKeyEraseAll());
         testAccordCommandsForKeyPurger(redundantBefore(GT_SECOND_TXN_ID), 
expectedAccordCommandsForKeyEraseAll());
     }
@@ -399,7 +400,7 @@ public class CompactionAccordIteratorsTest
     private static RedundantBefore redundantBefore(TxnId txnId)
     {
         Ranges ranges = AccordTestUtils.fullRange(AccordTestUtils.keys(table, 
42));
-        txnId = txnId.as(Kind.Read, Range);
+        txnId = txnId.as(Kind.ExclusiveSyncPoint, Range);
         return RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE, 
txnId, txnId, txnId, txnId, LT_TXN_ID.as(Range));
     }
 
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
index bc476baab5..0207b14d20 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
@@ -133,7 +133,7 @@ public class AccordTaskTest
             // TODO review: This change to `ifInitialized` was done in a lot 
of places and it doesn't preserve this property
             // I fixed this reference to point to `ifLoadedAndInitialised` and 
but didn't update other places
             Assert.assertNull(instance.ifInitialised(txnId));
-            
Assert.assertNull(instance.ifLoadedAndInitialisedAndNotErased(txnId));
+            Assert.assertNull(instance.ifLoadedAndInitialised(txnId));
         }));
 
         UntypedResultSet result = AccordKeyspace.loadCommandRow(commandStore, 
txnId);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 052736200c..e863661982 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
@@ -202,6 +203,8 @@ public class CommandsForKeySerializerTest
 
                 case Erased:
                 case ErasedOrVestigial:
+                    return Command.Truncated.erased(txnId, 
Status.Durability.UniversalOrInvalidated, StoreParticipants.empty(txnId));
+
                 case Invalidated:
                     return Command.SerializerSupport.invalidated(txnId);
             }
@@ -431,7 +434,8 @@ public class CommandsForKeySerializerTest
                 }
             }
 
-            Choices<SaveStatus> saveStatusChoices = 
Choices.uniform(SaveStatus.values());
+            // TODO (expected): we currently don't explore TruncatedApply 
statuses because we don't transition through all phases and therefore don't 
adopt the Applied status
+            Choices<SaveStatus> saveStatusChoices = 
Choices.uniform(EnumSet.complementOf(EnumSet.of(SaveStatus.TruncatedApply, 
SaveStatus.TruncatedApplyWithOutcome, 
SaveStatus.TruncatedApplyWithDeps)).toArray(SaveStatus[]::new));
             Supplier<SaveStatus> saveStatusSupplier = () -> {
                 SaveStatus result = saveStatusChoices.choose(source);
                 while (result == SaveStatus.TruncatedApplyWithDeps) // not a 
real save status
@@ -471,7 +475,8 @@ public class CommandsForKeySerializerTest
             while (commands.size() > 0)
             {
                 int next = source.nextInt(commands.size());
-                cfk = cfk.update(commands.get(next)).cfk();
+                Command command = commands.get(next);
+                cfk = cfk.update(command).cfk();
                 commands.set(next, commands.get(commands.size() - 1));
                 commands.remove(commands.size() - 1);
             }
@@ -486,7 +491,7 @@ public class CommandsForKeySerializerTest
                 }
                 TxnInfo info = cfk.get(i);
                 InternalStatus expectStatus = 
InternalStatus.from(cmd.saveStatus);
-                if (expectStatus == null) expectStatus = 
InternalStatus.TRANSITIVELY_KNOWN;
+                if (expectStatus == null) expectStatus = 
InternalStatus.TRANSITIVE;
                 if (expectStatus.hasExecuteAtOrDeps)
                     Assert.assertEquals(cmd.executeAt, info.executeAt);
                 Assert.assertEquals(expectStatus, info.status());
@@ -545,7 +550,7 @@ public class CommandsForKeySerializerTest
                 {
                     int idx = Arrays.binarySearch(ids, u.txnId);
                     if (idx < 0)
-                        missing.add(TxnInfo.create(u.txnId, 
InternalStatus.TRANSITIVELY_KNOWN, true, u.txnId, Ballot.ZERO));
+                        missing.add(TxnInfo.create(u.txnId, 
InternalStatus.TRANSITIVE, true, u.txnId, Ballot.ZERO));
                 }
                 if (!missing.isEmpty())
                 {
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java 
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 99097c064c..e4e08ac180 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -443,7 +443,7 @@ public class AccordGenerators
 
     public static Gen<RedundantBefore.Entry> redundantBeforeEntry(IPartitioner 
partitioner)
     {
-        return redundantBeforeEntry(Gens.bools().all(), range(partitioner), 
AccordGens.txnIds(Gens.pick(Txn.Kind.SyncPoint, Txn.Kind.ExclusiveSyncPoint), 
ignore -> Routable.Domain.Range));
+        return redundantBeforeEntry(Gens.bools().all(), range(partitioner), 
AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore -> 
Routable.Domain.Range));
     }
 
     public static Gen<RedundantBefore.Entry> redundantBeforeEntry(Gen<Boolean> 
emptyGen, Gen<Range> rangeGen, Gen<TxnId> txnIdGen)
@@ -451,11 +451,11 @@ public class AccordGenerators
         return rs -> {
             Range range = rangeGen.next(rs);
             TxnId locallyWitnessedOrInvalidatedBefore = emptyGen.next(rs) ? 
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
-            TxnId locallyAppliedOrInvalidatedBefore = emptyGen.next(rs) ? 
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
-            TxnId locallyDecidedAndAppliedOrInvalidatedBefore = 
locallyAppliedOrInvalidatedBefore;
-            TxnId shardAppliedOrInvalidatedBefore = emptyGen.next(rs) ? 
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
-            TxnId shardOnlyAppliedOrInvalidatedBefore = 
shardAppliedOrInvalidatedBefore;
-            TxnId gcBefore = shardAppliedOrInvalidatedBefore;
+            TxnId locallyAppliedOrInvalidatedBefore = 
TxnId.nonNullOrMin(locallyWitnessedOrInvalidatedBefore, emptyGen.next(rs) ? 
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
+            TxnId locallyDecidedAndAppliedOrInvalidatedBefore = 
TxnId.nonNullOrMin(locallyAppliedOrInvalidatedBefore, emptyGen.next(rs) ? 
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
+            TxnId shardOnlyAppliedOrInvalidatedBefore = emptyGen.next(rs) ? 
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
+            TxnId shardAppliedOrInvalidatedBefore = 
TxnId.nonNullOrMin(locallyAppliedOrInvalidatedBefore, 
TxnId.nonNullOrMin(shardOnlyAppliedOrInvalidatedBefore, emptyGen.next(rs) ? 
TxnId.NONE : txnIdGen.next(rs))); // emptyable or range
+            TxnId gcBefore = 
TxnId.nonNullOrMin(shardAppliedOrInvalidatedBefore, emptyGen.next(rs) ? 
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
             TxnId bootstrappedAt = txnIdGen.next(rs);
             Timestamp staleUntilAtLeast = emptyGen.next(rs) ? null : 
txnIdGen.next(rs); // nullable
 
@@ -469,7 +469,7 @@ public class AccordGenerators
     public static Gen<RedundantBefore> redundantBefore(IPartitioner 
partitioner)
     {
         Gen<Ranges> rangeGen = rangesArbitrary(partitioner);
-        Gen<TxnId> txnIdGen = AccordGens.txnIds(Gens.pick(Txn.Kind.SyncPoint, 
Txn.Kind.ExclusiveSyncPoint), ignore -> Routable.Domain.Range);
+        Gen<TxnId> txnIdGen = 
AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore -> 
Routable.Domain.Range);
         BiFunction<RandomSource, Range, RedundantBefore.Entry> entryGen = (rs, 
range) -> redundantBeforeEntry(Gens.bools().all(), i -> range, 
txnIdGen).next(rs);
         return AccordGens.redundantBefore(rangeGen, entryGen);
     }
@@ -477,7 +477,7 @@ public class AccordGenerators
     public static Gen<DurableBefore> durableBeforeGen(IPartitioner partitioner)
     {
         Gen<Ranges> rangeGen = rangesArbitrary(partitioner);
-        Gen<TxnId> txnIdGen = AccordGens.txnIds(Gens.pick(Txn.Kind.SyncPoint, 
Txn.Kind.ExclusiveSyncPoint), ignore -> Routable.Domain.Range);
+        Gen<TxnId> txnIdGen = 
AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore -> 
Routable.Domain.Range);
 
         return (rs) -> {
             Ranges ranges = rangeGen.next(rs);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to