This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3d2b0efc34 Accord: Add Rebootstrap and unsafe Bootstrap. To support 
recovering a node that has lost some of its local transaction log, introduce 
rebootstrap and unsafe bootstrap modes, where Accord ensures no responses are 
produced for transactions the node cannot be certain it had not previously 
answered.
3d2b0efc34 is described below

commit 3d2b0efc34d56402757357dcfb6fe175c104944a
Author: Alex Petrov <[email protected]>
AuthorDate: Thu Jun 26 17:55:11 2025 +0200

    Accord: Add Rebootstrap and unsafe Bootstrap

    To support recovering a node that has lost some of its local transaction
    log, introduce rebootstrap and unsafe bootstrap modes, where Accord ensures
    no responses are produced for transactions the node cannot be certain it
    had not previously answered.
    
    patch by Benedict and Alex Petrov for CASSANDRA-20908
---
 modules/accord                                     |   2 +-
 .../cassandra/db/virtual/AccordDebugKeyspace.java  |  26 +--
 .../cassandra/metrics/AccordSystemMetrics.java     |   2 +-
 .../cassandra/service/ActiveRepairService.java     |   3 +-
 src/java/org/apache/cassandra/service/Rebuild.java |   3 +-
 .../apache/cassandra/service/StorageService.java   |  32 +++-
 .../service/accord/AccordCommandStores.java        |  25 ---
 .../service/accord/AccordConfigurationService.java |   4 +-
 .../cassandra/service/accord/AccordDataStore.java  |  28 ++-
 .../cassandra/service/accord/AccordJournal.java    |   3 +-
 .../cassandra/service/accord/AccordService.java    |  34 ++--
 .../cassandra/service/accord/AccordTopology.java   |   3 +-
 .../cassandra/service/accord/IAccordService.java   |  19 +-
 .../cassandra/service/accord/api/AccordAgent.java  |   9 +-
 .../service/accord/api/AccordViolationHandler.java |  10 +-
 .../interop/AccordInteropStableThenRead.java       |   4 +-
 .../accord/serializers/AcceptSerializers.java      |   4 +-
 .../accord/serializers/AwaitSerializers.java       |   4 +-
 .../serializers/BeginInvalidationSerializers.java  |   4 +-
 .../accord/serializers/CheckStatusSerializers.java |   8 +-
 .../serializers/CommandStoreSerializers.java       |  13 +-
 .../accord/serializers/CommitSerializers.java      |   8 +-
 .../accord/serializers/RecoverySerializers.java    |   2 +-
 .../accord/serializers/TxnRequestSerializer.java   |   6 +-
 .../cassandra/tcm/sequences/BootstrapAndJoin.java  |   3 +-
 .../cassandra/tcm/sequences/DropAccordTable.java   |   3 +-
 .../org/apache/cassandra/tcm/sequences/Move.java   |   3 +-
 .../cassandra/distributed/shared/ClusterUtils.java |   3 +-
 .../cassandra/distributed/test/TestBaseImpl.java   |   9 +-
 .../test/accord/AccordBootstrapTest.java           | 195 ++++++++-------------
 .../test/accord/AccordIncrementalRepairTest.java   |  12 +-
 .../test/accord/AccordIntegrationTest.java         |   2 +-
 .../distributed/test/accord/AccordTestBase.java    |   2 +-
 .../cassandra/fuzz/topology/AccordBounceTest.java  |   8 +-
 .../service/accord/AccordJournalBurnTest.java      |   6 +-
 .../db/virtual/AccordDebugKeyspaceTest.java        |   9 +-
 .../cassandra/index/accord/RouteIndexTest.java     |   6 +-
 .../service/accord/AccordCommandTest.java          |   2 +-
 .../service/accord/AccordSyncPropagatorTest.java   |   6 +-
 .../accord/SimulatedAccordCommandStore.java        |  10 +-
 .../service/accord/SimulatedAccordTaskTest.java    |   4 +-
 .../serializers/CommandsForKeySerializerTest.java  |   6 +-
 .../apache/cassandra/utils/AccordGenerators.java   |  10 +-
 43 files changed, 262 insertions(+), 293 deletions(-)

diff --git a/modules/accord b/modules/accord
index 520818a004..657f344eb3 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 520818a004a89217cf86efa6c8fa2968401968ec
+Subproject commit 657f344eb3b3570966bf8cff7731bef6eeea98f1
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index 87029aa94f..87c69688eb 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -47,6 +47,7 @@ import accord.coordinate.Coordination;
 import accord.coordinate.Coordinations;
 import accord.coordinate.PrepareRecovery;
 import accord.coordinate.tracking.AbstractTracker;
+import accord.primitives.RoutingKeys;
 import accord.utils.SortedListMap;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.EmptyIterators;
@@ -74,8 +75,6 @@ import accord.local.CommandStores;
 import accord.local.CommandStores.LatentStoreSelector;
 import accord.local.Commands;
 import accord.local.DurableBefore;
-import accord.local.LoadKeys;
-import accord.local.LoadKeysFor;
 import accord.local.MaxConflicts;
 import accord.local.Node;
 import accord.local.PreLoadContext;
@@ -143,8 +142,9 @@ import static 
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STOR
 import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
 import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED;
 import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED;
+import static accord.local.RedundantStatus.Property.LOG_UNAVAILABLE;
 import static accord.local.RedundantStatus.Property.QUORUM_APPLIED;
-import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
+import static accord.local.RedundantStatus.Property.UNREADY;
 import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static java.lang.String.format;
@@ -365,9 +365,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
             TokenKey key = TokenKey.parse(keyStr, 
DatabaseDescriptor.getPartitioner());
 
             List<Entry> cfks = new CopyOnWriteArrayList<>();
-            PreLoadContext context = PreLoadContext.contextFor(key, 
LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key table query");
             CommandStores commandStores = 
AccordService.instance().node().commandStores();
-            AccordService.getBlocking(commandStores.forEach(context, key, 
Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
+            AccordService.getBlocking(commandStores.forEach("commands_for_key 
table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> 
{
                 SafeCommandsForKey safeCfk = safeStore.get(key);
                 CommandsForKey cfk = safeCfk.current();
                 if (cfk == null)
@@ -475,9 +474,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
             TokenKey key = TokenKey.parse(keyStr, 
DatabaseDescriptor.getPartitioner());
 
             List<Entry> cfks = new CopyOnWriteArrayList<>();
-            PreLoadContext context = PreLoadContext.contextFor(key, 
LoadKeys.SYNC, LoadKeysFor.READ_WRITE, "commands_for_key_unmanaged table 
query");
             CommandStores commandStores = 
AccordService.instance().node().commandStores();
-            AccordService.getBlocking(commandStores.forEach(context, key, 
Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
+            
AccordService.getBlocking(commandStores.forEach("commands_for_key_unmanaged 
table query", RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> 
{
                 SafeCommandsForKey safeCfk = safeStore.get(key);
                 CommandsForKey cfk = safeCfk.current();
                 if (cfk == null)
@@ -888,8 +886,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                         "  locally_redundant 'TxnIdUtf8Type',\n" +
                         "  locally_synced 'TxnIdUtf8Type',\n" +
                         "  locally_witnessed 'TxnIdUtf8Type',\n" +
-                        "  pre_bootstrap 'TxnIdUtf8Type',\n" +
-                        "  stale_until_at_least 'TxnIdUtf8Type',\n" +
+                        "  log_unavailable 'TxnIdUtf8Type',\n" +
+                        "  unready 'TxnIdUtf8Type',\n" +
+                        "  stale_until 'TxnIdUtf8Type',\n" +
                         "  PRIMARY KEY (keyspace_name, table_name, table_id, 
command_store_id, token_start)" +
                         ')', UTF8Type.instance));
         }
@@ -923,8 +922,9 @@ public class AccordDebugKeyspace extends VirtualKeyspace
                           .column("locally_redundant", 
entry.maxBound(LOCALLY_REDUNDANT).toString())
                           .column("locally_synced", 
entry.maxBound(LOCALLY_SYNCED).toString())
                           .column("locally_witnessed", 
entry.maxBound(LOCALLY_WITNESSED).toString())
-                          .column("pre_bootstrap", 
entry.maxBound(PRE_BOOTSTRAP).toString())
-                          .column("stale_until_at_least", 
entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
+                          .column("log_unavailable", 
entry.maxBound(LOG_UNAVAILABLE).toString())
+                          .column("unready", 
entry.maxBound(UNREADY).toString())
+                          .column("stale_until", entry.staleUntilAtLeast != 
null ? entry.staleUntilAtLeast.toString() : null);
                         return ds;
                     },
                     dataSet,
@@ -1188,7 +1188,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
             TxnId txnId = TxnId.parse(txnIdStr);
 
             List<Entry> commands = new CopyOnWriteArrayList<>();
-            
AccordService.instance().node().commandStores().forEachCommandStore(store -> {
+            AccordService.instance().node().commandStores().forAllUnsafe(store 
-> {
                 Command command = 
((AccordCommandStore)store).loadCommand(txnId);
                 if (command != null)
                     commands.add(new Entry(store.id(), command));
@@ -1293,7 +1293,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
             TxnId txnId = TxnId.parse(txnIdStr);
 
             List<Entry> entries = new ArrayList<>();
-            
AccordService.instance().node().commandStores().forEachCommandStore(store -> {
+            AccordService.instance().node().commandStores().forAllUnsafe(store 
-> {
                 for (AccordJournal.DebugEntry e : 
((AccordCommandStore)store).debugCommand(txnId))
                     entries.add(new Entry(store.id(), e.segment, e.position, 
e.builder));
             });
diff --git a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java 
b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
index a47fff2302..c9f8130225 100644
--- a/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordSystemMetrics.java
@@ -175,7 +175,7 @@ public class AccordSystemMetrics
 
         int nowSeconds = (int) (Clock.Global.currentTimeMillis() / 1000);
         SnapshotBuilder builder = new SnapshotBuilder();
-        service.node().commandStores().forEachCommandStore(commandStore -> {
+        service.node().commandStores().forAllUnsafe(commandStore -> {
             DefaultProgressLog.ImmutableView view = 
((DefaultProgressLog)commandStore.unsafeProgressLog()).immutableView();
             builder.progressLogActive += view.activeCount();
             builder.progressLogSize.increment(view.size());
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index c297ab1912..27e50157a3 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -582,8 +582,7 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
             {
                 throw new IllegalArgumentException(String.format("Requested 
range %s intersects a local range (%s) " +
                                                                  "but is not 
fully contained in one; this would lead to " +
-                                                                 "imprecise 
repair. keyspace: %s", toRepair.toString(),
-                                                                 
range.toString(), keyspaceName));
+                                                                 "imprecise 
repair. keyspace: %s", toRepair, range, keyspaceName));
             }
         }
         if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
diff --git a/src/java/org/apache/cassandra/service/Rebuild.java 
b/src/java/org/apache/cassandra/service/Rebuild.java
index c7d40f08bf..56ad1f47ba 100644
--- a/src/java/org/apache/cassandra/service/Rebuild.java
+++ b/src/java/org/apache/cassandra/service/Rebuild.java
@@ -33,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
@@ -160,7 +161,7 @@ public class Rebuild
 
             StreamResultFuture streamResult = streamer.fetchAsync();
 
-            Future<?> accordReady = 
AccordService.instance().epochReadyFor(metadata);
+            Future<?> accordReady = 
AccordService.instance().epochReadyFor(metadata, EpochReady::reads);
             Future<?> ready = FutureCombiner.allOf(streamResult, accordReady);
 
             // wait for result
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 8d861618a5..84a61f6b26 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -71,9 +71,6 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.repair.autorepair.AutoRepair;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,6 +131,7 @@ import org.apache.cassandra.index.IndexStatusManager;
 import org.apache.cassandra.io.sstable.IScrubber;
 import org.apache.cassandra.io.sstable.IVerifier;
 import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileUtils;
@@ -158,7 +156,9 @@ import org.apache.cassandra.metrics.SamplingManager;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairCoordinator;
+import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.SharedContext;
+import org.apache.cassandra.repair.autorepair.AutoRepair;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -183,6 +183,7 @@ import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupLocalCoordinator;
 import org.apache.cassandra.service.paxos.cleanup.PaxosRepairState;
 import org.apache.cassandra.service.snapshot.SnapshotManager;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamState;
@@ -206,9 +207,9 @@ import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
 import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
 import org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.tcm.sequences.SingleNodeSequences;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
 import org.apache.cassandra.tcm.transformations.Assassinate;
 import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
-import org.apache.cassandra.tcm.transformations.AlterTopology;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tcm.transformations.Unregister;
@@ -3145,6 +3146,29 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return new FutureTask<>(task);
     }
 
+    public RepairCoordinator repairAccordKeyspace(String keyspace, 
Collection<Range<Token>> ranges)
+    {
+        int cmd = nextRepairCommand.incrementAndGet();
+        RepairOption options = new RepairOption(RepairParallelism.PARALLEL, // 
parallelism
+                                                false,                       
// primaryRange
+                                                false,                      // 
incremental
+                                                false,                      // 
trace
+                                                5,                          // 
jobThreads
+                                                ranges,                     // 
ranges
+                                                true,                       // 
pullRepair
+                                                true,                       // 
forceRepair
+                                                PreviewKind.NONE,           // 
previewKind
+                                                false,                      // 
optimiseStreams
+                                                true,                       // 
ignoreUnreplicatedKeyspaces
+                                                true,                       // 
repairData
+                                                false,                      // 
repairPaxos
+                                                true,                       // 
dontPurgeTombstones
+                                                false                       // 
repairAccord
+        );
+
+        return new RepairCoordinator(this, cmd, options, keyspace);
+    }
+
     private void tryRepairPaxosForTopologyChange(String reason)
     {
         try
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index cbf1fe49dc..95f0377f25 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -27,19 +27,14 @@ import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.local.CommandStores;
-import accord.local.Node;
 import accord.local.NodeCommandStoreService;
 import accord.local.SequentialAsyncExecutor;
 import accord.local.ShardDistributor;
-import accord.primitives.Range;
-import accord.topology.Topology;
 import accord.utils.RandomSource;
 import org.apache.cassandra.cache.CacheSize;
 import org.apache.cassandra.config.AccordSpec.QueueShardModel;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.TableId;
 import 
org.apache.cassandra.service.accord.AccordExecutor.AccordExecutorFactory;
-import org.apache.cassandra.service.accord.api.TokenKey;
 
 import static 
org.apache.cassandra.config.AccordSpec.QueueShardModel.THREAD_PER_SHARD;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getAccordQueueShardCount;
@@ -112,15 +107,6 @@ public class AccordCommandStores extends CommandStores 
implements CacheSize
         };
     }
 
-    @Override
-    protected boolean shouldBootstrap(Node node, Topology previous, Topology 
updated, Range range)
-    {
-        if (!super.shouldBootstrap(node, previous, updated, range))
-            return false;
-        // we see new ranges when a new keyspace is added, so avoid bootstrap 
in these cases
-        return contains(previous, ((TokenKey)  range.start()).table());
-    }
-
     @Override
     public SequentialAsyncExecutor someSequentialExecutor()
     {
@@ -128,17 +114,6 @@ public class AccordCommandStores extends CommandStores 
implements CacheSize
         return executors[idx].newSequentialExecutor();
     }
 
-    private static boolean contains(Topology previous, TableId searchTable)
-    {
-        for (Range range : previous.ranges())
-        {
-            TableId table = ((TokenKey)  range.start()).table();
-            if (table.equals(searchTable))
-                return true;
-        }
-        return false;
-    }
-
     public synchronized void setCapacity(long bytes)
     {
         cacheSize = bytes;
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 5dc0d19d09..80e3296f56 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -113,7 +113,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
 
         @Nullable AsyncResult<Void> reads()
         {
-            return reads;
+            return ready == null ? null : ready.reads;
         }
 
         AsyncResult.Settable<Void> localSyncNotified()
@@ -449,7 +449,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
     }
 
     @Override
-    protected void localSyncComplete(Topology topology, boolean startSync)
+    protected void onReadyToCoordinate(Topology topology, boolean startSync)
     {
         long epoch = topology.epoch();
         EpochState epochState = getOrCreateEpochState(epoch);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordDataStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
index 0234184a38..7934aebf76 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
@@ -30,6 +30,7 @@ import accord.local.cfk.CommandsForKey;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.SyncPoint;
+import accord.utils.UnhandledEnum;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.memtable.Memtable;
 import org.apache.cassandra.schema.Schema;
@@ -40,14 +41,6 @@ public class AccordDataStore implements DataStore
     private static final Logger logger = 
LoggerFactory.getLogger(AccordDataStore.class);
     enum FlushListenerKey { KEY }
 
-    @Override
-    public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges 
ranges, SyncPoint syncPoint, FetchRanges callback)
-    {
-        AccordFetchCoordinator coordinator = new AccordFetchCoordinator(node, 
ranges, syncPoint, callback, safeStore.commandStore());
-        coordinator.start();
-        return coordinator.result();
-    }
-
     /**
      * Ensures data for the intersecting ranges is flushed to sstable before 
calling back with reportOnSuccess.
      * This is used to gate journal cleanup, since we skip the CommitLog for 
applying to the data table.
@@ -95,4 +88,23 @@ public class AccordDataStore implements DataStore
             prev = cfs;
         }
     }
+
+    @Override
+    public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges 
ranges, SyncPoint syncPoint, FetchRanges callback, FetchKind kind)
+    {
+        switch (kind)
+        {
+            default: throw new UnhandledEnum(kind);
+            case Image:
+            {
+                AccordFetchCoordinator coordinator = new 
AccordFetchCoordinator(node, ranges, syncPoint, callback, 
safeStore.commandStore());
+                coordinator.start();
+                return coordinator.result();
+            }
+            case Sync:
+            {
+                throw new UnsupportedOperationException();
+            }
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 045acea8df..cfa224ab49 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -585,7 +585,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
 
     @SuppressWarnings("unchecked")
     @Override
-    public void replay(CommandStores commandStores)
+    public boolean replay(CommandStores commandStores)
     {
         // TODO (expected): make the parallelisms configurable
         // Replay is performed in parallel, where at most X commands can be in 
flight, accross at most Y commands stores.
@@ -716,6 +716,7 @@ public class AccordJournal implements accord.api.Journal, 
RangeSearcher.Supplier
 
                 ++cur;
             }
+            return true;
         }
         catch (Throwable t)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 058b9171ac..34b7dd28c4 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -39,6 +39,7 @@ import javax.annotation.concurrent.GuardedBy;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.apache.cassandra.metrics.AccordReplicaMetrics;
 import org.apache.cassandra.service.accord.api.AccordViolationHandler;
 import org.apache.cassandra.utils.Clock;
@@ -327,7 +328,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     }
 
     @VisibleForTesting
-    public static void replayJournal(AccordService as)
+    public static boolean replayJournal(AccordService as)
     {
         logger.info("Starting journal replay.");
         long before = Clock.Global.nanoTime();
@@ -337,12 +338,12 @@ public class AccordService implements IAccordService, 
Shutdownable
             if (as.journalConfiguration().replayMode() == RESET)
                 AccordKeyspace.truncateCommandsForKey();
 
-            as.node.commandStores().forEachCommandStore(cs -> 
cs.unsafeProgressLog().stop());
+            as.node.commandStores().forAllUnsafe(cs -> 
cs.unsafeProgressLog().stop());
             as.journal().replay(as.node().commandStores());
             logger.info("Waiting for command stores to quiesce.");
             ((AccordCommandStores)as.node.commandStores()).waitForQuiescense();
             as.journal.unsafeSetStarted();
-            as.node.commandStores().forEachCommandStore(cs -> 
cs.unsafeProgressLog().start());
+            as.node.commandStores().forAllUnsafe(cs -> 
cs.unsafeProgressLog().start());
         }
         finally
         {
@@ -351,14 +352,7 @@ public class AccordService implements IAccordService, 
Shutdownable
 
         long after = Clock.Global.nanoTime();
         logger.info("Finished journal replay. {}ms elapsed", 
NANOSECONDS.toMillis(after - before));
-    }
-
-    public static void shutdownServiceAndWait(long timeout, TimeUnit unit) 
throws InterruptedException, TimeoutException
-    {
-        IAccordService i = instance;
-        if (i == null)
-            return;
-        i.shutdownAndWait(timeout, unit);
+        return true;
     }
 
     @Override
@@ -565,7 +559,7 @@ public class AccordService implements IAccordService, 
Shutdownable
         if (keys.size() != 1)
             return syncInternal(minBound, keys, syncLocal, syncRemote);
 
-        return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), 
syncLocal, syncRemote)
+        return KeyBarriers.find(node, minBound, keys.get(0).toUnseekable(), 
syncLocal, syncRemote).chain()
                           .flatMap(found -> KeyBarriers.await(node, 
node.someSequentialExecutor(), found, syncLocal, syncRemote))
                           .flatMap(success -> {
                               if (success)
@@ -799,8 +793,8 @@ public class AccordService implements IAccordService, 
Shutdownable
         }
         Ready ready = new Ready();
         AccordCommandStores commandStores = (AccordCommandStores) 
node.commandStores();
-        getBlocking(commandStores.forEach((PreLoadContext.Empty)() -> "Flush 
Caches", safeStore -> {
-            AccordCommandStore commandStore = 
(AccordCommandStore)safeStore.commandStore();
+        commandStores.forAllUnsafe(unsafeStore -> {
+            AccordCommandStore commandStore = (AccordCommandStore)unsafeStore;
             try (AccordCommandStore.ExclusiveCaches caches = 
commandStore.lockCaches())
             {
                 caches.commandsForKeys().forEach(entry -> {
@@ -811,7 +805,7 @@ public class AccordService implements IAccordService, 
Shutdownable
                     }
                 });
             }
-        }));
+        });
         ready.decrement();
         AsyncPromise<Void> result = new AsyncPromise<>();
         ready.invoke((success, fail) -> {
@@ -1037,18 +1031,18 @@ public class AccordService implements IAccordService, 
Shutdownable
     }
 
     @Override
-    public Future<Void> epochReady(Epoch epoch)
+    public Future<Void> epochReady(Epoch epoch, Function<EpochReady, 
AsyncResult<Void>> get)
     {
-        return toFuture(configService.epochReady(epoch.getEpoch()));
+        return toFuture(configService.epochReady(epoch.getEpoch(), get));
     }
 
     @Override
-    public Future<Void> epochReadyFor(ClusterMetadata metadata)
+    public Future<Void> epochReadyFor(ClusterMetadata metadata, 
Function<EpochReady, AsyncResult<Void>> get)
     {
         if (!metadata.schema.hasAccordKeyspaces())
             return EPOCH_READY;
 
-        return epochReady(metadata.epoch);
+        return epochReady(metadata.epoch, get);
     }
 
     @Override
@@ -1116,7 +1110,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     public AccordCompactionInfos getCompactionInfo()
     {
         AccordCompactionInfos compactionInfos = new 
AccordCompactionInfos(node.durableBefore(), node.topology().minEpoch());
-        node.commandStores().forEachCommandStore(commandStore -> {
+        node.commandStores().forAllUnsafe(commandStore -> {
             compactionInfos.put(commandStore.id(), 
((AccordCommandStore)commandStore).getCompactionInfo());
         });
         return compactionInfos;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java 
b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
index ca817e647d..8c60ecae5e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
+import accord.api.ConfigurationService.EpochReady;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.primitives.Ranges;
@@ -399,7 +400,7 @@ public class AccordTopology
         {
             ClusterMetadataService.instance().fetchLogFromCMS(epoch);
             IAccordService service = AccordService.instance();
-            
service.epochReady(epoch).get(service.agent().expireEpochWait(MILLISECONDS), 
MILLISECONDS);
+            service.epochReady(epoch, 
EpochReady::reads).get(service.agent().expireEpochWait(MILLISECONDS), 
MILLISECONDS);
         }
         catch (InterruptedException e)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java 
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 34453d22c3..422155449b 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -25,9 +25,11 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import accord.api.ConfigurationService.EpochReady;
 import accord.utils.async.AsyncResult;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.slf4j.Logger;
@@ -127,9 +129,8 @@ public interface IAccordService
      * Return a future that will complete once the accord has completed it's 
local bootstrap process
      * for any ranges gained in the given epoch
      */
-    Future<Void> epochReady(Epoch epoch);
-
-    Future<Void> epochReadyFor(ClusterMetadata epoch);
+    Future<Void> epochReady(Epoch epoch, Function<EpochReady, 
AsyncResult<Void>> f);
+    Future<Void> epochReadyFor(ClusterMetadata epoch, Function<EpochReady, 
AsyncResult<Void>> f);
 
     void receive(Message<AccordSyncPropagator.Notification> message);
 
@@ -308,13 +309,13 @@ public interface IAccordService
         }
 
         @Override
-        public Future<Void> epochReady(Epoch epoch)
+        public Future<Void> epochReady(Epoch epoch, Function<EpochReady, 
AsyncResult<Void>> get)
         {
             return BOOTSTRAP_SUCCESS;
         }
 
         @Override
-        public Future<Void> epochReadyFor(ClusterMetadata epoch)
+        public Future<Void> epochReadyFor(ClusterMetadata epoch, 
Function<EpochReady, AsyncResult<Void>> get)
         {
             return BOOTSTRAP_SUCCESS;
         }
@@ -515,15 +516,15 @@ public interface IAccordService
         }
 
         @Override
-        public Future<Void> epochReady(Epoch epoch)
+        public Future<Void> epochReady(Epoch epoch, Function<EpochReady, 
AsyncResult<Void>> get)
         {
-            return delegate.epochReady(epoch);
+            return delegate.epochReady(epoch, get);
         }
 
         @Override
-        public Future<Void> epochReadyFor(ClusterMetadata epoch)
+        public Future<Void> epochReadyFor(ClusterMetadata epoch, 
Function<EpochReady, AsyncResult<Void>> get)
         {
-            return delegate.epochReadyFor(epoch);
+            return delegate.epochReadyFor(epoch, get);
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index be7153fc2c..e09a565c41 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
 import accord.api.CoordinatorEventListener;
+import accord.api.OwnershipEventListener;
 import accord.api.ReplicaEventListener;
 import accord.api.ProgressLog.BlockedUntil;
 import accord.api.RoutingKey;
@@ -93,7 +94,7 @@ import static 
org.apache.cassandra.service.accord.api.AccordWaitStrategies.slowR
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 // TODO (expected): merge with AccordService
-public class AccordAgent implements Agent
+public class AccordAgent implements Agent, OwnershipEventListener
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AccordAgent.class);
     private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1L, MINUTES);
@@ -125,6 +126,12 @@ public class AccordAgent implements Agent
         return tracing.trace(txnId, eventType);
     }
 
+    @Override
+    public OwnershipEventListener ownershipEvents()
+    {
+        return this;
+    }
+
     public void setNodeId(Node.Id id)
     {
         self = id;
diff --git 
a/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java
index 67ec0111a8..c402789329 100644
--- 
a/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java
+++ 
b/src/java/org/apache/cassandra/service/accord/api/AccordViolationHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.service.accord.api;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,8 +31,6 @@ import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 
-import static accord.utils.Invariants.illegalState;
-
 public class AccordViolationHandler implements ViolationHandler
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AccordViolationHandler.class);
@@ -40,13 +40,11 @@ public class AccordViolationHandler implements 
ViolationHandler
         ViolationHandlerHolder.set(AccordViolationHandler::new);
     }
 
-    @Override
-    public void onTimestampViolation(SafeCommandStore safeStore, Command 
command, Participants<?> otherParticipants, Route<?> otherRoute, Timestamp 
otherExecuteAt)
+    public void onTimestampViolation(@Nullable SafeCommandStore safeStore, 
Command command, Participants<?> otherParticipants, @Nullable Route<?> 
otherRoute, Timestamp otherExecuteAt)
     {
-        throw 
illegalState(ViolationHandler.timestampViolationMessage(safeStore, command, 
otherParticipants, otherRoute, otherExecuteAt));
+        logger.error(ViolationHandler.timestampViolationMessage(safeStore, 
command, otherParticipants, otherRoute, otherExecuteAt));
     }
 
-    @Override
     public void onDependencyViolation(Participants<?> participants, TxnId 
notWitnessed, Timestamp notWitnessedExecuteAt, TxnId by, Timestamp byExecuteAt)
     {
         logger.error(ViolationHandler.dependencyViolationMessage(participants, 
notWitnessed, notWitnessedExecuteAt, by, byExecuteAt));
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java
 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java
index 5dcbb4552f..468f5ff413 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropStableThenRead.java
@@ -144,13 +144,13 @@ public class AccordInteropStableThenRead extends 
AccordInteropRead
     }
 
     @Override
-    public CommitOrReadNack apply(SafeCommandStore safeStore)
+    public CommitOrReadNack applyInternal(SafeCommandStore safeStore)
     {
         Route<?> route = this.route == null ? (Route)scope : this.route;
         StoreParticipants participants = StoreParticipants.execute(safeStore, 
route, txnId, minEpoch(), executeAtEpoch);
         SafeCommand safeCommand = safeStore.get(txnId, participants);
         Commands.commit(safeStore, safeCommand, participants, kind.saveStatus, 
Ballot.ZERO, txnId, route, partialTxn, executeAt, partialDeps, kind);
-        return super.apply(safeStore, safeCommand, participants);
+        return super.applyInternal(safeStore, safeCommand, participants);
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index cd96796142..53391cd322 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -88,7 +88,7 @@ public class AcceptSerializers
             CommandSerializers.status.serialize(invalidate.status, out);
             CommandSerializers.ballot.serialize(invalidate.ballot, out);
             CommandSerializers.txnId.serialize(invalidate.txnId, out);
-            KeySerializers.participants.serialize(invalidate.participants, 
out);
+            KeySerializers.participants.serialize(invalidate.scope, out);
         }
 
         @Override
@@ -106,7 +106,7 @@ public class AcceptSerializers
             return CommandSerializers.status.serializedSize(invalidate.status)
                    + 
CommandSerializers.ballot.serializedSize(invalidate.ballot)
                    + CommandSerializers.txnId.serializedSize(invalidate.txnId)
-                   + 
KeySerializers.participants.serializedSize(invalidate.participants);
+                   + 
KeySerializers.participants.serializedSize(invalidate.scope);
         }
     };
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
index af67c05b04..0e017184fb 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
@@ -122,7 +122,7 @@ public class AwaitSerializers
         public void serialize(AsyncAwaitComplete ok, DataOutputPlus out) 
throws IOException
         {
             CommandSerializers.txnId.serialize(ok.txnId, out);
-            KeySerializers.route.serialize(ok.route, out);
+            KeySerializers.route.serialize(ok.scope, out);
             out.writeByte(ok.newStatus.ordinal());
             out.writeUnsignedVInt32(ok.callbackId);
         }
@@ -141,7 +141,7 @@ public class AwaitSerializers
         public long serializedSize(AsyncAwaitComplete ok)
         {
             return CommandSerializers.txnId.serializedSize(ok.txnId)
-                   + KeySerializers.route.serializedSize(ok.route)
+                   + KeySerializers.route.serializedSize(ok.scope)
                    + TypeSizes.BYTE_SIZE
                    + VIntCoding.computeVIntSize(ok.callbackId);
         }
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
index 25a40a6a41..231c932878 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/BeginInvalidationSerializers.java
@@ -39,7 +39,7 @@ public class BeginInvalidationSerializers
         public void serialize(BeginInvalidation begin, DataOutputPlus out) 
throws IOException
         {
             CommandSerializers.txnId.serialize(begin.txnId, out);
-            KeySerializers.participants.serialize(begin.participants, out);
+            KeySerializers.participants.serialize(begin.scope, out);
             CommandSerializers.ballot.serialize(begin.ballot, out);
         }
 
@@ -55,7 +55,7 @@ public class BeginInvalidationSerializers
         public long serializedSize(BeginInvalidation begin)
         {
             return CommandSerializers.txnId.serializedSize(begin.txnId)
-                   + 
KeySerializers.participants.serializedSize(begin.participants)
+                   + KeySerializers.participants.serializedSize(begin.scope)
                    + CommandSerializers.ballot.serializedSize(begin.ballot);
         }
     };
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
index d1f81512b9..9747d9f72f 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CheckStatusSerializers.java
@@ -128,8 +128,8 @@ public class CheckStatusSerializers
         public void serialize(CheckStatus check, DataOutputPlus out) throws 
IOException
         {
             CommandSerializers.txnId.serialize(check.txnId, out);
-            KeySerializers.participants.serialize(check.query, out);
-            out.writeUnsignedVInt(check.sourceEpoch);
+            KeySerializers.participants.serialize(check.scope, out);
+            out.writeUnsignedVInt(check.waitForEpoch);
             out.writeByte(check.includeInfo.ordinal());
             CommandSerializers.ballot.serialize(check.bumpBallot, out);
         }
@@ -149,8 +149,8 @@ public class CheckStatusSerializers
         public long serializedSize(CheckStatus check)
         {
             return CommandSerializers.txnId.serializedSize(check.txnId)
-                   + KeySerializers.participants.serializedSize(check.query)
-                   + TypeSizes.sizeofUnsignedVInt(check.sourceEpoch)
+                   + KeySerializers.participants.serializedSize(check.scope)
+                   + TypeSizes.sizeofUnsignedVInt(check.waitForEpoch)
                    + TypeSizes.BYTE_SIZE
                    + 
CommandSerializers.ballot.serializedSize(check.bumpBallot);
         }
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
index a1cb244b13..a30ea59270 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
@@ -155,11 +155,18 @@ public class CommandStoreSerializers
             }
             for (int i = 0 ; i < b.bounds.length ; ++i)
             {
-                out.writeShort(b.status(i * 2));
-                out.writeShort(b.status(i * 2 + 1));
+                out.writeShort(cast(b.status(i * 2)));
+                out.writeShort(cast(b.status(i * 2 + 1)));
             }
         }
 
+        private short cast(long v)
+        {
+            if ((v & ~0xFFFF) != 0)
+                throw new IllegalStateException("Cannot serialize 
RedundantStatus larger than 0xFFFF. Requires serialization version bump.");
+            return (short)v;
+        }
+
         @Override
         public RedundantBefore.Bounds deserialize(DataInputPlus in) throws 
IOException
         {
@@ -174,7 +181,7 @@ public class CommandStoreSerializers
             TxnId[] bounds = new TxnId[count];
             for (int i = 0 ; i < bounds.length ; ++i)
                 bounds[i] = CommandSerializers.txnId.deserialize(in);
-            short[] statuses = new short[count * 2];
+            int[] statuses = new int[count * 2];
             for (int i = 0 ; i < statuses.length ; ++i)
                 statuses[i] = in.readShort();
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
index 31d879ce7c..2d2592eee8 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommitSerializers.java
@@ -52,9 +52,9 @@ public class CommitSerializers
             kind.serialize(msg.kind, out);
             CommandSerializers.ballot.serialize(msg.ballot, out);
             ExecuteAtSerializer.serialize(msg.txnId, msg.executeAt, out);
-            CommandSerializers.nullablePartialTxn.serialize(msg.partialTxn, 
out, version);
+            CommandSerializers.nullablePartialTxn.serialize(msg.partialTxn(), 
out, version);
             if (msg.kind.withDeps == Commit.WithDeps.HasDeps)
-                DepsSerializers.partialDeps.serialize(msg.partialDeps, out);
+                DepsSerializers.partialDeps.serialize(msg.partialDeps(), out);
             serializeNullable(msg.route, out, KeySerializers.fullRoute);
         }
 
@@ -78,10 +78,10 @@ public class CommitSerializers
             long size = kind.serializedSize(msg.kind)
                    + CommandSerializers.ballot.serializedSize(msg.ballot)
                    + ExecuteAtSerializer.serializedSize(msg.txnId, 
msg.executeAt)
-                   + 
CommandSerializers.nullablePartialTxn.serializedSize(msg.partialTxn, version);
+                   + 
CommandSerializers.nullablePartialTxn.serializedSize(msg.partialTxn(), version);
 
             if (msg.kind.withDeps == Commit.WithDeps.HasDeps)
-                size += 
DepsSerializers.partialDeps.serializedSize(msg.partialDeps);
+                size += 
DepsSerializers.partialDeps.serializedSize(msg.partialDeps());
 
             size += serializedNullableSize(msg.route, 
KeySerializers.fullRoute);
             return size;
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index 3eb2fa4984..b3f81bec28 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -56,7 +56,7 @@ public class RecoverySerializers
     static final int HAS_EXECUTE_AT_EPOCH = 0x2;
     static final int IS_FAST_PATH_DECIDED = 0x4;
     static final int SIZE_OF_FLAGS = 
VIntCoding.computeUnsignedVIntSize(HAS_ROUTE | HAS_EXECUTE_AT_EPOCH | 
IS_FAST_PATH_DECIDED);
-    public static final IVersionedSerializer<BeginRecovery> request = new 
WithUnsyncedSerializer<BeginRecovery>()
+    public static final IVersionedSerializer<BeginRecovery> request = new 
WithUnsyncedSerializer<>()
     {
         @Override
         public void serializeBody(BeginRecovery recover, DataOutputPlus out, 
Version version) throws IOException
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
index fe2cbe2613..37e5efdb8f 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
@@ -20,14 +20,14 @@ package org.apache.cassandra.service.accord.serializers;
 
 import java.io.IOException;
 
-import accord.messages.TxnRequest;
+import accord.messages.RouteRequest;
 import accord.primitives.Route;
 import accord.primitives.TxnId;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
-public abstract class TxnRequestSerializer<T extends TxnRequest<?>> implements 
IVersionedSerializer<T>
+public abstract class TxnRequestSerializer<T extends RouteRequest<?>> 
implements IVersionedSerializer<T>
 {
     void serializeHeader(T msg, DataOutputPlus out, Version version) throws 
IOException
     {
@@ -72,7 +72,7 @@ public abstract class TxnRequestSerializer<T extends 
TxnRequest<?>> implements I
         return serializedHeaderSize(msg, version) + serializedBodySize(msg, 
version);
     }
 
-    public static abstract class WithUnsyncedSerializer<T extends 
TxnRequest.WithUnsynced<?>> extends TxnRequestSerializer<T>
+    public static abstract class WithUnsyncedSerializer<T extends 
RouteRequest.WithUnsynced<?>> extends TxnRequestSerializer<T>
     {
         @Override
         void serializeHeader(T msg, DataOutputPlus out, Version version) 
throws IOException
diff --git a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java 
b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
index 341eff845b..a40a42dded 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/BootstrapAndJoin.java
@@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService.EpochReady;
 import com.googlecode.concurrenttrees.common.Iterables;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -362,7 +363,7 @@ public class BootstrapAndJoin extends 
MultiStepOperation<Epoch>
 
         StorageService.instance.repairPaxosForTopologyChange("bootstrap");
         Future<StreamState> bootstrapStream = 
StorageService.instance.startBootstrap(metadata, beingReplaced, movements, 
strictMovements);
-        Future<?> accordReady = 
AccordService.instance().epochReadyFor(metadata);
+        Future<?> accordReady = 
AccordService.instance().epochReadyFor(metadata, EpochReady::reads);
         Future<?> ready = FutureCombiner.allOf(bootstrapStream, accordReady);
 
         try
diff --git a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java 
b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
index 68fc2471ac..8a121fd2c2 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/DropAccordTable.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.Keyspaces;
@@ -168,7 +169,7 @@ public class DropAccordTable extends 
MultiStepOperation<Epoch>
             return error(new IllegalStateException(String.format("Table %s is 
in an invalid state to be dropped", table)));
 
         long startNanos = nanoTime();
-        AccordService.instance().epochReady(metadata.epoch).get();
+        AccordService.instance().epochReady(metadata.epoch, 
EpochReady::reads).get();
         long epochEndNanos = nanoTime();
 
         // As of this writing this logic is based off ExclusiveSyncPoints 
which is a bit heavy weight for what is needed, this could cause timeouts for 
clusters that have a lot of data.
diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java 
b/src/java/org/apache/cassandra/tcm/sequences/Move.java
index b54b796749..25058e20f2 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/Move.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java
@@ -31,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.TypeSizes;
@@ -258,7 +259,7 @@ public class Move extends MultiStepOperation<Epoch>
 
                     StreamResultFuture streamResult = streamPlan.execute();
 
-                    Future<?> accordReady = 
AccordService.instance().epochReadyFor(metadata);
+                    Future<?> accordReady = 
AccordService.instance().epochReadyFor(metadata, EpochReady::reads);
                     Future<?> ready = FutureCombiner.allOf(streamResult, 
accordReady);
                     ready.get();
                     
StorageService.instance.repairPaxosForTopologyChange("move");
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java 
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index 51a551a19a..a2d0429c3f 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -49,6 +49,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Futures;
 
+import accord.api.ConfigurationService.EpochReady;
 import org.agrona.collections.IntArrayList;
 import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 import org.apache.cassandra.utils.FBUtilities;
@@ -1698,7 +1699,7 @@ public class ClusterUtils
             i.runOnInstance(() -> {
                 try
                 {
-                    
AccordService.instance().epochReady(Epoch.create(epoch)).get();
+                    AccordService.instance().epochReady(Epoch.create(epoch), 
EpochReady::reads).get();
                 }
                 catch (InterruptedException | ExecutionException e)
                 {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java 
b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 7d4320aa01..6d91a59237 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -44,7 +44,7 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.messages.AbstractRequest;
+import accord.messages.NoWaitRequest;
 import net.openhft.chronicle.core.util.SerializablePredicate;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.cql3.CQLTester;
@@ -109,8 +109,8 @@ public class TestBaseImpl extends DistributedTestBase
 
         // This isn't perfect at excluding messages so make sure it excludes 
the ones you care about in your test
         public static final SerializablePredicate<Message<?>> 
EXCLUDE_SYNC_POINT_MESSAGES = message -> {
-            if (message.payload instanceof AbstractRequest)
-                return 
!((AbstractRequest<?>)message.payload).txnId.isSyncPoint();
+            if (message.payload instanceof NoWaitRequest<?,?>)
+                return 
!((NoWaitRequest<?,?>)message.payload).txnId.isSyncPoint();
             return true;
         };
 
@@ -251,7 +251,7 @@ public class TestBaseImpl extends DistributedTestBase
         return sb.toString();
     }
 
-    protected void bootstrapAndJoinNode(Cluster cluster)
+    protected IInvokableInstance bootstrapAndJoinNode(Cluster cluster)
     {
         IInstanceConfig config = cluster.newInstanceConfig();
         config.set("auto_bootstrap", true);
@@ -261,6 +261,7 @@ public class TestBaseImpl extends DistributedTestBase
                      () -> newInstance.startup(cluster));
         newInstance.nodetoolResult("join").asserts().success();
         newInstance.nodetoolResult("cms", "describe").asserts().success(); // 
just make sure we're joined, remove later
+        return newInstance;
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
index d30c3d887a..49632ed153 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordBootstrapTest.java
@@ -24,13 +24,16 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import accord.local.PreLoadContext;
+import accord.api.ConfigurationService.EpochReady;
+import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
 import accord.topology.TopologyManager;
+import accord.utils.async.AsyncResult;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -41,14 +44,15 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import 
org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableFunction;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordConfigurationService;
-import 
org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot;
 import org.apache.cassandra.service.accord.AccordSafeCommandStore;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.api.PartitionKey;
@@ -66,6 +70,8 @@ import static 
com.google.common.collect.Iterables.getOnlyElement;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static org.apache.cassandra.service.accord.AccordService.getBlocking;
+import static 
org.apache.cassandra.service.accord.AccordConfigurationService.EpochSnapshot.ResultStatus.SUCCESS;
+import static 
org.apache.cassandra.service.accord.AccordConfigurationService.SyncStatus.COMPLETED;
 
 public class AccordBootstrapTest extends TestBaseImpl
 {
@@ -81,7 +87,7 @@ public class AccordBootstrapTest extends TestBaseImpl
         return new PartitionKey(tid, dk(key));
     }
 
-    protected void bootstrapAndJoinNode(Cluster cluster)
+    protected IInvokableInstance bootstrapAndJoinNode(Cluster cluster)
     {
         IInstanceConfig config = cluster.newInstanceConfig();
         config.set("auto_bootstrap", true);
@@ -94,6 +100,7 @@ public class AccordBootstrapTest extends TestBaseImpl
 //                     () -> withProperty("cassandra.join_ring", false, () -> 
newInstance.startup(cluster)));
 //        newInstance.nodetoolResult("join").asserts().success();
         newInstance.nodetoolResult("cms", "describe").asserts().success(); // 
just make sure we're joined, remove later
+        return newInstance;
     }
 
     private static AccordService service()
@@ -101,11 +108,11 @@ public class AccordBootstrapTest extends TestBaseImpl
         return (AccordService) AccordService.instance();
     }
 
-    private static void awaitEpoch(long epoch)
+    private static void awaitEpoch(long epoch, Function<EpochReady, 
AsyncResult<Void>> await)
     {
         try
         {
-            boolean completed = 
service().epochReady(Epoch.create(epoch)).await(60, TimeUnit.SECONDS);
+            boolean completed = service().epochReady(Epoch.create(epoch), 
await).await(60, TimeUnit.SECONDS);
             Assertions.assertThat(completed)
                       .describedAs("Epoch %s did not become ready within 
timeout on %s -> %s",
                                    epoch, 
FBUtilities.getBroadcastAddressAndPort(),
@@ -168,6 +175,14 @@ public class AccordBootstrapTest extends TestBaseImpl
 
     @Test
     public void bootstrapTest() throws Throwable
+    {
+        bootstrapTest(Function.identity(), cluster -> {
+            bootstrapAndJoinNode(cluster);
+            awaitMaxEpochReadyToRead(cluster);
+        });
+    }
+
+    public void bootstrapTest(Function<Cluster.Builder, Cluster.Builder> 
setup, Consumer<Cluster> bootstrapAndJoinNode) throws Throwable
     {
         int originalNodeCount = 2;
         int expandedNodeCount = originalNodeCount + 1;
@@ -188,49 +203,10 @@ public class AccordBootstrapTest extends TestBaseImpl
             cluster.schemaChange("CREATE KEYSPACE ks WITH 
REPLICATION={'class':'SimpleStrategy', 'replication_factor':2}");
             cluster.schemaChange("CREATE TABLE ks.tbl (k int, c int, v int, 
primary key(k, c)) WITH transactional_mode='full'");
 
-            long initialMax = maxEpoch(cluster);
-
+            awaitMaxEpochReadyToRead(cluster);
             for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    Assert.assertEquals(initialMax, 
ClusterMetadata.current().epoch.getEpoch());
-                    awaitEpoch(initialMax);
-                    AccordConfigurationService configService = 
service().configService();
-                    long minEpoch = configService.minEpoch();
-
-                    Assert.assertEquals(initialMax, configService.maxEpoch());
-
-                    for (long epoch = minEpoch; epoch < initialMax; epoch++)
-                    {
-                        awaitEpoch(epoch);
-                        Assert.assertEquals(EpochSnapshot.completed(epoch), 
configService.getEpochSnapshot(epoch));
-                    }
-
-                    awaitLocalSyncNotification(initialMax);
-                    Assert.assertEquals(EpochSnapshot.completed(initialMax), 
configService.getEpochSnapshot(initialMax));
-                });
-            }
-
-            for (IInvokableInstance node : cluster)
-            {
                 node.runOnInstance(StreamListener::register);
-            }
-
-            long schemaChangeMax = maxEpoch(cluster);
-            for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(schemaChangeMax));
-                    awaitEpoch(schemaChangeMax);
-                    AccordConfigurationService configService = 
service().configService();
-
-                    for (long epoch = initialMax + 1; epoch <= 
schemaChangeMax; epoch++)
-                    {
-                        awaitLocalSyncNotification(epoch);
-                        Assert.assertEquals(EpochSnapshot.completed(epoch), 
configService.getEpochSnapshot(epoch));
-                    }
-                });
-            }
+            awaitMaxEpochReadyToRead(cluster);
 
             for (int key = 0; key < 100; key++)
             {
@@ -251,21 +227,7 @@ public class AccordBootstrapTest extends TestBaseImpl
                 });
             }
 
-            bootstrapAndJoinNode(cluster);
-            long bootstrapMax = maxEpoch(cluster);
-            for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(bootstrapMax));
-                    Assert.assertEquals(bootstrapMax, 
ClusterMetadata.current().epoch.getEpoch());
-                    AccordService service = (AccordService) 
AccordService.instance();
-                    awaitEpoch(bootstrapMax);
-                    AccordConfigurationService configService = 
service.configService();
-
-                    awaitLocalSyncNotification(bootstrapMax);
-                    Assert.assertEquals(EpochSnapshot.completed(bootstrapMax), 
configService.getEpochSnapshot(bootstrapMax));
-                });
-            }
+            bootstrapAndJoinNode.accept(cluster);
 
             InetAddress node3Addr = 
cluster.get(3).broadcastAddress().getAddress();
             for (IInvokableInstance node : cluster.get(1, 2))
@@ -278,15 +240,11 @@ public class AccordBootstrapTest extends TestBaseImpl
                         Assert.assertTrue(session.getNumKeyspaceTransfers() > 
0);
                     });
 
-                    
getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
 safeStore -> {
-                        AccordSafeCommandStore ss = (AccordSafeCommandStore) 
safeStore;
-                        Assert.assertEquals(Timestamp.NONE, 
getOnlyElement(ss.bootstrapBeganAt().keySet()));
-                        Assert.assertEquals(Timestamp.NONE, 
getOnlyElement(ss.safeToReadAt().keySet()));
-//
-//                        Assert.assertTrue(commandStore.maxBootstrapEpoch() > 
0);
-//                        
Assert.assertTrue(commandStore.bootstrapBeganAt().isEmpty());
-//                        
Assert.assertTrue(commandStore.safeToRead().isEmpty());
-                    }));
+                    service().node().commandStores().forAllUnsafe(unsafeStore 
-> {
+                        AccordCommandStore ss = (AccordCommandStore) 
unsafeStore;
+                        Assert.assertEquals(Timestamp.NONE, 
getOnlyElement(ss.unsafeGetBootstrapBeganAt().keySet()));
+                        Assert.assertEquals(Timestamp.NONE, 
getOnlyElement(ss.unsafeGetSafeToRead().keySet()));
+                    });
                 });
             }
 
@@ -321,7 +279,7 @@ public class AccordBootstrapTest extends TestBaseImpl
                         Assert.assertEquals(key, row.getInt("c"));
                         Assert.assertEquals(key, row.getInt("v"));
 
-                        
getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
 safeStore -> {
+                        
getBlocking(service().node().commandStores().forEach("Test", 
RoutingKeys.of(partitionKey.toUnseekable()), Long.MIN_VALUE, Long.MAX_VALUE, 
safeStore -> {
                             if 
(safeStore.ranges().currentRanges().contains(partitionKey))
                             {
                                 AccordSafeCommandStore ss = 
(AccordSafeCommandStore) safeStore;
@@ -375,44 +333,7 @@ public class AccordBootstrapTest extends TestBaseImpl
                 tokens[i] = cluster.get(i+1).callOnInstance(() -> 
Long.valueOf(getOnlyElement(StorageService.instance.getTokens())));
             }
 
-            for (IInvokableInstance node : cluster)
-            {
-
-                node.runOnInstance(() -> {
-                    Assert.assertEquals(initialMax, 
ClusterMetadata.current().epoch.getEpoch());
-                    awaitEpoch(initialMax);
-                    AccordConfigurationService configService = 
service().configService();
-                    long minEpoch = configService.minEpoch();
-
-                    Assert.assertEquals(initialMax, configService.maxEpoch());
-
-                    for (long epoch = minEpoch; epoch < initialMax; epoch++)
-                    {
-                        awaitEpoch(epoch);
-                        Assert.assertEquals(EpochSnapshot.completed(epoch), 
configService.getEpochSnapshot(epoch));
-                    }
-
-                    awaitLocalSyncNotification(initialMax);
-                    Assert.assertEquals(EpochSnapshot.completed(initialMax), 
configService.getEpochSnapshot(initialMax));
-                });
-            }
-
-            long schemaChangeMax = maxEpoch(cluster);
-            for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    Assert.assertEquals(schemaChangeMax, 
ClusterMetadata.current().epoch.getEpoch());
-                    AccordService service = (AccordService) 
AccordService.instance();
-                    awaitEpoch(schemaChangeMax);
-                    AccordConfigurationService configService = 
service.configService();
-
-                    for (long epoch = initialMax + 1; epoch <= 
schemaChangeMax; epoch++)
-                    {
-                        awaitLocalSyncNotification(epoch);
-                        Assert.assertEquals(EpochSnapshot.completed(epoch), 
configService.getEpochSnapshot(epoch));
-                    }
-                });
-            }
+            awaitMaxEpochReadyToRead(cluster);
 
             for (int key = 0; key < 100; key++)
             {
@@ -431,20 +352,7 @@ public class AccordBootstrapTest extends TestBaseImpl
 
             cluster.get(1).runOnInstance(() -> 
StorageService.instance.move(Long.toString(token)));
 
-            long moveMax = maxEpoch(cluster);
-            for (IInvokableInstance node : cluster)
-            {
-                node.runOnInstance(() -> {
-                    
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(moveMax));
-                    Assert.assertEquals(moveMax, 
ClusterMetadata.current().epoch.getEpoch());
-                    AccordService service = (AccordService) 
AccordService.instance();
-                    awaitEpoch(moveMax);
-                    AccordConfigurationService configService = 
service.configService();
-
-                    awaitLocalSyncNotification(moveMax);
-                    Assert.assertEquals(EpochSnapshot.completed(moveMax), 
configService.getEpochSnapshot(moveMax));
-                });
-            }
+            long moveMax = awaitMaxEpochReadyToRead(cluster);
 
             for (IInvokableInstance node : cluster)
             {
@@ -464,9 +372,7 @@ public class AccordBootstrapTest extends TestBaseImpl
 
                             PartitionKey partitionKey = new 
PartitionKey(tableId, dk);
 
-                            
getBlocking(service().node().commandStores().forEach((PreLoadContext.Empty)()->"Test",
-                                                                               
           partitionKey.toUnseekable(), moveMax, moveMax,
-                                                                               
           safeStore -> {
+                            
getBlocking(service().node().commandStores().forEach("Test", 
RoutingKeys.of(partitionKey.toUnseekable()), moveMax, moveMax, safeStore -> {
                                 if 
(!safeStore.ranges().allAt(preMove).contains(partitionKey))
                                 {
                                     AccordSafeCommandStore ss = 
(AccordSafeCommandStore) safeStore;
@@ -493,4 +399,41 @@ public class AccordBootstrapTest extends TestBaseImpl
             }
         }
     }
+
+    private static long awaitMaxEpochReadyToRead(Cluster cluster)
+    {
+        return awaitMaxEpoch(cluster, EpochReady::reads, true);
+    }
+
+    private static long awaitMaxEpochMetadataReady(Cluster cluster)
+    {
+        return awaitMaxEpoch(cluster, EpochReady::metadata, false);
+    }
+
+    private static long awaitMaxEpoch(Cluster cluster, 
SerializableFunction<EpochReady, AsyncResult<Void>> await, boolean 
expectReadyToRead)
+    {
+        long maxEpoch = maxEpoch(cluster);
+        for (IInvokableInstance node : cluster)
+        {
+            node.acceptOnInstance(aw -> {
+                
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(maxEpoch));
+                Assert.assertEquals(maxEpoch, 
ClusterMetadata.current().epoch.getEpoch());
+                AccordService service = (AccordService) 
AccordService.instance();
+                awaitEpoch(maxEpoch, aw);
+                AccordConfigurationService configService = 
service.configService();
+
+                awaitLocalSyncNotification(maxEpoch);
+                for (long epoch = configService.minEpoch(); epoch <= maxEpoch; 
epoch++) // inspect each epoch's own snapshot, not maxEpoch's repeatedly
+                {
+                    Assert.assertEquals(COMPLETED, 
configService.getEpochSnapshot(epoch).syncStatus);
+                    Assert.assertEquals(SUCCESS, 
configService.getEpochSnapshot(epoch).acknowledged);
+                    Assert.assertEquals(SUCCESS, 
configService.getEpochSnapshot(epoch).received);
+                    if (expectReadyToRead)
+                        Assert.assertEquals(SUCCESS, 
configService.getEpochSnapshot(epoch).reads);
+                }
+            }, node.transfer(await));
+        }
+        return maxEpoch;
+    }
+
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
index a5a38edadf..bbb8a7f4cb 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import accord.local.Node;
-import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
 import accord.local.StoreParticipants;
 import accord.local.cfk.CommandsForKey;
@@ -32,6 +31,7 @@ import accord.local.cfk.SafeCommandsForKey;
 import accord.local.durability.DurabilityService;
 import accord.primitives.Keys;
 import accord.primitives.Ranges;
+import accord.primitives.RoutingKeys;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -68,8 +68,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-import static accord.local.LoadKeys.SYNC;
-import static accord.local.LoadKeysFor.READ_WRITE;
 import static java.lang.String.format;
 import static 
org.apache.cassandra.distributed.test.accord.AccordTestBase.executeWithRetry;
 import static org.apache.cassandra.service.accord.AccordService.getBlocking;
@@ -158,7 +156,7 @@ public class AccordIncrementalRepairTest extends 
TestBaseImpl
             {
                 cluster.filters().reset();
                 for (IInvokableInstance instance : cluster)
-                    instance.runOnInstance(() -> 
AccordService.instance().node().commandStores().forEachCommandStore(cs -> 
cs.unsafeProgressLog().start()));
+                    instance.runOnInstance(() -> 
AccordService.instance().node().commandStores().forAllUnsafe(cs -> 
cs.unsafeProgressLog().start()));
             }
         }
     }
@@ -207,7 +205,7 @@ public class AccordIncrementalRepairTest extends 
TestBaseImpl
     {
         Node node = accordService().node();
         AtomicReference<TxnId> waitFor = new AtomicReference<>(null);
-        
getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(key, SYNC, 
READ_WRITE, "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+        getBlocking(node.commandStores().forEach("Test", RoutingKeys.of(key), 
Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
             AccordSafeCommandStore store = (AccordSafeCommandStore) safeStore;
             SafeCommandsForKey safeCfk = store.ifLoadedAndInitialised(key);
             if (safeCfk == null)
@@ -229,7 +227,7 @@ public class AccordIncrementalRepairTest extends 
TestBaseImpl
             long now = Clock.Global.currentTimeMillis();
             if (now - start > TimeUnit.MINUTES.toMillis(1))
                 throw new AssertionError("Timeout");
-            
AccordService.getBlocking(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId,
 "Test"), key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+            getBlocking(node.commandStores().forEach("Test", 
RoutingKeys.of(key), Long.MIN_VALUE, Long.MAX_VALUE, safeStore -> {
                 SafeCommand command = safeStore.get(txnId, 
StoreParticipants.empty(txnId));
                 Assert.assertNotNull(command.current());
                 if (command.current().status().hasBeen(Status.Applied))
@@ -291,7 +289,7 @@ public class AccordIncrementalRepairTest extends 
TestBaseImpl
         // heal partition and wait for node 1 to see node 3 again
         for (IInvokableInstance instance : cluster)
             instance.runOnInstance(() -> {
-                
AccordService.instance().node().commandStores().forEachCommandStore(cs -> 
cs.unsafeProgressLog().stop());
+                
AccordService.instance().node().commandStores().forAllUnsafe(cs -> 
cs.unsafeProgressLog().stop());
                 Assert.assertFalse(barrierRecordingService().executedBarriers);
             });
         cluster.filters().reset();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
index c2b8f1e486..a7354e17b7 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIntegrationTest.java
@@ -125,6 +125,6 @@ public class AccordIntegrationTest extends AccordTestBase
     private void pauseSimpleProgressLog()
     {
         for (IInvokableInstance instance : SHARED_CLUSTER)
-            instance.runOnInstance(() -> 
AccordService.instance().node().commandStores().forEachCommandStore(cs -> 
cs.unsafeProgressLog().stop()));
+            instance.runOnInstance(() -> 
AccordService.instance().node().commandStores().forAllUnsafe(cs -> 
cs.unsafeProgressLog().stop()));
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index b4c53b6159..cb91e9a160 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -166,7 +166,7 @@ public abstract class AccordTestBase extends TestBaseImpl
     {
         SHARED_CLUSTER.filters().reset();
         for (IInvokableInstance instance : SHARED_CLUSTER)
-            instance.runOnInstance(() -> 
AccordService.instance().node().commandStores().forEachCommandStore(cs -> 
cs.unsafeProgressLog().start()));
+            instance.runOnInstance(() -> 
AccordService.instance().node().commandStores().forAllUnsafe(cs -> 
cs.unsafeProgressLog().start()));
 
         truncateSystemTables();
 
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java 
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java
index 1e311bf8a5..b6aeb6231d 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordBounceTest.java
@@ -150,9 +150,9 @@ public class AccordBounceTest extends FuzzTestBase
                 // Command Stores should not be lost on bounce
                 Map<Integer, Set<String>> before = 
cluster.get(1).callOnInstance(() -> {
                     Map<Integer, Set<String>> m = new HashMap<>();
-                    
AccordService.instance().node().commandStores().forEach((store, ranges) -> {
+                    
AccordService.instance().node().commandStores().forAllUnsafe((store) -> {
                         Set<String> set = new HashSet<>();
-                        for (Range range : ranges.all())
+                        for (Range range : 
store.unsafeGetRangesForEpoch().all())
                             set.add(range.toString());
                         m.put(store.id(), set);
                     });
@@ -169,9 +169,9 @@ public class AccordBounceTest extends FuzzTestBase
 
                     Map<Integer, Set<String>> after = 
cluster.get(1).callOnInstance(() -> {
                         Map<Integer, Set<String>> m = new HashMap<>();
-                        
AccordService.instance().node().commandStores().forEach((store, ranges) -> {
+                        
AccordService.instance().node().commandStores().forAllUnsafe(store -> {
                             Set<String> set = new HashSet<>();
-                            for (Range range : ranges.all())
+                            for (Range range : 
store.unsafeGetRangesForEpoch().all())
                                 set.add(range.toString());
                             m.put(store.id(), set);
                         });
diff --git 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
index 25b7b952c8..42db56fb94 100644
--- 
a/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
+++ 
b/test/distributed/org/apache/cassandra/service/accord/AccordJournalBurnTest.java
@@ -354,11 +354,11 @@ public class AccordJournalBurnTest extends BurnTestBase
                              }
 
                              @Override
-                             public void replay(CommandStores commandStores)
+                             public boolean replay(CommandStores commandStores)
                              {
                                  // Make sure to replay _only_ static segments
                                  
this.closeCurrentSegmentForTestingIfNonEmpty();
-                                 super.replay(commandStores);
+                                 return super.replay(commandStores);
                              }
 
                              @Override
@@ -388,7 +388,7 @@ public class AccordJournalBurnTest extends BurnTestBase
     public static IAccordService.AccordCompactionInfos getCompactionInfo(Node 
node, TableId tableId)
     {
         IAccordService.AccordCompactionInfos compactionInfos = new 
IAccordService.AccordCompactionInfos(node.durableBefore(), 
node.topology().minEpoch());
-        node.commandStores().forEachCommandStore(commandStore -> {
+        node.commandStores().forAllUnsafe(commandStore -> {
             RedundantBefore redundantBefore = 
commandStore.unsafeGetRedundantBefore();
             if (redundantBefore == null)
                 redundantBefore = RedundantBefore.EMPTY;
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 7f7e44fd9f..024e343ed7 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -34,8 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.ProtocolModifiers;
-import accord.local.PreLoadContext;
-import accord.messages.TxnRequest;
+import accord.messages.NoWaitRequest;
 import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.SaveStatus;
@@ -212,7 +211,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
         TxnId syncId2 = new TxnId(101, 300, Txn.Kind.ExclusiveSyncPoint, 
Routable.Domain.Range, accord.nodeId());
         Ranges ranges1 = Ranges.of(TokenRange.create(new TokenKey(tableId, new 
LongToken(1)), new TokenKey(tableId, new LongToken(100))));
         Ranges ranges2 = Ranges.of(TokenRange.create(new TokenKey(tableId, new 
LongToken(100)), new TokenKey(tableId, new LongToken(200))));
-        
getBlocking(accord.node().commandStores().forEach((PreLoadContext.Empty)() -> 
"Test", safeStore -> {
+        getBlocking(accord.node().commandStores().forAll("Test", safeStore -> {
             safeStore.commandStore().markShardDurable(safeStore, syncId1, 
ranges1, HasOutcome.Universal);
             safeStore.commandStore().markShardDurable(safeStore, syncId2, 
ranges2, HasOutcome.Quorum);
         }));
@@ -436,9 +435,9 @@ public class AccordDebugKeyspaceTest extends CQLTester
             if (!msg.verb().name().startsWith("ACCORD_"))
                 return true;
             TxnId txnId = null;
-            if (msg.payload instanceof TxnRequest)
+            if (msg.payload instanceof NoWaitRequest<?,?>)
             {
-                txnId = ((TxnRequest<?>) msg.payload).txnId;
+                txnId = ((NoWaitRequest<?,?>) msg.payload).txnId;
                 if (applyTo != null && !applyTo.contains(txnId))
                     return true;
             }
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java 
b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index d0c2d87f1e..b467174c42 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -30,6 +30,8 @@ import java.util.TreeSet;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+
+import accord.api.ConfigurationService.EpochReady;
 import accord.api.Journal;
 import accord.api.RoutingKey;
 import accord.local.CommandStores;
@@ -238,7 +240,7 @@ public class RouteIndexTest extends CQLTester
             this.storeId = storeId;
             this.txnId = txnId;
             this.saveStatus = saveStatus;
-            this.participants = StoreParticipants.all(route);
+            this.participants = StoreParticipants.all(route, saveStatus);
         }
 
         @Override
@@ -503,7 +505,7 @@ public class RouteIndexTest extends CQLTester
                 storeRangesForEpochs.put(i, new RangesForEpoch(1, 
Ranges.of(TokenRange.fullRange(tableId, getPartitioner()))));
 
             accordService = startAccord();
-            
accordService.epochReady(ClusterMetadata.current().epoch).awaitUninterruptibly();
+            accordService.epochReady(ClusterMetadata.current().epoch, 
EpochReady::reads).awaitUninterruptibly();
 
             minDecidedIdNull = rs.nextFloat();
             txnWriteFrequency = rs.pickInt(1, // every txn is a Write
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index b714b833f1..ac322403d1 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -176,7 +176,7 @@ public class AccordCommandTest
             Command before = safeStore.ifInitialised(txnId).current();
             Assert.assertEquals(commit.executeAt, before.executeAt());
             Assert.assertTrue(before.hasBeen(Status.Committed));
-            Assert.assertEquals(commit.partialDeps, before.partialDeps());
+            Assert.assertEquals(commit.partialDeps(), before.partialDeps());
 
             CommandsForKey cfk = 
safeStore.get(key(1).toUnseekable()).current();
             Assert.assertTrue(cfk.indexOf(txnId) >= 0);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
index adaff55578..081aaeec47 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
@@ -41,7 +41,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import accord.api.Agent;
-import accord.impl.AbstractConfigurationService;
+import accord.impl.AbstractTestConfigurationService;
 import accord.impl.TestAgent;
 import accord.impl.basic.Pending;
 import accord.impl.basic.PendingQueue;
@@ -410,7 +410,7 @@ public class AccordSyncPropagatorTest
             }
         }
 
-        private class ConfigService extends 
AbstractConfigurationService.Minimal implements AccordSyncPropagator.Listener
+        private class ConfigService extends AbstractTestConfigurationService 
implements AccordSyncPropagator.Listener
         {
             private final Map<Long, Set<Node.Id>> syncCompletes = new 
HashMap<>();
             private final Map<Long, Set<Node.Id>> endpointAcks = new 
HashMap<>();
@@ -436,7 +436,7 @@ public class AccordSyncPropagatorTest
             }
 
             @Override
-            protected void localSyncComplete(Topology topology, boolean 
startSync)
+            protected void onReadyToCoordinate(Topology topology, boolean 
startSync)
             {
                 Set<Node.Id> notify = topology.nodes().stream().filter(i -> 
!localId.equals(i)).collect(Collectors.toSet());
                 
instances.get(localId).propagator.reportSyncComplete(topology.epoch(), notify, 
localId);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
index b9602dec07..d73763ba5a 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStore.java
@@ -59,7 +59,7 @@ import accord.local.durability.DurabilityService;
 import accord.messages.BeginRecovery;
 import accord.messages.PreAccept;
 import accord.messages.Reply;
-import accord.messages.TxnRequest;
+import accord.messages.RouteRequest;
 import accord.primitives.AbstractUnseekableKeys;
 import accord.primitives.Ballot;
 import accord.primitives.EpochSupplier;
@@ -421,9 +421,9 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
         throw error;
     }
 
-    public <T extends Reply> T process(TxnRequest<T> request) throws 
ExecutionException, InterruptedException
+    public <T extends Reply> T process(RouteRequest<T> request) throws 
ExecutionException, InterruptedException
     {
-        return process(request, request::apply);
+        return process(request, request);
     }
 
     public <T extends Reply> T process(PreLoadContext loadCtx, Function<? 
super SafeCommandStore, T> function) throws ExecutionException, 
InterruptedException
@@ -433,9 +433,9 @@ public class SimulatedAccordCommandStore implements 
AutoCloseable
         return getBlocking(result);
     }
 
-    public <T extends Reply> AsyncResult<T> processAsync(TxnRequest<T> request)
+    public <T extends Reply> AsyncResult<T> processAsync(RouteRequest<T> 
request)
     {
-        return processAsync(request, request::apply);
+        return processAsync(request, request);
     }
 
     public <T extends Reply> AsyncResult<T> processAsync(PreLoadContext 
loadCtx, Function<? super SafeCommandStore, T> function)
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
index d54058b727..f18065adc9 100644
--- a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordTaskTest.java
@@ -120,9 +120,9 @@ public class SimulatedAccordTaskTest extends 
SimulatedAccordCommandStoreTestBase
                         FullRoute<?> route = txnWithRoute.right;
                         PreAccept preAccept = new PreAccept(nodeId, 
instance.topologies, txnId, txn, null, false, route) {
                             @Override
-                            public PreAcceptReply apply(SafeCommandStore 
safeStore)
+                            public PreAcceptReply 
applyInternal(SafeCommandStore safeStore)
                             {
-                                PreAcceptReply result = super.apply(safeStore);
+                                PreAcceptReply result = 
super.applyInternal(safeStore);
                                 if (action == Action.FAILURE)
                                     throw new SimulatedFault("PreAccept failed 
for keys " + keys());
                                 return result;
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 884638da98..edca7e1c6b 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
@@ -49,6 +50,7 @@ import accord.api.AsyncExecutor;
 import accord.api.DataStore;
 import accord.api.Journal;
 import accord.api.Key;
+import accord.api.OwnershipEventListener;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
 import accord.api.Timeouts;
@@ -657,8 +659,8 @@ public class CommandsForKeySerializerTest
         @Override public Agent agent() { return this; }
         @Override public void execute(Runnable run) {}
         @Override public void shutdown() { }
-        @Override public void onFailedBootstrap(int attempts, String phase, Ranges ranges, Runnable retry, Throwable failure) { throw new UnsupportedOperationException(); }
-        @Override public void onStale(Timestamp staleSince, Ranges ranges) { throw new UnsupportedOperationException(); }
+        @Override public <T> AsyncChain<T> chain(Callable<T> call) { throw new UnsupportedOperationException(); }
+        @Override public OwnershipEventListener ownershipEvents() { return null; }
         @Override public void onUncaughtException(Throwable t) { throw new UnsupportedOperationException(); }
         @Override public void onCaughtException(Throwable t, String context) { throw new UnsupportedOperationException(); }
         @Override public boolean rejectPreAccept(TimeService time, TxnId txnId) { throw new UnsupportedOperationException(); }
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index bd994fc27a..2c8f292b68 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -90,7 +90,7 @@ import org.quicktheories.impl.JavaRandom;
 
 import static accord.local.CommandStores.RangesForEpoch;
 import static accord.local.RedundantStatus.Property.GC_BEFORE;
-import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
+import static accord.local.RedundantStatus.Property.UNREADY;
 import static accord.local.RedundantStatus.SomeStatus.LOCALLY_APPLIED_ONLY;
 import static accord.local.RedundantStatus.SomeStatus.LOCALLY_WITNESSED_ONLY;
 import static accord.local.RedundantStatus.SomeStatus.SHARD_APPLIED_ONLY;
@@ -276,7 +276,7 @@ public class AccordGenerators
             if (saveStatus.known.deps().hasPreAcceptedOrProposedOrDecidedDeps())
                 builder.partialDeps(partialDeps);
 
-            builder.setParticipants(StoreParticipants.all(route));
+            builder.setParticipants(StoreParticipants.all(route, saveStatus));
             builder.durability(NotDurable);
             if (saveStatus.compareTo(SaveStatus.PreAccepted) >= 0)
                 builder.executeAt(executeAt);
@@ -601,9 +601,9 @@ public class AccordGenerators
             if (rs.nextBoolean())
                 bounds.add(Bounds.create(range, txnIdGen.next(rs).addFlag(SHARD_BOUND), oneSlow(GC_BEFORE), null ));
             if (rs.nextBoolean())
-                bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(PRE_BOOTSTRAP), null ));
+                bounds.add(Bounds.create(range, txnIdGen.next(rs), oneSlow(UNREADY), null ));
             if (rs.nextBoolean())
-                bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new short[0], txnIdGen.next(rs)));
+                bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE, new TxnId[0], new int[0], txnIdGen.next(rs)));
 
             Collections.shuffle(bounds);
             long endEpoch = emptyGen.next(rs) ? Long.MAX_VALUE : rs.nextLong(0, Long.MAX_VALUE);
@@ -618,7 +618,7 @@ public class AccordGenerators
             }
 
             long startEpoch = rs.nextLong(Math.min(minEpoch, endEpoch));
-            Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new short[0], null);
+            Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new TxnId[0], new int[0], null);
             if (result == null)
                 return epochBounds;
             return Bounds.reduce(result, epochBounds);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to