This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new d4c308a8a6 perf improvements
d4c308a8a6 is described below

commit d4c308a8a68ff892e4bbebf673b3d924a5313e52
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Feb 28 15:50:04 2024 +0000

    perf improvements
---
 modules/accord                                     |   2 +-
 .../org/apache/cassandra/config/AccordSpec.java    |   4 +-
 src/java/org/apache/cassandra/config/Config.java   |   1 +
 .../cassandra/config/DatabaseDescriptor.java       |  22 +++
 .../db/compaction/CompactionIterator.java          |   6 +-
 src/java/org/apache/cassandra/net/Verb.java        |  32 +--
 .../service/accord/AccordCachingState.java         |   4 +-
 .../service/accord/AccordCommandStore.java         |  20 +-
 .../service/accord/AccordCommandStores.java        |   8 +-
 .../service/accord/AccordConfiguration.java        |   1 +
 .../cassandra/service/accord/AccordJournal.java    |  15 ++
 .../cassandra/service/accord/AccordKeyspace.java   |  19 +-
 .../service/accord/AccordObjectSizes.java          |  29 +--
 .../service/accord/AccordSafeCommandStore.java     |  50 +++--
 .../service/accord/AccordSafeCommandsForKey.java   |  13 +-
 .../cassandra/service/accord/AccordSafeState.java  |   2 +
 .../service/accord/AccordSafeTimestampsForKey.java |   9 +-
 .../cassandra/service/accord/AccordService.java    |   8 +-
 .../service/accord/CommandsForRanges.java          |  11 +-
 .../cassandra/service/accord/api/AccordAgent.java  |   3 +
 .../service/accord/async/AsyncLoader.java          |  12 +-
 .../service/accord/async/AsyncOperation.java       |   5 +-
 .../service/accord/interop/AccordInteropApply.java |   2 -
 .../serializers/CommandsForKeySerializer.java      | 217 +++++++++++----------
 .../accord/serializers/WaitingOnSerializer.java    | 103 +++++++---
 .../cassandra/service/accord/txn/TxnWrite.java     |   5 +-
 .../apache/cassandra/utils/vint/VIntCoding.java    |  47 ++++-
 test/conf/logback-dtest-quiet.xml                  |  56 ++++++
 .../cassandra/distributed/api/ICoordinator.java    |   3 +
 .../cassandra/distributed/impl/Coordinator.java    |  25 +++
 .../cassandra/distributed/impl/Instance.java       |   2 +-
 .../distributed/test/accord/AccordLoadTest.java    |  96 +++++++++
 .../compaction/CompactionAccordIteratorsTest.java  |   2 +-
 .../service/accord/AccordCommandStoreTest.java     |  29 +--
 .../service/accord/AccordCommandTest.java          |   8 +-
 .../cassandra/service/accord/AccordTestUtils.java  |  12 +-
 .../service/accord/async/AsyncLoaderTest.java      |   9 +-
 .../service/accord/async/AsyncOperationTest.java   |  11 +-
 .../serializers/CommandsForKeySerializerTest.java  |  39 ++--
 .../serializers/WaitingOnSerializerTest.java       |  33 ++--
 40 files changed, 654 insertions(+), 321 deletions(-)

diff --git a/modules/accord b/modules/accord
index ef36616441..3562bb3c9c 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit ef36616441bd4ff4fec5379d986c75ad5a62ff7d
+Subproject commit 3562bb3c9ce4e9eecdf65e236e968ef3ee9e0a86
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java 
b/src/java/org/apache/cassandra/config/AccordSpec.java
index 697d7edc1e..e76745a233 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -47,9 +47,9 @@ public class AccordSpec
 
     public volatile DurationSpec fast_path_update_delay = new 
DurationSpec.IntSecondsBound(5);
 
-    public volatile DurationSpec schedule_durability_frequency = new 
DurationSpec.IntSecondsBound(15);
+    public volatile DurationSpec schedule_durability_frequency = new 
DurationSpec.IntSecondsBound(5);
     public volatile DurationSpec durability_txnid_lag = new 
DurationSpec.IntSecondsBound(5);
-    public volatile DurationSpec shard_durability_cycle = new 
DurationSpec.IntMinutesBound(2);
+    public volatile DurationSpec shard_durability_cycle = new 
DurationSpec.IntMinutesBound(1);
     public volatile DurationSpec global_durability_cycle = new 
DurationSpec.IntMinutesBound(10);
 
     public enum TransactionalRangeMigration
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 338ae13981..8a7a9d7d35 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -480,6 +480,7 @@ public class Config
     public volatile int counter_cache_keys_to_save = Integer.MAX_VALUE;
 
     public DataStorageSpec.LongMebibytesBound paxos_cache_size = null;
+    public DataStorageSpec.LongMebibytesBound accord_cache_size = null;
 
     public DataStorageSpec.LongMebibytesBound consensus_migration_cache_size = 
null;
 
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3922a94c93..7273f3c5c7 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -210,6 +210,7 @@ public class DatabaseDescriptor
 
     private static long keyCacheSizeInMiB;
     private static long paxosCacheSizeInMiB;
+    private static long accordCacheSizeInMiB;
     private static long consensusMigrationCacheSizeInMiB;
     private static long counterCacheSizeInMiB;
     private static long indexSummaryCapacityInMiB;
@@ -894,6 +895,22 @@ public class DatabaseDescriptor
                                              + conf.paxos_cache_size + "', 
supported values are <integer> >= 0.", false);
         }
 
+        try
+        {
+            // if accord_cache_size is not set then the size of the cache 
defaults to max(10% of total heap (in MiB), 1MiB)
+            accordCacheSizeInMiB = (conf.accord_cache_size == null)
+                                  ? Math.max(1, (int) 
((Runtime.getRuntime().totalMemory() * 0.10) / 1024 / 1024))
+                                  : conf.accord_cache_size.toMebibytes();
+
+            if (accordCacheSizeInMiB < 0)
+                throw new NumberFormatException(); // to escape duplicating 
error message
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("accord_cache_size option was set 
incorrectly to '"
+                                             + conf.accord_cache_size + "', 
supported values are <integer> >= 0.", false);
+        }
+
         try
         {
             // if consensusMigrationCacheSizeInMiB option was set to "auto" 
then size of the cache should be "min(1% of Heap (in MB), 50MB)
@@ -3863,6 +3880,11 @@ public class DatabaseDescriptor
         return paxosCacheSizeInMiB;
     }
 
+    public static long getAccordCacheSizeInMiB()
+    {
+        return accordCacheSizeInMiB;
+    }
+
     public static long getConsensusMigrationCacheSizeInMiB()
     {
         return consensusMigrationCacheSizeInMiB;
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index bb404bbe5a..e0dcb5682f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -978,11 +978,7 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
             if (redundantBeforeEntry == null)
                 return row;
 
-            TxnId redundantBeforeTxnId = 
redundantBeforeEntry.shardRedundantBefore();
-            if (redundantBeforeTxnId.equals(TxnId.NONE))
-                return row;
-
-            return 
CommandsForKeysAccessor.withoutRedundantCommands(partitionKey, row, 
redundantBeforeTxnId);
+            return 
CommandsForKeysAccessor.withoutRedundantCommands(partitionKey, row, 
redundantBeforeEntry);
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index 457bd14c80..4fed5c618f 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -266,7 +266,7 @@ public enum Verb
     PAXOS2_PREPARE_REQ               (40, P2, writeTimeout,  MUTATION,         
 () -> PaxosPrepare.requestSerializer,          () -> 
PaxosPrepare.requestHandler,                           PAXOS2_PREPARE_RSP       
        ),
     PAXOS2_PREPARE_REFRESH_RSP       (51, P2, writeTimeout,  REQUEST_RESPONSE, 
 () -> PaxosPrepareRefresh.responseSerializer,  RESPONSE_HANDLER                
                                            ),
     PAXOS2_PREPARE_REFRESH_REQ       (41, P2, writeTimeout,  MUTATION,         
 () -> PaxosPrepareRefresh.requestSerializer,   () -> 
PaxosPrepareRefresh.requestHandler,                    
PAXOS2_PREPARE_REFRESH_RSP       ),
-    PAXOS2_PROPOSE_RSP               (52, P2, writeTimeout, REQUEST_RESPONSE, 
() -> PaxosPropose.ACCEPT_RESULT_SERIALIZER, RESPONSE_HANDLER                   
                                         ),
+    PAXOS2_PROPOSE_RSP               (52, P2, writeTimeout,  REQUEST_RESPONSE, 
 () -> PaxosPropose.ACCEPT_RESULT_SERIALIZER,   RESPONSE_HANDLER                
                                            ),
     PAXOS2_PROPOSE_REQ               (42, P2, writeTimeout,  MUTATION,         
 () -> PaxosPropose.requestSerializer,          () -> 
PaxosPropose.requestHandler,                           PAXOS2_PROPOSE_RSP       
        ),
     PAXOS2_COMMIT_AND_PREPARE_RSP    (53, P2, writeTimeout,  REQUEST_RESPONSE, 
 () -> PaxosPrepare.responseSerializer,         RESPONSE_HANDLER                
                                            ),
     PAXOS2_COMMIT_AND_PREPARE_REQ    (43, P2, writeTimeout,  MUTATION,         
 () -> PaxosCommitAndPrepare.requestSerializer, () -> 
PaxosCommitAndPrepare.requestHandler,                  
PAXOS2_COMMIT_AND_PREPARE_RSP    ),
@@ -305,41 +305,41 @@ public enum Verb
     DATA_MOVEMENT_EXECUTED_REQ  (817, P1, rpcTimeout, MISC, () -> 
DataMovement.Status.serializer,   () -> DataMovements.instance,           
DATA_MOVEMENT_EXECUTED_RSP  ),
 
     // accord
-    ACCORD_SIMPLE_RSP               (119, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> EnumSerializer.simpleReply,           RESPONSE_HANDLER                   
                                         ),
-    ACCORD_PRE_ACCEPT_RSP           (120, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> PreacceptSerializers.reply,           RESPONSE_HANDLER                   
                                         ),
+    ACCORD_SIMPLE_RSP               (119, P2, writeTimeout, IMMEDIATE,         
 () -> EnumSerializer.simpleReply,           RESPONSE_HANDLER                   
                                         ),
+    ACCORD_PRE_ACCEPT_RSP           (120, P2, writeTimeout, IMMEDIATE,         
 () -> PreacceptSerializers.reply,           RESPONSE_HANDLER                   
                                         ),
     ACCORD_PRE_ACCEPT_REQ           (121, P2, writeTimeout, IMMEDIATE,         
 () -> PreacceptSerializers.request,         AccordService::verbHandlerOrNoop, 
ACCORD_PRE_ACCEPT_RSP                     ),
-    ACCORD_ACCEPT_RSP               (122, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> AcceptSerializers.reply,              RESPONSE_HANDLER                   
                                         ),
+    ACCORD_ACCEPT_RSP               (122, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.reply,              RESPONSE_HANDLER                   
                                         ),
     ACCORD_ACCEPT_REQ               (123, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.request,            AccordService::verbHandlerOrNoop, 
ACCORD_ACCEPT_RSP                         ),
     ACCORD_ACCEPT_INVALIDATE_REQ    (124, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.invalidate,         AccordService::verbHandlerOrNoop, 
ACCORD_ACCEPT_RSP                         ),
-    ACCORD_READ_RSP                 (125, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> ReadDataSerializers.reply,            RESPONSE_HANDLER                   
                                         ),
+    ACCORD_READ_RSP                 (125, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.reply,            RESPONSE_HANDLER                   
                                         ),
     ACCORD_READ_REQ                 (126, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.readData,         AccordService::verbHandlerOrNoop, 
ACCORD_READ_RSP                           ),
     ACCORD_COMMIT_REQ               (127, P2, writeTimeout, IMMEDIATE,         
 () -> CommitSerializers.request,            AccordService::verbHandlerOrNoop, 
ACCORD_READ_RSP                           ),
     ACCORD_COMMIT_INVALIDATE_REQ    (128, P2, writeTimeout, IMMEDIATE,         
 () -> CommitSerializers.invalidate,         AccordService::verbHandlerOrNoop   
                                         ),
-    ACCORD_APPLY_RSP                (129, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> ApplySerializers.reply,               RESPONSE_HANDLER                   
                                         ),
-    ACCORD_APPLY_REQ                (130, P2, writeTimeout, IMMEDIATE, () -> 
ApplySerializers.request, AccordService::verbHandlerOrNoop, ACCORD_APPLY_RSP    
                      ),
-    ACCORD_BEGIN_RECOVER_RSP        (131, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> RecoverySerializers.reply,            RESPONSE_HANDLER                   
                                         ),
+    ACCORD_APPLY_RSP                (129, P2, writeTimeout, IMMEDIATE,         
 () -> ApplySerializers.reply,               RESPONSE_HANDLER                   
                                         ),
+    ACCORD_APPLY_REQ                (130, P2, writeTimeout, IMMEDIATE,         
 () -> ApplySerializers.request, AccordService::verbHandlerOrNoop, 
ACCORD_APPLY_RSP                          ),
+    ACCORD_BEGIN_RECOVER_RSP        (131, P2, writeTimeout, IMMEDIATE,         
 () -> RecoverySerializers.reply,            RESPONSE_HANDLER                   
                                         ),
     ACCORD_BEGIN_RECOVER_REQ        (132, P2, writeTimeout, IMMEDIATE,         
 () -> RecoverySerializers.request,          AccordService::verbHandlerOrNoop, 
ACCORD_BEGIN_RECOVER_RSP                  ),
-    ACCORD_BEGIN_INVALIDATE_RSP     (133, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> BeginInvalidationSerializers.reply,   RESPONSE_HANDLER                   
                                         ),
+    ACCORD_BEGIN_INVALIDATE_RSP     (133, P2, writeTimeout, IMMEDIATE,         
 () -> BeginInvalidationSerializers.reply,   RESPONSE_HANDLER                   
                                         ),
     ACCORD_BEGIN_INVALIDATE_REQ     (134, P2, writeTimeout, IMMEDIATE,         
 () -> BeginInvalidationSerializers.request, AccordService::verbHandlerOrNoop, 
ACCORD_BEGIN_INVALIDATE_RSP               ),
-    ACCORD_WAIT_ON_COMMIT_RSP       (136, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> WaitOnCommitSerializer.reply,         RESPONSE_HANDLER                   
                                         ),
+    ACCORD_WAIT_ON_COMMIT_RSP       (136, P2, writeTimeout, IMMEDIATE,         
 () -> WaitOnCommitSerializer.reply,         RESPONSE_HANDLER                   
                                         ),
     ACCORD_WAIT_ON_COMMIT_REQ       (135, P2, writeTimeout, IMMEDIATE,         
 () -> WaitOnCommitSerializer.request,       AccordService::verbHandlerOrNoop, 
ACCORD_WAIT_ON_COMMIT_RSP                 ),
     ACCORD_WAIT_UNTIL_APPLIED_REQ   (137, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.waitUntilApplied, AccordService::verbHandlerOrNoop, 
ACCORD_READ_RSP                           ),
     ACCORD_INFORM_OF_TXN_REQ        (138, P2, writeTimeout, IMMEDIATE,         
 () -> InformOfTxnIdSerializers.request,     AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
     ACCORD_INFORM_HOME_DURABLE_REQ  (139, P2, writeTimeout, IMMEDIATE,         
 () -> InformHomeDurableSerializers.request, AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
     ACCORD_INFORM_DURABLE_REQ       (140, P2, writeTimeout, IMMEDIATE,         
 () -> InformDurableSerializers.request,     AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
-    ACCORD_CHECK_STATUS_RSP         (141, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> CheckStatusSerializers.reply,         RESPONSE_HANDLER                   
                                         ),
+    ACCORD_CHECK_STATUS_RSP         (141, P2, writeTimeout, IMMEDIATE,         
 () -> CheckStatusSerializers.reply,         RESPONSE_HANDLER                   
                                         ),
     ACCORD_CHECK_STATUS_REQ         (142, P2, writeTimeout, IMMEDIATE,         
 () -> CheckStatusSerializers.request,       AccordService::verbHandlerOrNoop, 
ACCORD_CHECK_STATUS_RSP                   ),
-    ACCORD_GET_DEPS_RSP             (143, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> GetDepsSerializers.reply,             RESPONSE_HANDLER                   
                                         ),
+    ACCORD_GET_DEPS_RSP             (143, P2, writeTimeout, IMMEDIATE,         
 () -> GetDepsSerializers.reply,             RESPONSE_HANDLER                   
                                         ),
     ACCORD_GET_DEPS_REQ             (144, P2, writeTimeout, IMMEDIATE,         
 () -> GetDepsSerializers.request,           AccordService::verbHandlerOrNoop, 
ACCORD_GET_DEPS_RSP                       ),
-    ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> GetEphmrlReadDepsSerializers.reply,   RESPONSE_HANDLER                   
                                         ),
+    ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, IMMEDIATE,         
 () -> GetEphmrlReadDepsSerializers.reply,   RESPONSE_HANDLER                   
                                         ),
     ACCORD_GET_EPHMRL_READ_DEPS_REQ (162, P2, writeTimeout, IMMEDIATE,         
 () -> GetEphmrlReadDepsSerializers.request, AccordService::verbHandlerOrNoop, 
ACCORD_GET_EPHMRL_READ_DEPS_RSP),
-    ACCORD_GET_MAX_CONFLICT_RSP     (163, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> GetMaxConflictSerializers.reply,      RESPONSE_HANDLER                   
                                         ),
+    ACCORD_GET_MAX_CONFLICT_RSP     (163, P2, writeTimeout, IMMEDIATE,         
 () -> GetMaxConflictSerializers.reply,      RESPONSE_HANDLER                   
                                         ),
     ACCORD_GET_MAX_CONFLICT_REQ     (164, P2, writeTimeout, IMMEDIATE,         
 () -> GetMaxConflictSerializers.request,    AccordService::verbHandlerOrNoop, 
ACCORD_GET_MAX_CONFLICT_RSP),
-    ACCORD_FETCH_DATA_RSP           (145, P2, repairTimeout,REQUEST_RESPONSE,  
 () -> FetchSerializers.reply,               RESPONSE_HANDLER                   
                                         ),
+    ACCORD_FETCH_DATA_RSP           (145, P2, repairTimeout,IMMEDIATE,         
 () -> FetchSerializers.reply,               RESPONSE_HANDLER                   
                                         ),
     ACCORD_FETCH_DATA_REQ           (146, P2, repairTimeout,IMMEDIATE,         
 () -> FetchSerializers.request,             AccordService::verbHandlerOrNoop, 
ACCORD_FETCH_DATA_RSP                     ),
     ACCORD_SET_SHARD_DURABLE_REQ    (147, P2, writeTimeout, IMMEDIATE,         
 () -> SetDurableSerializers.shardDurable,   AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
     ACCORD_SET_GLOBALLY_DURABLE_REQ (148, P2, writeTimeout, IMMEDIATE,         
 () -> SetDurableSerializers.globallyDurable,AccordService::verbHandlerOrNoop, 
ACCORD_SIMPLE_RSP                         ),
-    ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> QueryDurableBeforeSerializers.reply,  RESPONSE_HANDLER                   
                                         ),
+    ACCORD_QUERY_DURABLE_BEFORE_RSP (149, P2, writeTimeout, IMMEDIATE,         
 () -> QueryDurableBeforeSerializers.reply,  RESPONSE_HANDLER                   
                                         ),
     ACCORD_QUERY_DURABLE_BEFORE_REQ (150, P2, writeTimeout, IMMEDIATE,         
 () -> QueryDurableBeforeSerializers.request,AccordService::verbHandlerOrNoop, 
ACCORD_QUERY_DURABLE_BEFORE_RSP           ),
 
     ACCORD_SYNC_NOTIFY_REQ          (151, P2, writeTimeout, IMMEDIATE,         
 () -> Notification.listSerializer,          () -> 
AccordSyncPropagator.verbHandler,       ACCORD_SIMPLE_RSP             ),
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java 
b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
index 5175e86c10..d07dcfc87b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCachingState.java
@@ -123,7 +123,7 @@ public class AccordCachingState<K, V> extends 
IntrusiveLinkedListNode
 
     int estimatedSizeOnHeap(ToLongFunction<V> estimator)
     {
-        shouldUpdateSize = false;
+        shouldUpdateSize = false;   // TODO (expected): probably not the 
safest place to clear need to compute size
         return lastQueriedEstimatedSizeOnHeap = Ints.checkedCast(EMPTY_SIZE + 
estimateStateOnHeapSize(estimator));
     }
 
@@ -204,7 +204,7 @@ public class AccordCachingState<K, V> extends 
IntrusiveLinkedListNode
     protected State<K, V> state(State<K, V> next)
     {
         State<K, V> prev = state;
-        if (prev != next)
+        if (prev != next)   // TODO (expected): we change state to transition 
the cache state machine but often keep payload the same - so shouldn't recompute
             shouldUpdateSize = true;
         return state = next;
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 36b224b022..4b0722606b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -42,7 +42,7 @@ import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.ProgressLog;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
 import accord.impl.CommandsSummary;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
@@ -114,8 +114,8 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
     private final ExecutionOrder executionOrder;
     private final AccordStateCache stateCache;
     private final AccordStateCache.Instance<TxnId, Command, AccordSafeCommand> 
commandCache;
-    private final AccordStateCache.Instance<RoutableKey, TimestampsForKey, 
AccordSafeTimestampsForKey> timestampsForKeyCache;
-    private final AccordStateCache.Instance<RoutableKey, CommandsForKey, 
AccordSafeCommandsForKey> commandsForKeyCache;
+    private final AccordStateCache.Instance<Key, TimestampsForKey, 
AccordSafeTimestampsForKey> timestampsForKeyCache;
+    private final AccordStateCache.Instance<Key, CommandsForKey, 
AccordSafeCommandsForKey> commandsForKeyCache;
     private AsyncOperation<?> currentOperation = null;
     private AccordSafeCommandStore current = null;
     private long lastSystemTimestampMicros = Long.MIN_VALUE;
@@ -161,7 +161,7 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
                                 this::validateCommand,
                                 AccordObjectSizes::command);
         timestampsForKeyCache =
-            stateCache.instance(RoutableKey.class,
+            stateCache.instance(Key.class,
                                 AccordSafeTimestampsForKey.class,
                                 AccordSafeTimestampsForKey::new,
                                 this::loadTimestampsForKey,
@@ -169,7 +169,7 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
                                 this::validateTimestampsForKey,
                                 AccordObjectSizes::timestampsForKey);
         commandsForKeyCache =
-            stateCache.instance(RoutableKey.class,
+            stateCache.instance(Key.class,
                                 AccordSafeCommandsForKey.class,
                                 AccordSafeCommandsForKey::new,
                                 this::loadCommandsForKey,
@@ -220,7 +220,7 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
 
                 MessageProvider messageProvider = 
journal.makeMessageProvider(txnId);
 
-                SerializerSupport.TxnAndDeps txnAndDeps = 
SerializerSupport.extractTxnAndDeps(status, accepted, messageProvider);
+                SerializerSupport.TxnAndDeps txnAndDeps = 
SerializerSupport.extractTxnAndDeps(unsafeRangesForEpoch(), status, accepted, 
messageProvider);
                 Seekables<?, ?> keys = txnAndDeps.txn.keys();
                 if (keys.domain() != Routable.Domain.Range)
                     throw new AssertionError(String.format("Txn keys are not 
range for %s", txnAndDeps.txn));
@@ -311,12 +311,12 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
         return commandCache;
     }
 
-    public AccordStateCache.Instance<RoutableKey, TimestampsForKey, 
AccordSafeTimestampsForKey> timestampsForKeyCache()
+    public AccordStateCache.Instance<Key, TimestampsForKey, 
AccordSafeTimestampsForKey> timestampsForKeyCache()
     {
         return timestampsForKeyCache;
     }
 
-    public AccordStateCache.Instance<RoutableKey, CommandsForKey, 
AccordSafeCommandsForKey> commandsForKeyCache()
+    public AccordStateCache.Instance<Key, CommandsForKey, 
AccordSafeCommandsForKey> commandsForKeyCache()
     {
         return commandsForKeyCache;
     }
@@ -466,8 +466,8 @@ public class AccordCommandStore extends CommandStore 
implements CacheSize
 
     public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext,
                                                  Map<TxnId, AccordSafeCommand> 
commands,
-                                                 NavigableMap<RoutableKey, 
AccordSafeTimestampsForKey> timestampsForKeys,
-                                                 NavigableMap<RoutableKey, 
AccordSafeCommandsForKey> commandsForKeys)
+                                                 NavigableMap<Key, 
AccordSafeTimestampsForKey> timestampsForKeys,
+                                                 NavigableMap<Key, 
AccordSafeCommandsForKey> commandsForKeys)
     {
         Invariants.checkState(current == null);
         commands.values().forEach(AccordSafeState::preExecute);
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index cf4cda992b..0fd719d5fb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -31,6 +31,7 @@ import accord.primitives.Range;
 import accord.topology.Topology;
 import accord.utils.RandomSource;
 import org.apache.cassandra.cache.CacheSize;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.metrics.AccordStateCacheMetrics;
 import org.apache.cassandra.metrics.CacheSizeMetrics;
 import org.apache.cassandra.schema.TableId;
@@ -47,7 +48,7 @@ public class AccordCommandStores extends CommandStores 
implements CacheSize
                         ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory, AccordJournal journal)
     {
         super(time, agent, store, random, shardDistributor, 
progressLogFactory, AccordCommandStore.factory(journal, new 
AccordStateCacheMetrics(ACCORD_STATE_CACHE)));
-        setCapacity(maxCacheSize());
+        setCapacity(DatabaseDescriptor.getAccordCacheSizeInMiB() << 20);
         this.cacheSizeMetrics = new CacheSizeMetrics(ACCORD_STATE_CACHE, this);
     }
 
@@ -110,11 +111,6 @@ public class AccordCommandStores extends CommandStores 
implements CacheSize
         forEach(commandStore -> ((AccordSafeCommandStore) 
commandStore).commandStore().setCapacity(perStore));
     }
 
-    private static long maxCacheSize()
-    {
-        return 5 << 20; // TODO (required): make configurable
-    }
-
     @Override
     public synchronized Supplier<EpochReady> updateTopology(Node node, 
Topology newTopology, boolean startSync)
     {
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java
index a17a9fc844..d87e6c96cd 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfiguration.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import accord.config.LocalConfig;
 import org.apache.cassandra.config.Config;
 
+// TODO (expected): should this be merged with AccordSpec?
 public class AccordConfiguration implements LocalConfig
 {
     private final Config config;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index ae202d2f63..26e09ada14 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -570,6 +571,7 @@ public class AccordJournal implements Shutdownable
 
         Key(Timestamp timestamp, Type type)
         {
+            if (timestamp == null) throw new NullPointerException("Null 
timestamp for type " + type);
             this.timestamp = timestamp;
             this.type = type;
         }
@@ -1361,6 +1363,19 @@ public class AccordJournal implements Shutdownable
             return presentMessages;
         }
 
+        public Set<MessageType> all()
+        {
+            Set<Type> types = EnumSet.allOf(Type.class);
+            Set<Key> keys = new ObjectHashSet<>(types.size() + 1, 0.9f);
+            for (Type type : types)
+                keys.add(new Key(txnId, type));
+            Set<Key> presentKeys = journal.test(keys);
+            Set<MessageType> presentMessages = new 
ObjectHashSet<>(presentKeys.size() + 1, 0.9f);
+            for (Key key : presentKeys)
+                presentMessages.add(key.type.outgoingType);
+            return presentMessages;
+        }
+
         @Override
         public PreAccept preAccept()
         {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 2684be683f..535bc4948f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -45,7 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Key;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
 import accord.local.Command.WaitingOn;
@@ -592,7 +592,7 @@ public class AccordKeyspace
         }
 
         // TODO (expected): garbage-free filtering, reusing encoding
-        public Row withoutRedundantCommands(PartitionKey key, Row row, TxnId 
redundantBefore)
+        public Row withoutRedundantCommands(PartitionKey key, Row row, 
RedundantBefore.Entry redundantBefore)
         {
             Invariants.checkState(row.columnCount() == 1);
             Cell<?> cell = row.getCell(data);
@@ -600,7 +600,10 @@ public class AccordKeyspace
                 return row;
 
             CommandsForKey current = CommandsForKeySerializer.fromBytes(key, 
cell.buffer());
-            CommandsForKey updated = current.withoutRedundant(redundantBefore);
+            if (current == null)
+                return null;
+
+            CommandsForKey updated = 
current.withRedundantBefore(redundantBefore);
             if (current == updated)
                 return row;
 
@@ -822,7 +825,7 @@ public class AccordKeyspace
                 Command.Committed committed = command.asCommitted();
                 Command.Committed originalCommitted = original != null && 
original.isCommitted() ? original.asCommitted() : null;
                 if (originalCommitted == null || committed.waitingOn != 
originalCommitted.waitingOn)
-                    builder.addCell(live(CommandsColumns.waiting_on, 
timestampMicros, WaitingOnSerializer.serialize(committed.waitingOn)));
+                    builder.addCell(live(CommandsColumns.waiting_on, 
timestampMicros, WaitingOnSerializer.serialize(committed.txnId(), 
committed.waitingOn)));
             }
 
             Row row = builder.build();
@@ -1189,10 +1192,10 @@ public class AccordKeyspace
             Ballot promised = deserializePromisedOrNull(row);
             Ballot accepted = deserializeAcceptedOrNull(row);
 
-            WaitingOnProvider waitingOn = deserializeWaitingOn(row);
+            WaitingOnProvider waitingOn = deserializeWaitingOn(txnId, row);
             MessageProvider messages = commandStore.makeMessageProvider(txnId);
 
-            return SerializerSupport.reconstruct(attrs, status, executeAt, 
promised, accepted, waitingOn, messages);
+            return 
SerializerSupport.reconstruct(commandStore.unsafeRangesForEpoch(), attrs, 
status, executeAt, promised, accepted, waitingOn, messages);
         }
         catch (Throwable t)
         {
@@ -1270,7 +1273,7 @@ public class AccordKeyspace
         return deserializeTimestampOrNull(row.getBlob("accepted_ballot"), 
Ballot::fromBits);
     }
 
-    private static WaitingOnProvider deserializeWaitingOn(UntypedResultSet.Row 
row)
+    private static WaitingOnProvider deserializeWaitingOn(TxnId txnId, 
UntypedResultSet.Row row)
     {
         ByteBuffer bytes = row.getBlob("waiting_on");
 
@@ -1284,7 +1287,7 @@ public class AccordKeyspace
 
             try
             {
-                return WaitingOnSerializer.deserialize(deps, bytes);
+                return WaitingOnSerializer.deserialize(txnId, 
deps.keyDeps.keys(), deps.rangeDeps.txnIds(), bytes);
             }
             catch (IOException e)
             {
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java 
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index a1704ba52b..7346a6eebf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -24,11 +24,12 @@ import java.util.function.ToLongFunction;
 import accord.api.Key;
 import accord.api.Result;
 import accord.api.RoutingKey;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKey.Info;
+import accord.local.CommandsForKey;
+import accord.local.CommandsForKey.TxnInfo;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
 import accord.local.Command.WaitingOn;
+import accord.local.CommandsForKey.TxnInfoWithMissing;
 import accord.local.CommonAttributes;
 import accord.local.Node;
 import accord.local.SaveStatus;
@@ -305,7 +306,6 @@ public class AccordObjectSizes
                     return ACCEPTED;
                 case Committed:
                 case Stable:
-                case ReadyToExecute:
                     return COMMITTED;
                 case PreApplied:
                 case Applied:
@@ -363,22 +363,25 @@ public class AccordObjectSizes
     }
 
     private static long EMPTY_CFK_SIZE = measure(new CommandsForKey(null));
-    private static long EMPTY_INFO_SIZE = 
measure(CommandsForKey.Info.createMock(null, null, null));
+    private static long EMPTY_INFO_SIZE = 
measure(TxnInfo.createMock(TxnId.NONE, null, null, null));
+    private static long EMPTY_INFO_WITH_MISSING_ADDITIONAL_SIZE = 
measure(TxnInfo.createMock(TxnId.NONE, null, null, null)) - EMPTY_INFO_SIZE;
     public static long commandsForKey(CommandsForKey cfk)
     {
         long size = EMPTY_CFK_SIZE;
         size += key(cfk.key());
-        size += 2 * ObjectSizes.sizeOfReferenceArray(cfk.size());
-        size += cfk.size() * TIMESTAMP_SIZE;
+        size += ObjectSizes.sizeOfReferenceArray(cfk.size());
+        size += cfk.size() * EMPTY_INFO_SIZE;
         for (int i = 0 ; i < cfk.size() ; ++i)
         {
-            Info info = cfk.info(i);
-            if (info.getClass() == CommandsForKey.NoInfo.class)
-                continue;
-
-            size += EMPTY_INFO_SIZE;
-            if (info.missing.length > 0)
-                size += ObjectSizes.sizeOfReferenceArray(info.missing.length);
+            TxnInfo info = cfk.get(i);
+            if (info.getClass() != TxnInfoWithMissing.class) continue;
+            TxnInfoWithMissing infoWithMissing = (TxnInfoWithMissing) info;
+            if (infoWithMissing.missing.length > 0)
+            {
+                size += EMPTY_INFO_WITH_MISSING_ADDITIONAL_SIZE;
+                size += 
ObjectSizes.sizeOfReferenceArray(infoWithMissing.missing.length);
+                size += infoWithMissing.missing.length * TIMESTAMP_SIZE;
+            }
         }
         return size;
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index c1b09ccb27..1c497cc5c9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -31,8 +31,7 @@ import accord.api.DataStore;
 import accord.api.Key;
 import accord.api.ProgressLog;
 import accord.impl.AbstractSafeCommandStore;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKeys;
+import accord.local.CommandsForKey;
 import accord.impl.CommandsSummary;
 import accord.local.Command;
 import accord.local.CommandStores.RangesForEpoch;
@@ -41,7 +40,6 @@ import accord.local.PreLoadContext;
 import accord.primitives.AbstractKeys;
 import accord.primitives.Deps;
 import accord.primitives.Ranges;
-import accord.primitives.RoutableKey;
 import accord.primitives.Routables;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
@@ -53,16 +51,16 @@ import static accord.primitives.Routable.Domain.Range;
 public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeCommand, AccordSafeTimestampsForKey, 
AccordSafeCommandsForKey>
 {
     private final Map<TxnId, AccordSafeCommand> commands;
-    private final NavigableMap<RoutableKey, AccordSafeCommandsForKey> 
commandsForKeys;
-    private final NavigableMap<RoutableKey, AccordSafeTimestampsForKey> 
timestampsForKeys;
+    private final NavigableMap<Key, AccordSafeCommandsForKey> commandsForKeys;
+    private final NavigableMap<Key, AccordSafeTimestampsForKey> 
timestampsForKeys;
     private final AccordCommandStore commandStore;
     private final RangesForEpoch ranges;
     CommandsForRanges.Updater rangeUpdates = null;
 
     public AccordSafeCommandStore(PreLoadContext context,
                                   Map<TxnId, AccordSafeCommand> commands,
-                                  NavigableMap<RoutableKey, 
AccordSafeTimestampsForKey> timestampsForKey,
-                                  NavigableMap<RoutableKey, 
AccordSafeCommandsForKey> commandsForKey,
+                                  NavigableMap<Key, 
AccordSafeTimestampsForKey> timestampsForKey,
+                                  NavigableMap<Key, AccordSafeCommandsForKey> 
commandsForKey,
                                   AccordCommandStore commandStore)
     {
         super(context);
@@ -94,7 +92,7 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     }
 
     @Override
-    protected AccordSafeCommandsForKey getCommandsForKeyInternal(RoutableKey 
key)
+    protected AccordSafeCommandsForKey getCommandsForKeyInternal(Key key)
     {
         return commandsForKeys.get(key);
     }
@@ -106,7 +104,7 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     }
 
     @Override
-    protected AccordSafeCommandsForKey getCommandsForKeyIfLoaded(RoutableKey 
key)
+    protected AccordSafeCommandsForKey getCommandsForKeyIfLoaded(Key key)
     {
         AccordSafeCommandsForKey cfk = 
commandStore.commandsForKeyCache().acquireIfLoaded(key);
         if (cfk != null) cfk.preExecute();
@@ -114,7 +112,7 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     }
 
     @Override
-    protected AccordSafeTimestampsForKey 
getTimestampsForKeyInternal(RoutableKey key)
+    protected AccordSafeTimestampsForKey getTimestampsForKeyInternal(Key key)
     {
         return timestampsForKeys.get(key);
     }
@@ -126,7 +124,7 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     }
 
     @Override
-    protected AccordSafeTimestampsForKey 
getTimestampsForKeyIfLoaded(RoutableKey key)
+    protected AccordSafeTimestampsForKey getTimestampsForKeyIfLoaded(Key key)
     {
         AccordSafeTimestampsForKey cfk = 
commandStore.timestampsForKeyCache().acquireIfLoaded(key);
         if (cfk != null) cfk.preExecute();
@@ -177,14 +175,14 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
         Ranges allRanges = ranges.all();
         deps.keyDeps.keys().forEach(allRanges, key -> {
             // TODO (now): batch register to minimise GC
-            deps.keyDeps.forEach(key, txnId -> {
+            deps.keyDeps.forEach(key, (txnId, txnIdx) -> {
                 // TODO (desired, efficiency): this can be made more efficient 
by batching by epoch
                 if (ranges.coordinates(txnId).contains(key))
                     return; // already coordinates, no need to replicate
                 if (!ranges.allBefore(txnId.epoch()).contains(key))
                     return;
 
-                CommandsForKeys.registerNotWitnessed(this, key, txnId);
+                get(key).registerHistorical(this, txnId);
             });
         });
         CommandsForRanges commandsForRanges = commandStore.commandsForRanges();
@@ -221,7 +219,7 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
                 for (Key key : keys)
                 {
                     if (!slice.contains(key)) continue;
-                    CommandsForKey commands = commandsForKey(key).current();
+                    CommandsForKey commands = get(key).current();
                     accumulate = map.apply(commands, accumulate);
                 }
             }
@@ -233,11 +231,11 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
                 Routables<?> sliced = keysOrRanges.slice(slice, 
Routables.Slice.Minimal);
                 if (!context.keys().slice(slice, 
Routables.Slice.Minimal).containsAll(sliced))
                     throw new AssertionError("Range(s) detected not present in 
the PreLoadContext: expected " + context.keys() + " but given " + keysOrRanges);
-                for (RoutableKey key : timestampsForKeys.keySet())
+                for (Key key : commandsForKeys.keySet())
                 {
                     //TODO (duplicate code): this is a repeat of Key... only 
change is checking contains in range
                     if (!sliced.contains(key)) continue;
-                    CommandsForKey commands = commandsForKey(key).current();
+                    CommandsForKey commands = get(key).current();
                     accumulate = map.apply(commands, accumulate);
                 }
             }
@@ -263,22 +261,20 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     }
 
     @Override
-    protected void update(Command prev, Command updated, @Nullable 
Seekables<?, ?> keysOrRanges)
+    protected void update(Command prev, Command updated)
     {
-        super.update(prev, updated, keysOrRanges);
+        super.update(prev, updated);
 
         if (updated.txnId().domain() == Range && 
CommandsForKey.needsUpdate(prev, updated))
         {
+            Seekables<?, ?> keysOrRanges = updated.keysOrRanges();
+            if (keysOrRanges == null) keysOrRanges = prev.keysOrRanges();
             if (keysOrRanges == null)
-            {
-                if (updated.known().isDefinitionKnown()) keysOrRanges = 
updated.partialTxn().keys();
-                else if (prev.known().isDefinitionKnown()) keysOrRanges = 
prev.partialTxn().keys();
-                else return;
-            }
-            List<TxnId> waitingOn;
+                return;
 
-            if (updated.partialDeps() == null) waitingOn = 
Collections.emptyList();
+            List<TxnId> waitingOn;
             // TODO (required): this is faulty: we cannot simply save the raw 
transaction ids, as they may be for other ranges
+            if (updated.partialDeps() == null) waitingOn = 
Collections.emptyList();
             else waitingOn = updated.partialDeps().txnIds();
             updateRanges().put(updated.txnId(), (Ranges)keysOrRanges, 
updated.saveStatus(), updated.executeAt(), waitingOn);
         }
@@ -300,8 +296,8 @@ public class AccordSafeCommandStore extends 
AbstractSafeCommandStore<AccordSafeC
     }
 
     public void postExecute(Map<TxnId, AccordSafeCommand> commands,
-                            Map<RoutableKey, AccordSafeTimestampsForKey> 
timestampsForKey,
-                            Map<RoutableKey, AccordSafeCommandsForKey> 
commandsForKeys
+                            Map<Key, AccordSafeTimestampsForKey> 
timestampsForKey,
+                            Map<Key, AccordSafeCommandsForKey> commandsForKeys
                             )
     {
         postExecute();
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java
index 748143f333..808b4d4bc1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandsForKey.java
@@ -23,18 +23,17 @@ import java.util.Objects;
 import com.google.common.annotations.VisibleForTesting;
 
 import accord.api.Key;
-import accord.impl.CommandsForKey;
-import accord.impl.SafeCommandsForKey;
-import accord.primitives.RoutableKey;
+import accord.local.CommandsForKey;
+import accord.local.SafeCommandsForKey;
 
-public class AccordSafeCommandsForKey extends SafeCommandsForKey implements 
AccordSafeState<RoutableKey, CommandsForKey>
+public class AccordSafeCommandsForKey extends SafeCommandsForKey implements 
AccordSafeState<Key, CommandsForKey>
 {
     private boolean invalidated;
-    private final AccordCachingState<RoutableKey, CommandsForKey> global;
+    private final AccordCachingState<Key, CommandsForKey> global;
     private CommandsForKey original;
     private CommandsForKey current;
 
-    public AccordSafeCommandsForKey(AccordCachingState<RoutableKey, 
CommandsForKey> global)
+    public AccordSafeCommandsForKey(AccordCachingState<Key, CommandsForKey> 
global)
     {
         super((Key) global.key());
         this.global = global;
@@ -83,7 +82,7 @@ public class AccordSafeCommandsForKey extends 
SafeCommandsForKey implements Acco
     }
 
     @Override
-    public AccordCachingState<RoutableKey, CommandsForKey> global()
+    public AccordCachingState<Key, CommandsForKey> global()
     {
         checkNotInvalidated();
         return global;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeState.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeState.java
index b742efb9d1..374968bcfb 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeState.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeState.java
@@ -25,6 +25,8 @@ public interface AccordSafeState<K, V> extends SafeState<V>
 {
     void set(V update);
     V original();
+    void invalidate();
+    boolean invalidated();
     void preExecute();
     void postExecute();
     AccordCachingState<K, V> global();
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java 
b/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java
index b5b44a7703..a4c48c83e6 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordSafeTimestampsForKey.java
@@ -26,17 +26,16 @@ import com.google.common.annotations.VisibleForTesting;
 import accord.api.Key;
 import accord.impl.SafeTimestampsForKey;
 import accord.impl.TimestampsForKey;
-import accord.primitives.RoutableKey;
 import accord.primitives.Timestamp;
 
-public class AccordSafeTimestampsForKey extends SafeTimestampsForKey 
implements AccordSafeState<RoutableKey, TimestampsForKey>
+public class AccordSafeTimestampsForKey extends SafeTimestampsForKey 
implements AccordSafeState<Key, TimestampsForKey>
 {
     private boolean invalidated;
-    private final AccordCachingState<RoutableKey, TimestampsForKey> global;
+    private final AccordCachingState<Key, TimestampsForKey> global;
     private TimestampsForKey original;
     private TimestampsForKey current;
 
-    public AccordSafeTimestampsForKey(AccordCachingState<RoutableKey, 
TimestampsForKey> global)
+    public AccordSafeTimestampsForKey(AccordCachingState<Key, 
TimestampsForKey> global)
     {
         super((Key) global.key());
         this.global = global;
@@ -71,7 +70,7 @@ public class AccordSafeTimestampsForKey extends 
SafeTimestampsForKey implements
     }
 
     @Override
-    public AccordCachingState<RoutableKey, TimestampsForKey> global()
+    public AccordCachingState<Key, TimestampsForKey> global()
     {
         checkNotInvalidated();
         return global;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 2f790e80b7..290541adec 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -123,7 +123,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     private final AccordDataStore dataStore;
     private final AccordJournal journal;
     private final CoordinateDurabilityScheduling durabilityScheduling;
-    private final AccordVerbHandler<? extends Request> verbHandler;
+    private final AccordVerbHandler<? extends Request> requestHandler;
     private final LocalConfig configuration;
     @GuardedBy("this")
     private State state = State.INIT;
@@ -292,7 +292,7 @@ public class AccordService implements IAccordService, 
Shutdownable
                              configuration);
         this.nodeShutdown = toShutdownable(node);
         this.durabilityScheduling = new CoordinateDurabilityScheduling(node);
-        this.verbHandler = new AccordVerbHandler<>(node, configService, 
journal);
+        this.requestHandler = new AccordVerbHandler<>(node, configService, 
journal);
     }
 
     @Override
@@ -309,14 +309,14 @@ public class AccordService implements IAccordService, 
Shutdownable
         
durabilityScheduling.setShardCycleTime(Ints.checkedCast(DatabaseDescriptor.getAccordShardDurabilityCycle(SECONDS)),
 SECONDS);
         
durabilityScheduling.setTxnIdLag(Ints.checkedCast(DatabaseDescriptor.getAccordScheduleDurabilityTxnIdLag(SECONDS)),
 TimeUnit.SECONDS);
         
durabilityScheduling.setFrequency(Ints.checkedCast(DatabaseDescriptor.getAccordScheduleDurabilityFrequency(SECONDS)),
 SECONDS);
-        durabilityScheduling.start();
+//        durabilityScheduling.start();
         state = State.STARTED;
     }
 
     @Override
     public IVerbHandler<? extends Request> verbHandler()
     {
-        return verbHandler;
+        return requestHandler;
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java 
b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
index acf342bf25..426ab4061b 100644
--- a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableSortedMap;
 
 import accord.api.Key;
 import accord.api.RoutingKey;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
 import accord.impl.CommandsSummary;
 import accord.local.Command;
 import accord.local.SaveStatus;
@@ -64,7 +64,6 @@ import org.apache.cassandra.utils.IntervalTree;
 import static accord.local.SafeCommandStore.*;
 import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestDep.WITH;
-import static accord.local.SafeCommandStore.TestStartedAt.ANY;
 import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE;
 import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS;
 import static accord.local.Status.Stable;
@@ -372,11 +371,13 @@ public class CommandsForRanges
 
     private static Range toRange(Interval<RoutableKey, RangeCommandSummary> 
interval)
     {
-        TokenKey start = (TokenKey) interval.min;
-        TokenKey end = (TokenKey) interval.max;
+        AccordRoutingKey start = (AccordRoutingKey) interval.min;
+        if (!(start instanceof AccordRoutingKey.SentinelKey))
+            start = new TokenKey(start.table(), 
start.token().decreaseSlightly());
+        AccordRoutingKey end = (AccordRoutingKey) interval.max;
         // TODO (required, correctness) : accord doesn't support wrap around, 
so decreaseSlightly may fail in some cases
         // TODO (required, correctness) : this logic is mostly used for 
testing, so is it actually safe for all partitioners?
-        return new 
TokenRange(start.withToken(start.token().decreaseSlightly()), end);
+        return new TokenRange(start, end);
     }
 
     @Nullable
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index f9ab58387d..33f8f2b088 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -117,6 +117,9 @@ public class AccordAgent implements Agent
     public boolean isExpired(TxnId initiated, long now)
     {
         // TODO: should distinguish between reads and writes
+        if (initiated.kind().isSyncPoint())
+            return false;
+
         return now - initiated.hlc() > getReadRpcTimeout(MICROSECONDS);
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index b8494e7d44..4b7607a36b 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -33,12 +33,12 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Key;
 import accord.api.RoutingKey;
 import accord.local.KeyHistory;
 import accord.local.PreLoadContext;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
-import accord.primitives.RoutableKey;
 import accord.primitives.Seekables;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
@@ -121,7 +121,7 @@ public class AsyncLoader
         }
     }
 
-    private void referenceAndAssembleReadsForKey(RoutableKey key,
+    private void referenceAndAssembleReadsForKey(Key key,
                                                  AsyncOperation.Context 
context,
                                                  List<AsyncChain<?>> 
listenChains)
     {
@@ -157,7 +157,7 @@ public class AsyncLoader
         {
             case Key:
                 // cast to Keys fails...
-                Iterable<RoutableKey> keys = (Iterable<RoutableKey>) 
keysOrRanges;
+                Iterable<Key> keys = (Iterable<Key>) keysOrRanges;
                 keys.forEach(key -> referenceAndAssembleReadsForKey(key, 
context, chains));
                 break;
             case Range:
@@ -172,7 +172,7 @@ public class AsyncLoader
 
     private AsyncChain<?> 
referenceAndDispatchReadsForRange(AsyncOperation.Context context)
     {
-        AsyncChain<Set<? extends RoutableKey>> overlappingKeys = 
findOverlappingKeys((Ranges) keysOrRanges);
+        AsyncChain<Set<? extends Key>> overlappingKeys = 
findOverlappingKeys((Ranges) keysOrRanges);
 
         return overlappingKeys.flatMap(keys -> {
             if (keys.isEmpty())
@@ -183,14 +183,14 @@ public class AsyncLoader
         }, commandStore);
     }
 
-    private AsyncChain<Set<? extends RoutableKey>> findOverlappingKeys(Ranges 
ranges)
+    private AsyncChain<Set<? extends Key>> findOverlappingKeys(Ranges ranges)
     {
         Invariants.checkArgument(!ranges.isEmpty());
 
         List<AsyncChain<Set<PartitionKey>>> chains = new 
ArrayList<>(ranges.size());
         for (Range range : ranges)
             chains.add(findOverlappingKeys(range));
-        return AsyncChains.reduce(chains, (a, b) -> 
ImmutableSet.<RoutableKey>builder().addAll(a).addAll(b).build());
+        return AsyncChains.reduce(chains, (a, b) -> 
ImmutableSet.<Key>builder().addAll(a).addAll(b).build());
     }
 
     private AsyncChain<Set<PartitionKey>> findOverlappingKeys(Range range)
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index bef57bcf0b..f0c53e33d7 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import accord.api.Key;
 import accord.local.CommandStore;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
@@ -67,8 +68,8 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
     static class Context
     {
         final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>();
-        final TreeMap<RoutableKey, AccordSafeTimestampsForKey> 
timestampsForKey = new TreeMap<>();
-        final TreeMap<RoutableKey, AccordSafeCommandsForKey> commandsForKey = 
new TreeMap<>();
+        final TreeMap<Key, AccordSafeTimestampsForKey> timestampsForKey = new 
TreeMap<>();
+        final TreeMap<Key, AccordSafeCommandsForKey> commandsForKey = new 
TreeMap<>();
 
         void releaseResources(AccordCommandStore commandStore)
         {
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index 7294dd2696..49c06e811c 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -132,7 +132,6 @@ public class AccordInteropApply extends Apply implements 
Command.TransientListen
             case PreCommitted:
             case Committed:
             case PreApplied:
-            case ReadyToExecute:
                 synchronized (this)
                 {
                     waitingOn.set(safeStore.commandStore().id());
@@ -249,7 +248,6 @@ public class AccordInteropApply extends Apply implements 
Command.TransientListen
             case PreCommitted:
             case Committed:
             case PreApplied:
-            case ReadyToExecute:
                 return;
 
             case Applied:
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
index d5144cbe8d..dbe2f4845f 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
@@ -24,10 +24,10 @@ import java.util.Arrays;
 import com.google.common.primitives.Ints;
 
 import accord.api.Key;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKey.Info;
-import accord.impl.CommandsForKey.InternalStatus;
-import accord.impl.CommandsForKey.NoInfo;
+import accord.local.CommandsForKey;
+import accord.local.CommandsForKey.TxnInfo;
+import accord.local.CommandsForKey.InternalStatus;
+import accord.local.CommandsForKey.Unmanaged;
 import accord.local.Node;
 import accord.primitives.Routable.Domain;
 import accord.primitives.Timestamp;
@@ -35,8 +35,10 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
+import static accord.local.CommandsForKey.NO_PENDING_UNMANAGED;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 import static accord.primitives.Txn.Kind.Read;
 import static accord.primitives.Txn.Kind.Write;
@@ -104,23 +106,7 @@ public class CommandsForKeySerializer
     {
         int commandCount = cfk.size();
         if (commandCount == 0)
-        {
-            // TODO (expected): we should not need to special-case, but best 
solution here is not to store redundantBefore;
-            //    but this requires some modest deeper changes, so for now 
special-case serialization when empty
-            Timestamp redundantBefore = cfk.redundantBefore();
-            ByteBuffer out = 
ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(0) +
-                                                 
TypeSizes.sizeofUnsignedVInt(redundantBefore.epoch()) +
-                                                 
TypeSizes.sizeofUnsignedVInt(redundantBefore.hlc()) +
-                                                 
TypeSizes.sizeofUnsignedVInt(redundantBefore.flags()) +
-                                                 
TypeSizes.sizeofUnsignedVInt(redundantBefore.node.id));
-            VIntCoding.writeUnsignedVInt32(0, out);
-            VIntCoding.writeUnsignedVInt(redundantBefore.epoch(), out);
-            VIntCoding.writeUnsignedVInt(redundantBefore.hlc(), out);
-            VIntCoding.writeUnsignedVInt32(redundantBefore.flags(), out);
-            VIntCoding.writeUnsignedVInt32(redundantBefore.node.id, out);
-            out.flip();
-            return out;
-        }
+            return ByteBuffer.allocate(1);
 
         int[] nodeIds = cachedInts().getInts(Math.min(64, commandCount));
         try
@@ -129,11 +115,9 @@ public class CommandsForKeySerializer
             // whether we have any missing transactions to encode, any 
executeAt that are not equal to their TxnId
             // and whether there are any non-standard flag bits to encode
             boolean hasNonStandardFlags = false;
-            int nodeIdCount, missingIdCount = 0, executeAtCount = 0, 
bitsPerExecuteAtFlags = 0;
+            int nodeIdCount = 0, missingIdCount = 0, executeAtCount = 0, 
bitsPerExecuteAtFlags = 0;
             int bitsPerExecuteAtEpochDelta = 0, bitsPerExecuteAtHlcDelta = 1; 
// to permit us to use full 64 bits and encode in 5 bits we force at least one 
hlc bit
             {
-                nodeIdCount = 1;
-                nodeIds[0] = cfk.redundantBefore().node.id;
                 for (int i = 0 ; i < commandCount ; ++i)
                 {
                     if (nodeIdCount + 1 >= nodeIds.length)
@@ -143,24 +127,19 @@ public class CommandsForKeySerializer
                             nodeIds = cachedInts().resize(nodeIds, 
nodeIds.length, nodeIds.length * 2);
                     }
 
-                    TxnId txnId = cfk.txnId(i);
-                    Info info = cfk.info(i);
+                    TxnInfo txn = cfk.get(i);
 
-                    hasNonStandardFlags |= txnIdFlags(txnId) != STANDARD;
-                    nodeIds[nodeIdCount++] = txnId.node.id;
+                    hasNonStandardFlags |= txnIdFlags(txn) != STANDARD;
+                    nodeIds[nodeIdCount++] = txn.node.id;
 
-                    if (info.getClass() == NoInfo.class)
+                    missingIdCount += txn.missing().length;
+                    if (txn.executeAt == txn)
                         continue;
 
-                    missingIdCount += info.missing.length;
-
-                    if (info.executeAt == txnId)
-                        continue;
-
-                    nodeIds[nodeIdCount++] = info.executeAt.node.id;
-                    bitsPerExecuteAtEpochDelta = 
Math.max(bitsPerExecuteAtEpochDelta, 
numberOfBitsToRepresent(info.executeAt.epoch() - txnId.epoch()));
-                    bitsPerExecuteAtHlcDelta = 
Math.max(bitsPerExecuteAtHlcDelta, numberOfBitsToRepresent(info.executeAt.hlc() 
- txnId.hlc()));
-                    bitsPerExecuteAtFlags = Math.max(bitsPerExecuteAtFlags, 
numberOfBitsToRepresent(info.executeAt.flags()));
+                    nodeIds[nodeIdCount++] = txn.executeAt.node.id;
+                    bitsPerExecuteAtEpochDelta = 
Math.max(bitsPerExecuteAtEpochDelta, 
numberOfBitsToRepresent(txn.executeAt.epoch() - txn.epoch()));
+                    bitsPerExecuteAtHlcDelta = 
Math.max(bitsPerExecuteAtHlcDelta, numberOfBitsToRepresent(txn.executeAt.hlc() 
- txn.hlc()));
+                    bitsPerExecuteAtFlags = Math.max(bitsPerExecuteAtFlags, 
numberOfBitsToRepresent(txn.executeAt.flags()));
                     executeAtCount += 1;
                 }
                 nodeIdCount = compact(nodeIds);
@@ -175,8 +154,8 @@ public class CommandsForKeySerializer
             int maxHeaderBits = minHeaderBits;
             int totalBytes = 0;
 
-            long prevEpoch = cfk.redundantBefore().epoch();
-            long prevHlc = cfk.redundantBefore().hlc();
+            long prevEpoch = cfk.get(0).epoch();
+            long prevHlc = cfk.get(0).hlc();
             int[] bytesHistogram = cachedInts().getInts(12);
             Arrays.fill(bytesHistogram, 0);
             for (int i = 0 ; i < commandCount ; ++i)
@@ -214,7 +193,7 @@ public class CommandsForKeySerializer
                 if (hasNonStandardFlags && txnIdFlags(txnId) == RAW)
                     totalBytes += 2;
 
-                Info info = cfk.info(i);
+                TxnInfo info = cfk.get(i);
                 if (info.status.hasInfo)
                     headerBits += infoHeaderBits;
                 maxHeaderBits = Math.max(headerBits, maxHeaderBits);
@@ -247,9 +226,11 @@ public class CommandsForKeySerializer
                 // then pick third number as 75th %ile, but at least 1 less 
than highest, and one more than second
                 // finally, ensure third then second are distributed so that 
there is no more than a gap of 4 between them and the next
                 int l0 = Math.max(0, Math.min(3, minBasicBytes - headerBytes));
-                int l1 = Math.max(l0+1, 
Math.min(l0+4,Arrays.binarySearch(bytesHistogram, commandCount/4) - 
headerBytes));
+                int l1 = Arrays.binarySearch(bytesHistogram, minBasicBytes, 
maxBasicBytes, commandCount/4);
+                l1 = Math.max(l0+1, Math.min(l0+4, (l1 < 0 ? -1 - l1 : l1) - 
headerBytes));
                 int l3 = Math.max(l1+2, maxBasicBytes - headerBytes);
-                int l2 = Math.max(l1+1, Math.min(l3-1, 
Arrays.binarySearch(bytesHistogram, (3*commandCount)/4) - headerBytes));
+                int l2 = Arrays.binarySearch(bytesHistogram, minBasicBytes, 
maxBasicBytes,(3*commandCount)/4);
+                l2 = Math.max(l1+1, Math.min(l3-1, (l2 < 0 ? -1 -l2 : l2) - 
headerBytes));
                 while (l3-l2 > 4) ++l2;
                 while (l2-l1 > 4) ++l1;
                 hlcBytesLookup = setHlcBytes(l0, l1, l2, l3);
@@ -267,16 +248,13 @@ public class CommandsForKeySerializer
                 totalBytes += TypeSizes.sizeofUnsignedVInt(nodeIds[i] - 
nodeIds[i-1]);
             totalBytes += 2;
 
-            Arrays.fill(bytesHistogram, minBasicBytes, maxBasicBytes + 1, 0);
             cachedInts().forceDiscard(bytesHistogram);
 
-            prevEpoch = cfk.redundantBefore().epoch();
-            prevHlc = cfk.redundantBefore().hlc();
+            prevEpoch = cfk.get(0).epoch();
+            prevHlc = cfk.get(0).hlc();
             // account for encoding redundantBefore
             totalBytes += TypeSizes.sizeofUnsignedVInt(prevEpoch);
             totalBytes += TypeSizes.sizeofUnsignedVInt(prevHlc);
-            totalBytes += 2; // flags TODO (expected): pack this along with 
uniqueIdBits, as usually zero bits should be needed
-            totalBytes += (bitsPerNodeId+7)/8;
 
             if (missingIdCount + executeAtCount > 0)
             {
@@ -291,6 +269,20 @@ public class CommandsForKeySerializer
                     totalBytes += 2;
             }
 
+            // count unmanaged bytes
+            int unmanagedPendingCommitCount = 0;
+            for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
+            {
+                Unmanaged unmanaged = cfk.getUnmanaged(i);
+                if (unmanaged.pending == Unmanaged.Pending.COMMIT)
+                    ++unmanagedPendingCommitCount;
+                totalBytes += CommandSerializers.txnId.serializedSize();
+                // TODO (desired): this could be more efficient, e.g. 
referencing one of the TxnInfo indexes for timestamp
+                totalBytes += CommandSerializers.timestamp.serializedSize();
+            }
+            totalBytes += 
TypeSizes.sizeofUnsignedVInt(unmanagedPendingCommitCount);
+            totalBytes += TypeSizes.sizeofUnsignedVInt(cfk.unmanagedCount() - 
unmanagedPendingCommitCount);
+
             ByteBuffer out = ByteBuffer.allocate(totalBytes);
             VIntCoding.writeUnsignedVInt32(commandCount, out);
             VIntCoding.writeUnsignedVInt32(nodeIdCount, out);
@@ -301,8 +293,6 @@ public class CommandsForKeySerializer
 
             VIntCoding.writeUnsignedVInt(prevEpoch, out);
             VIntCoding.writeUnsignedVInt(prevHlc, out);
-            out.putShort((short) cfk.redundantBefore().flags());
-            writeLeastSignificantBytes(Arrays.binarySearch(nodeIds, 0, 
nodeIdCount, cfk.redundantBefore().node.id), (bitsPerNodeId+7)/8, out);
 
             int executeAtMask = executeAtCount > 0 ? 1 : 0;
             int missingDepsMask = missingIdCount > 0 ? 1 : 0;
@@ -311,7 +301,7 @@ public class CommandsForKeySerializer
             for (int i = 0 ; i < commandCount ; ++i)
             {
                 TxnId txnId = cfk.txnId(i);
-                Info info = cfk.info(i);
+                TxnInfo info = cfk.get(i);
                 InternalStatus status = info.status;
 
                 long bits = status.ordinal();
@@ -322,7 +312,7 @@ public class CommandsForKeySerializer
                 bits |= hasExecuteAt << bitIndex;
                 bitIndex += statusHasInfo & executeAtMask;
 
-                long hasMissingIds = info.missing != CommandsForKey.NO_TXNIDS 
? 1 : 0;
+                long hasMissingIds = info.missing() != 
CommandsForKey.NO_TXNIDS ? 1 : 0;
                 bits |= hasMissingIds << bitIndex;
                 bitIndex += statusHasInfo & missingDepsMask;
 
@@ -392,6 +382,20 @@ public class CommandsForKeySerializer
                 }
             }
 
+            VIntCoding.writeUnsignedVInt32(unmanagedPendingCommitCount, out);
+            VIntCoding.writeUnsignedVInt32(cfk.unmanagedCount() - 
unmanagedPendingCommitCount, out);
+            Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? 
Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
+            for (int i = 0 ; i < cfk.unmanagedCount() ; ++i)
+            {
+                Unmanaged unmanaged = cfk.getUnmanaged(i);
+                Invariants.checkState(unmanaged.pending == pending);
+                CommandSerializers.txnId.serialize(unmanaged.txnId, out, 
ByteBufferAccessor.instance, out.position());
+                out.position(out.position() + 
CommandSerializers.txnId.serializedSize());
+                CommandSerializers.timestamp.serialize(unmanaged.waitingUntil, 
out, ByteBufferAccessor.instance, out.position());
+                out.position(out.position() + 
CommandSerializers.timestamp.serializedSize());
+                if (--unmanagedPendingCommitCount == 0) pending = 
Unmanaged.Pending.APPLY;
+            }
+
             if ((executeAtCount | missingIdCount) > 0)
             {
                 int bitsPerCommandId =  numberOfBitsToRepresent(commandCount);
@@ -407,21 +411,17 @@ public class CommandsForKeySerializer
 
                 for (int i = 0 ; i < commandCount ; ++i)
                 {
-                    Info info = cfk.info(i);
-                    if (info.getClass() == NoInfo.class)
-                        continue;
-
-                    TxnId txnId = cfk.txnId(i);
-                    if (info.executeAt != txnId)
+                    TxnInfo txn = cfk.get(i);
+                    if (txn.executeAt != txn)
                     {
-                        Timestamp executeAt = info.executeAt;
+                        Timestamp executeAt = txn.executeAt;
                         int nodeIdx = Arrays.binarySearch(nodeIds, 0, 
nodeIdCount, executeAt.node.id);
                         if (bitsPerExecuteAt <= 64)
                         {
-                            Invariants.checkState(executeAt.epoch() >= 
txnId.epoch());
-                            long executeAtBits = executeAt.epoch() - 
txnId.epoch();
+                            Invariants.checkState(executeAt.epoch() >= 
txn.epoch());
+                            long executeAtBits = executeAt.epoch() - 
txn.epoch();
                             int offset = bitsPerExecuteAtEpochDelta;
-                            executeAtBits |= (executeAt.hlc() - txnId.hlc()) 
<< offset ;
+                            executeAtBits |= (executeAt.hlc() - txn.hlc()) << 
offset ;
                             offset += bitsPerExecuteAtHlcDelta;
                             executeAtBits |= ((long)executeAt.flags()) << 
offset;
                             offset += bitsPerExecuteAtFlags;
@@ -431,9 +431,9 @@ public class CommandsForKeySerializer
                         }
                         else
                         {
-                            buffer = flushBits(buffer, bufferCount, 
executeAt.epoch() - txnId.epoch(), bitsPerExecuteAtEpochDelta, out);
+                            buffer = flushBits(buffer, bufferCount, 
executeAt.epoch() - txn.epoch(), bitsPerExecuteAtEpochDelta, out);
                             bufferCount = (bufferCount + 
bitsPerExecuteAtEpochDelta) & 63;
-                            buffer = flushBits(buffer, bufferCount, 
executeAt.hlc() - txnId.hlc(), bitsPerExecuteAtHlcDelta, out);
+                            buffer = flushBits(buffer, bufferCount, 
executeAt.hlc() - txn.hlc(), bitsPerExecuteAtHlcDelta, out);
                             bufferCount = (bufferCount + 
bitsPerExecuteAtHlcDelta) & 63;
                             buffer = flushBits(buffer, bufferCount, 
executeAt.flags(), bitsPerExecuteAtFlags, out);
                             bufferCount = (bufferCount + 
bitsPerExecuteAtFlags) & 63;
@@ -442,16 +442,17 @@ public class CommandsForKeySerializer
                         }
                     }
 
-                    if (info.missing.length > 0)
+                    TxnId[] missing = txn.missing();
+                    if (missing.length > 0)
                     {
                         int j = 0;
-                        while (j < info.missing.length - 1)
+                        while (j < missing.length - 1)
                         {
-                            int missingId = cfk.indexOf(info.missing[j++]);
+                            int missingId = cfk.indexOf(missing[j++]);
                             buffer = flushBits(buffer, bufferCount, missingId, 
bitsPerMissingId, out);
                             bufferCount = (bufferCount + bitsPerMissingId) & 
63;
                         }
-                        int missingId = 
cfk.indexOf(info.missing[info.missing.length - 1]);
+                        int missingId = cfk.indexOf(missing[missing.length - 
1]);
                         missingId |= 1L << bitsPerCommandId;
                         buffer = flushBits(buffer, bufferCount, missingId, 
bitsPerMissingId, out);
                         bufferCount = (bufferCount + bitsPerMissingId) & 63;
@@ -461,6 +462,7 @@ public class CommandsForKeySerializer
                 writeMostSignificantBytes(buffer, (bufferCount + 7)/8, out);
             }
 
+            Invariants.checkState(!out.hasRemaining());
             out.flip();
             return out;
         }
@@ -494,16 +496,10 @@ public class CommandsForKeySerializer
         in = in.duplicate();
         int commandCount = VIntCoding.readUnsignedVInt32(in);
         if (commandCount == 0)
-        {
-            long epoch = VIntCoding.readUnsignedVInt(in);
-            long hlc = VIntCoding.readUnsignedVInt(in);
-            int flags = VIntCoding.readUnsignedVInt32(in);
-            Node.Id id = new Node.Id(VIntCoding.readUnsignedVInt32(in));
-            return new 
CommandsForKey(key).withoutRedundant(TxnId.fromValues(epoch, hlc, flags, id));
-        }
+            return new CommandsForKey(key);
 
-        TxnId[] txnIds = new TxnId[commandCount];
-        Info[] infos = new Info[commandCount];
+        TxnId[] txnIds = cachedTxnIds().get(commandCount);
+        TxnInfo[] txns = new TxnInfo[commandCount];
         int nodeIdCount = VIntCoding.readUnsignedVInt32(in);
         int bitsPerNodeId = numberOfBitsToRepresent(nodeIdCount);
         long nodeIdMask = (1L << bitsPerNodeId) - 1;
@@ -526,12 +522,8 @@ public class CommandsForKeySerializer
             hlcBytesLookup = setHlcByteDeltas((flags >>> 5) & 0x3, (flags >>> 
7) & 0x3, (flags >>> 9) & 0x3, (flags >>> 11) & 0x3);
         }
 
-        long prevEpoch = VIntCoding.readUnsignedVInt32(in);
-        long prevHlc = VIntCoding.readUnsignedVInt32(in);
-        TxnId redundantBefore = TxnId.fromValues(prevEpoch, prevHlc, 
in.getShort(),
-                                                 
nodeIds[(int)readLeastSignificantBytes((bitsPerNodeId+7)/8, in)]);
-
-
+        long prevEpoch = VIntCoding.readUnsignedVInt(in);
+        long prevHlc = VIntCoding.readUnsignedVInt(in);
         for (int i = 0 ; i < commandCount ; ++i)
         {
             long header = readLeastSignificantBytes(headerByteCount, in);
@@ -601,12 +593,34 @@ public class CommandsForKeySerializer
                                        : TxnId.fromValues(epoch, hlc, flags, 
node);
 
             txnIds[i] = txnId;
-            infos[i] = DECODE_INFOS[(executeAtInfoOffset | 
missingDepsInfoOffset)*STATUS_COUNT + status.ordinal()];
+            txns[i] = DECODE_INFOS[(executeAtInfoOffset | 
missingDepsInfoOffset)*STATUS_COUNT + status.ordinal()];
 
             prevEpoch = epoch;
             prevHlc = hlc;
         }
 
+        int unmanagedPendingCommitCount = VIntCoding.readUnsignedVInt32(in);
+        int unmanagedCount = unmanagedPendingCommitCount + 
VIntCoding.readUnsignedVInt32(in);
+        Unmanaged[] unmanageds;
+        if (unmanagedCount == 0)
+        {
+            unmanageds = NO_PENDING_UNMANAGED;
+        }
+        else
+        {
+            unmanageds = new Unmanaged[unmanagedCount];
+            Unmanaged.Pending pending = unmanagedPendingCommitCount == 0 ? 
Unmanaged.Pending.APPLY : Unmanaged.Pending.COMMIT;
+            for (int i = 0 ; i < unmanagedCount ; ++i)
+            {
+                TxnId txnId = CommandSerializers.txnId.deserialize(in, 
ByteBufferAccessor.instance, in.position());
+                in.position(in.position() + 
CommandSerializers.txnId.serializedSize());
+                Timestamp waitingUntil = 
CommandSerializers.timestamp.deserialize(in, ByteBufferAccessor.instance, 
in.position());
+                in.position(in.position() + 
CommandSerializers.timestamp.serializedSize());
+                unmanageds[i] = new Unmanaged(pending, txnId, waitingUntil);
+                if (--unmanagedPendingCommitCount == 0) pending = 
Unmanaged.Pending.APPLY;
+            }
+        }
+
         if (executeAtMasks + missingDepsMasks > 0)
         {
             TxnId[] missingIdBuffer = cachedTxnIds().get(8);
@@ -630,13 +644,10 @@ public class CommandsForKeySerializer
 
             for (int i = 0 ; i < commandCount ; ++i)
             {
-                Info info = infos[i];
-                if (info.getClass() == NoInfo.class)
-                    continue;
-
                 TxnId txnId = txnIds[i];
-                Timestamp executeAt = txnId;
-                if (info.executeAt == null)
+                TxnInfo placeholder = txns[i];
+                Timestamp executeAt;
+                if (placeholder.executeAt == null)
                 {
                     long epoch, hlc;
                     int flags;
@@ -661,8 +672,12 @@ public class CommandsForKeySerializer
                     }
                     executeAt = Timestamp.fromValues(epoch, hlc, flags, id);
                 }
+                else
+                {
+                    executeAt = txnId;
+                }
 
-                TxnId[] missing = info.missing;
+                TxnId[] missing = placeholder.missing();
                 if (missing == null)
                 {
                     int prev = -1;
@@ -684,13 +699,19 @@ public class CommandsForKeySerializer
                     missingIdCount = 0;
                 }
 
-                infos[i] = Info.create(txnId, info.status, executeAt, missing);
+                txns[i] = TxnInfo.create(txnId, placeholder.status, executeAt, 
missing);
             }
 
             cachedTxnIds().forceDiscard(missingIdBuffer, maxIdBufferCount);
         }
+        else
+        {
+            for (int i = 0 ; i < commandCount ; ++i)
+                txns[i] = TxnInfo.create(txnIds[i], txns[i].status, txnIds[i]);
+        }
+        cachedTxnIds().forceDiscard(txnIds, commandCount);
 
-        return CommandsForKey.SerializerSupport.create(key, redundantBefore, 
txnIds, infos);
+        return CommandsForKey.SerializerSupport.create(key, txns, unmanageds);
     }
 
     private static int getHlcBytes(int lookup, int index)
@@ -833,16 +854,16 @@ public class CommandsForKeySerializer
 
     private static final Txn.Kind[] TXN_ID_FLAG_BITS_KIND_LOOKUP = new 
Txn.Kind[] { Read, Write, ExclusiveSyncPoint, null };
     private static final int STATUS_COUNT = InternalStatus.values().length;
-    private static final Info[] DECODE_INFOS = new Info[4 * STATUS_COUNT];
+    private static final TxnInfo[] DECODE_INFOS = new TxnInfo[4 * 
STATUS_COUNT];
     static
     {
         for (InternalStatus status : InternalStatus.values())
         {
             int ordinal = status.ordinal();
-            DECODE_INFOS[ordinal] = status.asNoInfo;
-            DECODE_INFOS[STATUS_COUNT+ordinal] = Info.createMock(status, 
Timestamp.NONE, null);
-            DECODE_INFOS[2*STATUS_COUNT+ordinal] = Info.createMock(status, 
null, CommandsForKey.NO_TXNIDS);
-            DECODE_INFOS[3*STATUS_COUNT+ordinal] = Info.createMock(status, 
null, null);
+            DECODE_INFOS[ordinal] = TxnInfo.createMock(TxnId.NONE, status, 
TxnId.NONE, CommandsForKey.NO_TXNIDS);
+            DECODE_INFOS[STATUS_COUNT+ordinal] = 
TxnInfo.createMock(TxnId.NONE, status, TxnId.NONE, null);
+            DECODE_INFOS[2*STATUS_COUNT+ordinal] = 
TxnInfo.createMock(TxnId.NONE, status, null, CommandsForKey.NO_TXNIDS);
+            DECODE_INFOS[3*STATUS_COUNT+ordinal] = 
TxnInfo.createMock(TxnId.NONE, status, null, null);
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
index 930807d7f0..3efb9e2c6c 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/WaitingOnSerializer.java
@@ -22,40 +22,63 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import accord.local.Command.WaitingOn;
-import accord.primitives.Deps;
+import accord.primitives.Keys;
+import accord.primitives.Routable;
+import accord.primitives.TxnId;
 import accord.utils.ImmutableBitSet;
 import accord.utils.Invariants;
 import accord.utils.SimpleBitSet;
+import accord.utils.SortedArrays.SortedArrayList;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.vint.VIntCoding;
 
 public class WaitingOnSerializer
 {
-    public static void serialize(WaitingOn waitingOn, DataOutputPlus out) 
throws IOException
+    public static void serialize(TxnId txnId, WaitingOn waitingOn, 
DataOutputPlus out) throws IOException
     {
-        // TODO (expected): use run length encoding; we know that at most 
1/3rd of bits will be set between the three bitsets
-        int length = (waitingOn.deps.txnIdCount() + 63) / 64;
-        serialize(length, waitingOn.waitingOnCommit, out);
-        serialize(length, waitingOn.waitingOnApply, out);
-        serialize(length, waitingOn.appliedOrInvalidated, out);
+        out.writeUnsignedVInt32(waitingOn.keys.size());
+        out.writeUnsignedVInt32(waitingOn.txnIds.size());
+        int keyCount = waitingOn.keys.size();
+        int txnIdCount = waitingOn.txnIds.size();
+        int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
+        serialize(waitingOnLength, waitingOn.waitingOn, out);
+        if (txnId.domain() == Routable.Domain.Range)
+        {
+            int appliedOrInvalidatedLength = (txnIdCount + 63) / 64;
+            serialize(appliedOrInvalidatedLength, 
waitingOn.appliedOrInvalidated, out);
+        }
     }
 
-    public static WaitingOn deserialize(Deps deps, DataInputPlus in) throws 
IOException
+    public static WaitingOn deserialize(TxnId txnId, Keys keys, 
SortedArrayList<TxnId> txnIds, DataInputPlus in) throws IOException
     {
-        int length = (deps.txnIdCount() + 63) / 64;
-        ImmutableBitSet waitingOnCommit = deserialize(length, in);
-        ImmutableBitSet waitingOnApply = deserialize(length, in);
-        ImmutableBitSet appliedOrInvalidated = deserialize(length, in);
-        return new WaitingOn(deps, waitingOnCommit, waitingOnApply, 
appliedOrInvalidated);
+        int a = in.readUnsignedVInt32();
+        int b = in.readUnsignedVInt32();
+        int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
+        ImmutableBitSet waitingOn = deserialize(waitingOnLength, in);
+        ImmutableBitSet appliedOrInvalidated = null;
+        if (txnId.domain() == Routable.Domain.Range)
+        {
+            int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64;
+            appliedOrInvalidated = deserialize(appliedOrInvalidatedLength, in);
+        }
+        return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
     }
 
     public static long serializedSize(WaitingOn waitingOn)
     {
-        int length = (waitingOn.deps.txnIdCount() + 63) / 64;
-        return serializedSize(length, waitingOn.waitingOnCommit)
-               + serializedSize(length, waitingOn.waitingOnApply)
-               + serializedSize(length, waitingOn.appliedOrInvalidated);
+        int keyCount = waitingOn.keys.size();
+        int txnIdCount = waitingOn.txnIds.size();
+        int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
+        long size = serializedSize(waitingOnLength, waitingOn.waitingOn);
+        size += TypeSizes.sizeofUnsignedVInt(keyCount);
+        size += TypeSizes.sizeofUnsignedVInt(txnIdCount);
+        if (waitingOn.appliedOrInvalidated == null)
+            return size;
+
+        int appliedOrInvalidatedLength = (txnIdCount + 63) / 64;
+        return size + serializedSize(appliedOrInvalidatedLength, 
waitingOn.appliedOrInvalidated);
     }
 
     private static void serialize(int length, SimpleBitSet write, 
DataOutputPlus out) throws IOException
@@ -81,14 +104,23 @@ public class WaitingOnSerializer
         return (long) TypeSizes.LONG_SIZE * length;
     }
 
-    public static ByteBuffer serialize(WaitingOn waitingOn) throws IOException
+    public static ByteBuffer serialize(TxnId txnId, WaitingOn waitingOn) 
throws IOException
     {
-        int length = (waitingOn.deps.txnIdCount() + 63) / 64;
-        ByteBuffer out = ByteBuffer.allocate(TypeSizes.LONG_SIZE * length * 3);
-        serialize(length, waitingOn.waitingOnCommit, out);
-        serialize(length, waitingOn.waitingOnApply, out);
-        serialize(length, waitingOn.appliedOrInvalidated, out);
-        return (ByteBuffer) out.flip();
+        int keyCount = waitingOn.keys.size();
+        int txnIdCount = waitingOn.txnIds.size();
+        int waitingOnLength = (txnIdCount + keyCount + 63) / 64;
+        int appliedOrInvalidatedLength = 0;
+        if (txnId.domain() == Routable.Domain.Range)
+            appliedOrInvalidatedLength = (txnIdCount + 63) / 64;
+
+        ByteBuffer out = 
ByteBuffer.allocate(TypeSizes.sizeofUnsignedVInt(keyCount) + 
TypeSizes.sizeofUnsignedVInt(txnIdCount)
+                                             + TypeSizes.LONG_SIZE * 
(waitingOnLength + appliedOrInvalidatedLength));
+        VIntCoding.writeUnsignedVInt32(keyCount, out);
+        VIntCoding.writeUnsignedVInt32(txnIdCount, out);
+        serialize(waitingOnLength, waitingOn.waitingOn, out);
+        if (appliedOrInvalidatedLength > 0)
+            serialize(appliedOrInvalidatedLength, 
waitingOn.appliedOrInvalidated, out);
+        return out.flip();
     }
 
     private static void serialize(int length, SimpleBitSet write, ByteBuffer 
out)
@@ -99,16 +131,23 @@ public class WaitingOnSerializer
             out.putLong(bits[i]);
     }
 
-    public static WaitingOn deserialize(Deps deps, ByteBuffer in) throws 
IOException
+    public static WaitingOn deserialize(TxnId txnId, Keys keys, 
SortedArrayList<TxnId> txnIds, ByteBuffer in) throws IOException
     {
-        int length = (deps.txnIdCount() + 63) / 64;
+        int waitingOnLength = (txnIds.size() + keys.size() + 63) / 64;
         int position = in.position();
-        ImmutableBitSet waitingOnCommit = deserialize(position, length, in);
-        position += length*8;
-        ImmutableBitSet waitingOnApply = deserialize(position, length, in);
-        position += length*8;
-        ImmutableBitSet appliedOrInvalidated = deserialize(position, length, 
in);
-        return new WaitingOn(deps, waitingOnCommit, waitingOnApply, 
appliedOrInvalidated);
+        int a = VIntCoding.readUnsignedVInt32(in, position);
+        position += TypeSizes.sizeofUnsignedVInt(a); // skip the first vint (written as keyCount by serialize)
+        int b = VIntCoding.readUnsignedVInt32(in, position);
+        position += TypeSizes.sizeofUnsignedVInt(b); // skip the second vint by ITS width; it may differ from a's
+        ImmutableBitSet waitingOn = deserialize(position, waitingOnLength, in);
+        ImmutableBitSet appliedOrInvalidated = null;
+        if (txnId.domain() == Routable.Domain.Range)
+        {
+            position += waitingOnLength*8;
+            int appliedOrInvalidatedLength = (txnIds.size() + 63) / 64;
+            appliedOrInvalidated = deserialize(position, 
appliedOrInvalidatedLength, in);
+        }
+        return new WaitingOn(keys, txnIds, waitingOn, appliedOrInvalidated);
     }
 
     private static ImmutableBitSet deserialize(int position, int length, 
ByteBuffer in)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index ec85d63f2c..5e3e9ff831 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -35,9 +35,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.DataStore;
+import accord.api.Key;
 import accord.api.Write;
 import accord.impl.AbstractSafeCommandStore;
-import accord.impl.CommandsForKeys;
+import accord.impl.TimestampsForKeys;
 import accord.impl.TimestampsForKey;
 import accord.local.SafeCommandStore;
 import accord.primitives.PartialTxn;
@@ -375,7 +376,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
         // TODO (expected, efficiency): 99.9999% of the time we can just use 
executeAt.hlc(), so can avoid bringing
         //  cfk into memory by retaining at all times in memory key ranges 
that are dirty and must use this logic;
         //  any that aren't can just use executeAt.hlc
-        TimestampsForKey cfk = 
CommandsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?,?,?>) 
safeStore, (RoutableKey) key, executeAt, true);
+        TimestampsForKey cfk = 
TimestampsForKeys.updateLastExecutionTimestamps((AbstractSafeCommandStore<?,?,?>)
 safeStore, (Key) key, executeAt, true);
         long timestamp = AccordSafeTimestampsForKey.timestampMicrosFor(cfk, 
executeAt, true);
         // TODO (low priority - do we need to compute nowInSeconds, or can we 
just use executeAt?)
         int nowInSeconds = AccordSafeTimestampsForKey.nowInSecondsFor(cfk, 
executeAt, true);
diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java 
b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
index 8b52bb41e4..dc873f0210 100644
--- a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
+++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java
@@ -126,13 +126,27 @@ public class VIntCoding
         return retval;
     }
 
+    @DontInline
+    private static long readUnsignedVIntSlow(ByteBuffer in, int position, byte 
firstByte)
+    {
+        int size = numberOfExtraBytesToRead(firstByte);
+        long retval = firstByte & firstByteValueMask(size);
+        for (int ii = 0; ii < size; ii++)
+        {
+            byte b = in.get(position++);
+            retval <<= 8;
+            retval |= b & 0xff;
+        }
+
+        return retval;
+    }
+
     public static long readUnsignedVInt(ByteBuffer in)
     {
         byte firstByte = in.get();
         if (firstByte >= 0)
             return firstByte;
 
-
         int position = in.position();
         int limit = in.limit();
         if (limit - position < 8)
@@ -155,6 +169,32 @@ public class VIntCoding
         return retval;
     }
 
+    public static long readUnsignedVInt(ByteBuffer in, int position)
+    {
+        byte firstByte = in.get(position++);
+        if (firstByte >= 0)
+            return firstByte;
+
+        int limit = in.limit();
+        if (limit - position < 8)
+            return readUnsignedVIntSlow(in, position, firstByte);
+
+        int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
+        int extraBits = extraBytes * 8;
+
+        long retval = in.getLong(position);
+        if (in.order() == ByteOrder.LITTLE_ENDIAN)
+            retval = Long.reverseBytes(retval);
+
+        // truncate the bytes we read in excess of those we needed
+        retval >>>= 64 - extraBits;
+        // remove the non-value bits from the first byte
+        firstByte &= VIntCoding.firstByteValueMask(extraBytes);
+        // shift the first byte up to its correct position
+        retval |= (long) firstByte << extraBits;
+        return retval;
+    }
+
     public static void skipUnsignedVInt(DataInputPlus input) throws IOException
     {
         int firstByte = input.readByte();
@@ -330,6 +370,11 @@ public class VIntCoding
         return checkedCast(readUnsignedVInt(input));
     }
 
+    /**
+     * Decodes the unsigned vint at the absolute index {@code position} and narrows it to an {@code int}
+     * through {@code checkedCast} (presumably rejecting values that do not fit - confirm against checkedCast).
+     *
+     * @param input    buffer holding the encoded value
+     * @param position absolute index of the first byte of the encoding
+     * @return the decoded value as an {@code int}
+     */
+    public static int readUnsignedVInt32(ByteBuffer input, int position)
+    {
+        long decoded = readUnsignedVInt(input, position);
+        return checkedCast(decoded);
+    }
+
     // & this with the first byte to give the value part for a given 
extraBytesToRead encoded in the byte
     public static int firstByteValueMask(int extraBytesToRead)
     {
diff --git a/test/conf/logback-dtest-quiet.xml 
b/test/conf/logback-dtest-quiet.xml
new file mode 100644
index 0000000000..bb9f983177
--- /dev/null
+++ b/test/conf/logback-dtest-quiet.xml
@@ -0,0 +1,56 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+  <define name="cluster_id" 
class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
+  <define name="instance_id" 
class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+  <!-- Shutdown hook ensures that async appender flushes -->
+  <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+  <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+    
<file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
+    <encoder>
+      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - 
%msg%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>INFO</level>
+    </filter>
+    <immediateFlush>true</immediateFlush>
+  </appender>
+
+  <appender name="INSTANCESTDERR" target="System.err" 
class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>ERROR</level>
+    </filter>
+  </appender>
+
+  <!-- Un-comment to enable TRACE logging for Accord transactions.
+  <logger name="accord" level="TRACE" />
+  <logger name="org.apache.cassandra.service.accord" level="TRACE" />
+  -->
+
+  <root level="DEBUG">
+    <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race 
conditions with appending and searching -->
+    <appender-ref ref="INSTANCESTDERR" />
+  </root>
+</configuration>
diff --git 
a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java 
b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
index aee6aeaeb8..5b27d3d44f 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/ICoordinator.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.api;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
 
 import org.apache.cassandra.distributed.shared.FutureUtils;
 
@@ -60,6 +61,8 @@ public interface ICoordinator
     }
 
     SimpleQueryResult executeWithResult(String query, ConsistencyLevel 
consistencyLevel, Object... boundValues);
+    /**
+     * Callback-based overload of {@code executeWithResult} for a single consistency level: the outcome is handed
+     * to {@code callback} and a {@link Future} is returned instead of the {@link SimpleQueryResult} itself.
+     * {@code Coordinator} implements this by forwarding to the serial/commit overload with a {@code null} serial level.
+     */
+    Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable> 
callback, String query, ConsistencyLevel consistencyLevel, Object... 
boundValues);
+    /**
+     * Callback-based overload of {@code executeWithResult} taking separate serial and commit consistency levels.
+     * In the {@code Coordinator} implementation the callback receives {@code (result, null)} when the query
+     * completes and {@code (null, throwable)} when it throws.
+     */
+    Future<?> executeWithResult(BiConsumer<SimpleQueryResult, Throwable> 
callback, String query, ConsistencyLevel serialConsistencyLevel, 
ConsistencyLevel commitConsistencyLevel, Object... boundValues);
 
     default SimpleQueryResult executeWithResult(String query, ConsistencyLevel 
serialConsistencyLevel, ConsistencyLevel commitConsistencyLevel, Object... 
boundValues)
     {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 19d80626d3..e89ab8faa3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
 
 import com.google.common.collect.Iterators;
 
@@ -66,6 +67,30 @@ public class Coordinator implements ICoordinator
         return instance().sync(() -> unsafeExecuteInternal(query, 
consistencyLevel, boundValues)).call();
     }
 
+    /**
+     * Callback-based execution at a single consistency level. Forwards to the serial/commit overload, passing
+     * {@code null} as the serial consistency level and {@code consistencyLevel} as the commit consistency level.
+     */
+    @Override
+    public Future<?> executeWithResult(BiConsumer<SimpleQueryResult, 
Throwable> callback, String query, ConsistencyLevel consistencyLevel, Object... 
boundValues)
+    {
+        return executeWithResult(callback, query, null, consistencyLevel, 
boundValues);
+    }
+
+    /**
+     * Submits the query through {@code instance().async(...)} and reports the outcome to {@code callback}:
+     * {@code (result, null)} when {@code unsafeExecuteInternal} returns, {@code (null, throwable)} when it throws.
+     *
+     * @return the {@link Future} produced by applying the async wrapper
+     */
+    @Override
+    public Future<?> executeWithResult(BiConsumer<SimpleQueryResult, 
Throwable> callback, String query, ConsistencyLevel serialConsistencyLevel, 
ConsistencyLevel commitConsistencyLevel, Object... boundValues)
+    {
+        // NOTE(review): the lambda parameter cb is unused; the body uses the captured callback, which is also
+        // the argument passed to apply() below - presumably the same object, confirm against async()
+        return instance().async(cb -> {
+            SimpleQueryResult result;
+            try
+            {
+                result = unsafeExecuteInternal(query, serialConsistencyLevel, 
commitConsistencyLevel, boundValues);
+            }
+            catch (Throwable t)
+            {
+                callback.accept(null, t);
+                return;
+            }
+            // deliberately outside the try: an exception thrown by the callback itself is not fed back to it as a query failure
+            callback.accept(result, null);
+        }).apply(callback);
+    }
+
     public Future<SimpleQueryResult> asyncExecuteWithTracingWithResult(UUID 
sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... 
boundValues)
     {
         return instance.async(() -> {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e2036a2382..464bc3b8bf 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -492,7 +492,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
     @Override
     public void receiveMessage(IMessage message)
     {
-        sync(receiveMessageRunnable(message)).accept(false);
+        async(receiveMessageRunnable(message)).apply(false);
     }
 
     @Override
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
new file mode 100644
index 0000000000..42e7fbf34a
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.utils.EstimatedHistogram;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Manually-run load generator for Accord: issues rate-limited, concurrency-bounded conditional updates through
+ * coordinator 1 and prints the achieved rate and latency percentiles after every batch.
+ * The test is {@code @Ignore}d and loops forever, so it has to be enabled and stopped by hand.
+ */
+public class AccordLoadTest extends AccordTestBase
+{
+    private static final Logger logger = LoggerFactory.getLogger(AccordLoadTest.class);
+
+    @BeforeClass
+    public static void setUp() throws IOException
+    {
+        AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.set("lwt_strategy", "accord").set("non_serial_write_strategy", "accord")), 2);
+    }
+
+    @Ignore
+    @Test
+    public void testLoad() throws Exception
+    {
+        test("CREATE TABLE " + qualifiedTableName + " (k int, v int, PRIMARY KEY(k))",
+             cluster -> {
+                 ICoordinator coordinator = cluster.coordinator(1);
+                 final int batchSize = 1000;
+                 final int concurrency = 100;
+                 final int ratePerSecond = 1000;
+                 final int keyCount = 10;
+                 // Seed one row per key the load loop can pick: random.nextInt(keyCount) yields 0..keyCount-1,
+                 // and the updates are IF EXISTS, so every one of those keys must exist. The key is bound via
+                 // the ? marker (the statement previously had no marker and always wrote k = 0).
+                 for (int key = 0; key < keyCount; key++)
+                     coordinator.execute("INSERT INTO " + qualifiedTableName + " (k, v) VALUES (?, 0) USING TIMESTAMP 0;", ConsistencyLevel.ALL, key);
+
+                 Random random = new Random();
+//                 CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
+                 // bounds the number of requests awaiting their callback
+                 final Semaphore inFlight = new Semaphore(concurrency);
+                 // bounds the submission rate
+                 final RateLimiter rateLimiter = RateLimiter.create(ratePerSecond);
+                 // only referenced by the commented-out bounded loop condition below
+                 long testStart = System.nanoTime();
+//                 while (NANOSECONDS.toMinutes(System.nanoTime() - testStart) < 10 && exceptions.size() < 10000)
+                 while (true)
+                 {
+                     // fresh histogram per batch, so the printed percentiles cover that batch only
+                     final EstimatedHistogram histogram = new EstimatedHistogram(200);
+                     long batchStart = System.nanoTime();
+                     for (int i = 0 ; i < batchSize ; ++i)
+                     {
+                         inFlight.acquire();
+                         rateLimiter.acquire();
+                         long commandStart = System.nanoTime();
+                         coordinator.executeWithResult((success, fail) -> {
+                             inFlight.release();
+                             // latency is recorded in microseconds, and only for requests that did not fail
+                             if (fail == null) histogram.add(NANOSECONDS.toMicros(System.nanoTime() - commandStart));
+//                             else exceptions.add(fail);
+                         }, "UPDATE " + qualifiedTableName + " SET v += 1 WHERE k = ? IF EXISTS;", ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM, random.nextInt(keyCount));
+                     }
+                     System.out.printf("%tT rate: %.2f/s\n", new Date(), (((float)batchSize * 1000) / NANOSECONDS.toMillis(System.nanoTime() - batchStart)));
+                     // recorded values are microseconds; dividing by 1000 prints milliseconds
+                     System.out.printf("%tT percentiles: %d %d %d %d\n", new Date(), histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, histogram.percentile(.75)/1000, histogram.percentile(1)/1000);
+                 }
+             }
+        );
+    }
+
+    @Override
+    protected Logger logger()
+    {
+        return logger;
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 95c495a350..ee38ffceb9 100644
--- 
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++ 
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import accord.api.Key;
 import accord.api.Result;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
 import accord.local.CheckedCommands;
 import accord.local.Command;
 import accord.local.CommandStore;
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index af4e3f738b..fdaf519874 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -31,8 +31,8 @@ import org.slf4j.LoggerFactory;
 
 import accord.api.Key;
 import accord.api.Result;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKeys;
+import accord.local.CommandsForKey;
+import accord.impl.TimestampsForKeys;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
 import accord.local.CommonAttributes;
@@ -41,10 +41,13 @@ import accord.messages.Apply;
 import accord.primitives.Ballot;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
+import accord.primitives.Range;
 import accord.primitives.Ranges;
+import accord.primitives.Routable;
 import accord.primitives.Route;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
+import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.ImmutableBitSet;
@@ -102,20 +105,20 @@ public class AccordCommandStoreTest
         AtomicLong clock = new AtomicLong(0);
         PartialTxn depTxn = createPartialTxn(0);
         Key key = (Key)depTxn.keys().get(0);
+        Range range = key.toUnseekable().asRange();
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
 
         QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES 
(0, 0, 1)");
         TableId tableId = Schema.instance.getTableMetadata("ks", "tbl").id;
-        TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1);
-        TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1);
-        TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 1);
-        TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
+        TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write, 
Routable.Domain.Range);
+        TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write, 
Routable.Domain.Range);
+        TxnId txnId = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write, 
Routable.Domain.Range);
 
         PartialDeps dependencies;
         try (PartialDeps.Builder builder = 
PartialDeps.builder(depTxn.covering()))
         {
-            builder.add(key, oldTxnId1);
-            builder.add(key, oldTxnId2);
+            builder.add(range, oldTxnId1);
+            builder.add(range, oldTxnId2);
             dependencies = builder.build();
         }
 
@@ -130,11 +133,9 @@ public class AccordCommandStoreTest
         Ballot accepted = ballot(1, clock.incrementAndGet(), 1);
         Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1);
         attrs.partialDeps(dependencies);
-        SimpleBitSet waitingOnCommit = new SimpleBitSet(2);
-        waitingOnCommit.set(0);
-        SimpleBitSet waitingOnApply = new SimpleBitSet(2);
+        SimpleBitSet waitingOnApply = new SimpleBitSet(3);
         waitingOnApply.set(1);
-        Command.WaitingOn waitingOn = new Command.WaitingOn(dependencies, new 
ImmutableBitSet(waitingOnCommit), new ImmutableBitSet(waitingOnApply), new 
ImmutableBitSet(2));
+        Command.WaitingOn waitingOn = new 
Command.WaitingOn(dependencies.keyDeps.keys(), dependencies.rangeDeps.txnIds(), 
new ImmutableBitSet(waitingOnApply), new ImmutableBitSet(2));
         attrs.addListener(new Command.ProxyListener(oldTxnId1));
         Pair<Writes, Result> result = 
AccordTestUtils.processTxnResult(commandStore, txnId, txn, executeAt);
 
@@ -183,10 +184,10 @@ public class AccordCommandStoreTest
         AccordSafeTimestampsForKey tfk = new 
AccordSafeTimestampsForKey(loaded(key, null));
         tfk.initialize();
 
-        CommandsForKeys.updateLastExecutionTimestamps(commandStore, tfk, 
txnId1, true);
+        TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk, 
txnId1, true);
         Assert.assertEquals(txnId1.hlc(), 
AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId1, true));
 
-        CommandsForKeys.updateLastExecutionTimestamps(commandStore, tfk, 
txnId2, true);
+        TimestampsForKeys.updateLastExecutionTimestamps(commandStore, tfk, 
txnId2, true);
         Assert.assertEquals(txnId2.hlc(), 
AccordSafeTimestampsForKey.timestampMicrosFor(tfk.current(), txnId2, true));
 
         Assert.assertEquals(txnId2, tfk.current().lastExecutedTimestamp());
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 5b1a1402a2..71e821d329 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -26,7 +26,7 @@ import org.junit.Test;
 
 import accord.api.Key;
 import accord.api.RoutingKey;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
 import accord.local.Command;
 import accord.local.KeyHistory;
 import accord.local.Node;
@@ -117,7 +117,7 @@ public class AccordCommandTest
             Assert.assertEquals(Status.PreAccepted, command.status());
             Assert.assertTrue(command.partialDeps() == null || 
command.partialDeps().isEmpty());
 
-            CommandsForKey cfk = ((AccordSafeCommandStore) 
instance).commandsForKey(key(1)).current();
+            CommandsForKey cfk = ((AccordSafeCommandStore) 
instance).get(key(1)).current();
             Assert.assertTrue(cfk.indexOf(txnId) >= 0);
         }));
 
@@ -145,7 +145,7 @@ public class AccordCommandTest
             Assert.assertEquals(Status.Accepted, command.status());
             Assert.assertEquals(deps, command.partialDeps());
 
-            CommandsForKey cfk = ((AccordSafeCommandStore) 
instance).commandsForKey(key(1)).current();
+            CommandsForKey cfk = ((AccordSafeCommandStore) 
instance).get(key(1)).current();
             Assert.assertTrue(cfk.indexOf(txnId) >= 0);
         }));
 
@@ -160,7 +160,7 @@ public class AccordCommandTest
             Assert.assertTrue(command.hasBeen(Status.Committed));
             Assert.assertEquals(commit.partialDeps, command.partialDeps());
 
-            CommandsForKey cfk = ((AccordSafeCommandStore) 
instance).commandsForKey(key(1)).current();
+            CommandsForKey cfk = ((AccordSafeCommandStore) 
instance).get(key(1)).current();
             Assert.assertTrue(cfk.indexOf(txnId) >= 0);
         }));
     }
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 66d0c8e436..0ae2a6abc8 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -30,6 +30,8 @@ import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import javax.annotation.Nullable;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.junit.Assert;
@@ -59,6 +61,7 @@ import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Participants;
 import accord.primitives.Ranges;
+import accord.primitives.Routable;
 import accord.primitives.Route;
 import accord.primitives.Seekable;
 import accord.primitives.Seekables;
@@ -199,8 +202,8 @@ public class AccordTestUtils
         @Override public void executed(Command command, ProgressShard 
progressShard) {}
         @Override public void clear(TxnId txnId) {}
         @Override public void durable(Command command) {}
-        @Override
-        public void waiting(SafeCommand blockedBy, LocalExecution 
blockedUntil, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants) {}
+        @Override public void waiting(SafeCommand blockedBy, LocalExecution 
blockedUntil, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants) {}
+        @Override public void waiting(TxnId blockedBy, LocalExecution 
blockedUntil, @Nullable Route<?> blockedOnRoute, @Nullable Participants<?> 
blockedOnParticipants) {}
     };
 
     public static TxnId txnId(long epoch, long hlc, int node)
@@ -213,6 +216,11 @@ public class AccordTestUtils
         return new TxnId(epoch, hlc, kind, Key, new Node.Id(node));
     }
 
+    /**
+     * Builds a {@link TxnId} from its raw components with an explicit transaction kind and routing domain.
+     *
+     * @param epoch  epoch component
+     * @param hlc    hlc component
+     * @param node   numeric id wrapped into a {@code Node.Id}
+     * @param kind   transaction kind
+     * @param domain routing domain
+     */
+    public static TxnId txnId(long epoch, long hlc, int node, Txn.Kind kind, Routable.Domain domain)
+    {
+        Node.Id nodeId = new Node.Id(node);
+        return new TxnId(epoch, hlc, kind, domain, nodeId);
+    }
+
     public static Timestamp timestamp(long epoch, long hlc, int node)
     {
         return Timestamp.fromValues(epoch, hlc, new Node.Id(node));
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index 8c952f02c0..33f0dc2458 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -32,13 +32,12 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import accord.api.Key;
-import accord.impl.CommandsForKey;
+import accord.local.CommandsForKey;
 import accord.impl.TimestampsForKey;
 import accord.local.Command;
 import accord.local.KeyHistory;
 import accord.primitives.Keys;
 import accord.primitives.PartialTxn;
-import accord.primitives.RoutableKey;
 import accord.primitives.TxnId;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
@@ -96,7 +95,7 @@ public class AsyncLoaderTest
         AccordStateCache.Instance<TxnId, Command, AccordSafeCommand> 
commandCache = commandStore.commandCache();
         commandStore.executeBlocking(() -> commandStore.setCapacity(1024));
 
-        AccordStateCache.Instance<RoutableKey, TimestampsForKey, 
AccordSafeTimestampsForKey> timestampsCache = 
commandStore.timestampsForKeyCache();
+        AccordStateCache.Instance<Key, TimestampsForKey, 
AccordSafeTimestampsForKey> timestampsCache = 
commandStore.timestampsForKeyCache();
         TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
         PartialTxn txn = createPartialTxn(0);
         PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
@@ -375,7 +374,7 @@ public class AsyncLoaderTest
     @Test
     public void inProgressCFKSaveTest()
     {
-        inProgressCFKSaveTest(COMMANDS, 
AccordCommandStore::commandsForKeyCache, context -> context.commandsForKey, 
CommandsForKey::new, (cfk, u) -> cfk.update(null, u));
+        this.inProgressCFKSaveTest(COMMANDS, 
AccordCommandStore::commandsForKeyCache, context -> context.commandsForKey, 
CommandsForKey::new, (cfk, u) -> cfk.update(null, u));
     }
 
     @Test
@@ -384,7 +383,7 @@ public class AsyncLoaderTest
         inProgressCFKSaveTest(TIMESTAMPS, 
AccordCommandStore::timestampsForKeyCache, context -> context.timestampsForKey, 
TimestampsForKey::new, (tfk, c) -> new TimestampsForKey(tfk.key(), 
c.executeAt(), c.executeAt().hlc(), c.executeAt()));
     }
 
-    private <T1, T2 extends AccordSafeState<RoutableKey, T1>, C extends 
AccordStateCache.Instance<RoutableKey, T1, T2>>  void 
inProgressCFKSaveTest(KeyHistory history, Function<AccordCommandStore, C> 
getter, Function<Context, TreeMap<?, ?>> inContext, Function<Key, T1> 
initialiser, BiFunction<T1, Command, T1> update)
+    private <T1, T2 extends AccordSafeState<Key, T1>, C extends 
AccordStateCache.Instance<Key, T1, T2>>  void inProgressCFKSaveTest(KeyHistory 
history, Function<AccordCommandStore, C> getter, Function<Context, TreeMap<?, 
?>> inContext, Function<Key, T1> initialiser, BiFunction<T1, Command, T1> 
update)
     {
         AtomicLong clock = new AtomicLong(0);
         ManualExecutor executor = new ManualExecutor();
diff --git 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index b235d1ae59..a5ece26a51 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -36,7 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.RoutingKey;
-import accord.impl.SafeCommandsForKey;
+import accord.local.SafeCommandsForKey;
 import accord.local.CheckedCommands;
 import accord.local.Command;
 import accord.local.PreLoadContext;
@@ -53,7 +53,6 @@ import accord.primitives.PartialDeps;
 import accord.primitives.PartialRoute;
 import accord.primitives.PartialTxn;
 import accord.primitives.Ranges;
-import accord.primitives.RoutableKey;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
@@ -259,7 +258,7 @@ public class AsyncOperationTest
         Accept accept =
             Accept.SerializerSupport.create(txnId, partialRoute, 
txnId.epoch(), txnId.epoch(), false, Ballot.ZERO, executeAt, partialTxn.keys(), 
deps);
         Commit commit =
-            Commit.SerializerSupport.create(txnId, partialRoute, 
txnId.epoch(), Commit.Kind.Commit, Ballot.ZERO, executeAt, partialTxn.keys(), 
partialTxn, deps, route, null);
+            Commit.SerializerSupport.create(txnId, partialRoute, 
txnId.epoch(), Commit.Kind.CommitSlowPath, Ballot.ZERO, executeAt, 
partialTxn.keys(), partialTxn, deps, route, null);
         Commit stable =
             Commit.SerializerSupport.create(txnId, partialRoute, 
txnId.epoch(), Commit.Kind.StableSlowPath, Ballot.ZERO, executeAt, 
partialTxn.keys(), partialTxn, deps, route, null);
 
@@ -482,8 +481,7 @@ public class AsyncOperationTest
         }
         try
         {
-            //TODO this is due to bad typing for Instance, it doesn't use ? 
extends RoutableKey
-            assertNoReferences(commandStore.commandsForKeyCache(), 
(Iterable<RoutableKey>) (Iterable<?>) keys);
+            assertNoReferences(commandStore.commandsForKeyCache(), keys);
         }
         catch (AssertionError e)
         {
@@ -524,8 +522,7 @@ public class AsyncOperationTest
     private static void awaitDone(AccordCommandStore commandStore, List<TxnId> 
ids, Keys keys)
     {
         awaitDone(commandStore.commandCache(), ids);
-        //TODO this is due to bad typing for Instance, it doesn't use ? 
extends RoutableKey
-        awaitDone(commandStore.commandsForKeyCache(), (Iterable<RoutableKey>) 
(Iterable<?>) keys);
+        awaitDone(commandStore.commandsForKeyCache(), keys);
     }
 
     private static <T> void awaitDone(AccordStateCache.Instance<T, ?, ?> 
cache, Iterable<T> keys)
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index e12b3fbf87..405f92dc59 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -38,9 +38,10 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import accord.api.Key;
-import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKey.InternalStatus;
+import accord.local.CommandsForKey;
+import accord.local.CommandsForKey.InternalStatus;
 import accord.local.Command;
+import accord.local.CommandsForKey.TxnInfo;
 import accord.local.CommonAttributes;
 import accord.local.CommonAttributes.Mutable;
 import accord.local.Listeners;
@@ -241,9 +242,12 @@ public class CommandsForKeySerializerTest
             List<TxnId> deps = cmds[i].deps;
             List<TxnId> missing = cmds[i].missing;
             for (int j = 0 ; j < limit ; ++j)
-                if (i != j) deps.add(cmds[j].txnId);
+            {
+                if (i != j && cmds[i].txnId.kind().witnesses(cmds[j].txnId))
+                    deps.add(cmds[j].txnId);
+            }
 
-            int missingCount = Math.min(limit - (limit > i ? 1 : 0), 
missingCountSupplier.getAsInt());
+            int missingCount = Math.min(deps.size(), 
missingCountSupplier.getAsInt());
             while (missingCount > 0)
             {
                 int remove = source.nextInt(deps.size());
@@ -267,14 +271,14 @@ public class CommandsForKeySerializerTest
             {
                 InternalStatus status = 
InternalStatus.from(cmds[j].saveStatus);
                 if (status == null || !status.hasInfo) continue;
-                if (status.depsKnownBefore(cmds[j].txnId, 
cmds[j].executeAt).compareTo(cmds[i].txnId) > 0 && 
Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0)
+                if (cmds[j].txnId.kind().witnesses(cmds[i].txnId) && 
status.depsKnownBefore(cmds[j].txnId, 
cmds[j].executeAt).compareTo(cmds[i].txnId) > 0 && 
Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0)
                     continue outer;
             }
             for (int j = i + 1 ; j < cmds.length ; ++j)
             {
                 InternalStatus status = 
InternalStatus.from(cmds[j].saveStatus);
                 if (status == null || !status.hasInfo) continue;
-                if (Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 
0)
+                if (cmds[j].txnId.kind().witnesses(cmds[i].txnId) && 
Collections.binarySearch(cmds[j].missing, cmds[i].txnId) < 0)
                     continue outer;
             }
             cmds[i].invisible = true;
@@ -322,7 +326,7 @@ public class CommandsForKeySerializerTest
     @Test
     public void serde()
     {
-//        testOne(1821931462020409370L);
+        testOne(-6946067792202944553L);
         Random random = new Random();
         for (int i = 0 ; i < 10000 ; ++i)
         {
@@ -426,13 +430,13 @@ public class CommandsForKeySerializerTest
                     Assert.assertTrue(cmd.invisible);
                     continue;
                 }
-                CommandsForKey.Info info = cfk.info(i);
+                TxnInfo info = cfk.get(i);
                 InternalStatus expectStatus = 
InternalStatus.from(cmd.saveStatus);
                 if (expectStatus == null) expectStatus = 
InternalStatus.TRANSITIVELY_KNOWN;
                 if (expectStatus.hasInfo)
-                    Assert.assertEquals(cmd.executeAt, 
info.executeAt(cfk.txnId(i)));
+                    Assert.assertEquals(cmd.executeAt, info.executeAt);
                 Assert.assertEquals(expectStatus, info.status);
-                Assert.assertArrayEquals(cmd.missing.toArray(TxnId[]::new), 
info.missing);
+                Assert.assertArrayEquals(cmd.missing.toArray(TxnId[]::new), 
info.missing());
                 ++i;
             }
 
@@ -461,11 +465,12 @@ public class CommandsForKeySerializerTest
                     next = txnIdGen.next(rs0);
                 return next;
             }).unique().ofSizeBetween(0, 10).next(rs);
-            CommandsForKey.Info[] info = new CommandsForKey.Info[ids.length];
+            TxnInfo[] info = new TxnInfo[ids.length];
             for (int i = 0; i < info.length; i++)
-                info[i] = rs.pick(InternalStatus.values()).asNoInfo;
-            Arrays.sort(ids, Comparator.naturalOrder());
-            CommandsForKey expected = 
CommandsForKey.SerializerSupport.create(pk, redudentBefore, ids, info);
+                info[i] = TxnInfo.create(ids[i], 
rs.pick(InternalStatus.values()), ids[i], CommandsForKey.NO_TXNIDS);
+            Arrays.sort(info, Comparator.naturalOrder());
+
+            CommandsForKey expected = 
CommandsForKey.SerializerSupport.create(pk, info, 
CommandsForKey.NO_PENDING_UNMANAGED);
 
             ByteBuffer buffer = 
CommandsForKeySerializer.toBytesWithoutKey(expected);
             CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk, 
buffer);
@@ -479,10 +484,10 @@ public class CommandsForKeySerializerTest
         long tokenValue = -2311778975040348869L;
         DecoratedKey key = 
Murmur3Partitioner.instance.decorateKey(Murmur3Partitioner.LongToken.keyForToken(tokenValue));
         PartitionKey pk = new 
PartitionKey(TableId.fromString("1b255f4d-ef25-40a6-0000-000000000009"), key);
+        TxnId txnId = TxnId.fromValues(11,34052499,2,1);
         CommandsForKey expected = CommandsForKey.SerializerSupport.create(pk,
-                                                     TxnId.fromValues(0,0,0,0),
-                                                     new TxnId[] 
{TxnId.fromValues(11,34052499,2,1)},
-                                                     new CommandsForKey.Info[] 
{ InternalStatus.PREACCEPTED.asNoInfo});
+                                                     new TxnInfo[] { 
TxnInfo.create(txnId, InternalStatus.PREACCEPTED, txnId, 
CommandsForKey.NO_TXNIDS) },
+                                                                          
CommandsForKey.NO_PENDING_UNMANAGED);
 
         ByteBuffer buffer = 
CommandsForKeySerializer.toBytesWithoutKey(expected);
         CommandsForKey roundTrip = CommandsForKeySerializer.fromBytes(pk, 
buffer);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
index 5d5fc1bf56..3df7d87e0d 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/WaitingOnSerializerTest.java
@@ -23,6 +23,8 @@ import org.junit.Test;
 
 import accord.local.Command;
 import accord.primitives.Deps;
+import accord.primitives.Routable;
+import accord.primitives.TxnId;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.SimpleBitSet;
@@ -52,19 +54,20 @@ public class WaitingOnSerializerTest
     {
         DataOutputBuffer buffer = new DataOutputBuffer();
         qt().forAll(waitingOnGen()).check(waitingOn -> {
+            TxnId txnId = TxnId.NONE;
+            if (waitingOn.appliedOrInvalidated != null) txnId = new 
TxnId(txnId.epoch(), txnId.hlc(), txnId.kind(), Routable.Domain.Range, 
txnId.node);
             buffer.clear();
             long expectedSize = WaitingOnSerializer.serializedSize(waitingOn);
-            WaitingOnSerializer.serialize(waitingOn, buffer);
+            WaitingOnSerializer.serialize(txnId, waitingOn, buffer);
             Assertions.assertThat(buffer.getLength()).isEqualTo(expectedSize);
-            Command.WaitingOn read = 
WaitingOnSerializer.deserialize(waitingOn.deps, new 
DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false));
+            Command.WaitingOn read = WaitingOnSerializer.deserialize(txnId, 
waitingOn.keys, waitingOn.txnIds, new 
DataInputBuffer(buffer.unsafeGetBufferAndFlip(), false));
             Assertions.assertThat(read)
                       .isEqualTo(waitingOn)
-                      
.isEqualTo(WaitingOnSerializer.deserialize(waitingOn.deps, 
WaitingOnSerializer.serialize(waitingOn)));
+                      .isEqualTo(WaitingOnSerializer.deserialize(txnId, 
waitingOn.keys, waitingOn.txnIds, WaitingOnSerializer.serialize(txnId, 
waitingOn)));
         });
     }
 
-    private enum WaitingOnSets
-    {COMMIT, APPLY, APPLYED_OR_INVALIDATED}
+    private enum WaitingOnSets { APPLY, APPLIED_OR_INVALIDATED }
 
     private static Gen<Command.WaitingOn> waitingOnGen()
     {
@@ -75,22 +78,20 @@ public class WaitingOnSerializerTest
         return rs -> {
             Deps deps = depsGen.next(rs);
             if (deps.isEmpty()) return Command.WaitingOn.EMPTY;
-            int[] selected = Gens.arrays(Gens.ints().between(0, 
deps.txnIdCount() - 1)).unique().ofSizeBetween(0, deps.txnIdCount() - 
1).next(rs);
-            SimpleBitSet waitingOnCommit = new SimpleBitSet(deps.txnIdCount(), 
false);
-            SimpleBitSet waitingOnApply = new SimpleBitSet(deps.txnIdCount(), 
false);
-            SimpleBitSet appliedOrInvalidated = new 
SimpleBitSet(deps.txnIdCount(), false);
+            int txnIdCount = deps.rangeDeps.txnIdCount();
+            int keyCount = deps.keyDeps.keys().size();
+            int[] selected = Gens.arrays(Gens.ints().between(0, txnIdCount + 
keyCount - 1)).unique().ofSizeBetween(0, txnIdCount + keyCount).next(rs);
+            SimpleBitSet waitingOn = new SimpleBitSet(txnIdCount + keyCount, 
false);
+            SimpleBitSet appliedOrInvalidated = rs.nextBoolean() ? null : new 
SimpleBitSet(txnIdCount, false);
             for (int i : selected)
             {
-                WaitingOnSets set = sets.next(rs);
+                WaitingOnSets set = appliedOrInvalidated == null || i >= 
txnIdCount ? WaitingOnSets.APPLY : sets.next(rs);
                 switch (set)
                 {
-                    case COMMIT:
-                        waitingOnCommit.set(i);
-                        break;
                     case APPLY:
-                        waitingOnApply.set(i);
+                        waitingOn.set(i);
                         break;
-                    case APPLYED_OR_INVALIDATED:
+                    case APPLIED_OR_INVALIDATED:
                         appliedOrInvalidated.set(i);
                         break;
                     default:
@@ -98,7 +99,7 @@ public class WaitingOnSerializerTest
                 }
             }
 
-            return new Command.WaitingOn(deps, 
Utils.ensureImmutable(waitingOnCommit), Utils.ensureImmutable(waitingOnApply), 
Utils.ensureImmutable(appliedOrInvalidated));
+            return new Command.WaitingOn(deps.keyDeps.keys(), 
deps.rangeDeps.txnIds(), Utils.ensureImmutable(waitingOn), 
Utils.ensureImmutable(appliedOrInvalidated));
         };
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to