This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk-tmp2
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 0f8464348d705baf7069504c89d6c4d3c652622d
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Fri Jul 25 10:51:47 2025 +0100

    Fix CFK restore after replay:
     - Remove from CFK any unapplied transactions we know cannot apply
     - Force notification of waiting commands in CFK on replay
    Also fix:
     - Don't truncateWithOutcome if pre bootstrap or stale
     - Fix RangeDeps.without(RangeDeps)
     - Fix InMemoryCommandStore replay bug with clearing DefaultLocalListeners
     - Ensure SaveStatus and executeAt are updated together to prevent corruption via expunge
    Improve:
     - Inform home shard that command is decided if we cannot execute, to avoid recovery contention
     - Don't recover sync points on the fast path
     - Don't calculate recovery info for RX (since we don't use it anymore, there is no need to do the work)
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20797
---
 .../src/main/java/accord/api/ProgressLog.java      |  6 ++
 .../main/java/accord/api/ProtocolModifiers.java    |  2 -
 .../accord/coordinate/ExecuteEphemeralRead.java    |  5 +-
 .../main/java/accord/coordinate/ExecuteTxn.java    | 70 ++++++++++++++--
 .../java/accord/coordinate/ReadCoordinator.java    | 14 +++-
 .../src/main/java/accord/coordinate/Recover.java   |  1 +
 .../coordinate/tracking/QuorumIdTracker.java       | 11 ++-
 .../java/accord/impl/AbstractFetchCoordinator.java |  9 ++-
 .../src/main/java/accord/impl/CommandChange.java   |  9 +++
 .../java/accord/impl/DefaultLocalListeners.java    | 12 ++-
 .../accord/impl/progresslog/CoordinatePhase.java   |  5 ++
 .../impl/progresslog/DefaultProgressLog.java       | 56 ++++++++-----
 .../java/accord/impl/progresslog/HomeState.java    |  2 +-
 .../src/main/java/accord/local/Cleanup.java        | 12 ++-
 .../src/main/java/accord/local/Commands.java       |  8 +-
 .../main/java/accord/local/RedundantBefore.java    | 26 ++++--
 .../src/main/java/accord/local/SafeCommand.java    |  4 +-
 .../main/java/accord/local/SafeCommandStore.java   | 12 +--
 .../main/java/accord/local/cfk/CommandsForKey.java | 29 +++++--
 .../accord/local/cfk/CommandsForKeyUpdate.java     |  6 +-
 .../src/main/java/accord/local/cfk/NotifySink.java |  2 +-
 .../main/java/accord/local/cfk/PostProcess.java    |  4 +-
 .../src/main/java/accord/local/cfk/Pruning.java    | 59 +++++++++++++-
 .../java/accord/local/cfk/SafeCommandsForKey.java  | 30 +++----
 .../src/main/java/accord/local/cfk/Updating.java   | 22 ++---
 .../src/main/java/accord/local/cfk/Utils.java      | 37 ++++++++-
 .../accord/local/durability/ShardDurability.java   |  4 +-
 .../src/main/java/accord/messages/Accept.java      |  2 +-
 .../accord/messages/ApplyThenWaitUntilApplied.java |  9 ---
 .../main/java/accord/messages/BeginRecovery.java   |  6 +-
 .../main/java/accord/messages/InformDecided.java   | 94 ++++++++++++++++++++++
 .../main/java/accord/messages/InformDurable.java   | 35 +++++---
 .../src/main/java/accord/messages/MessageType.java |  1 +
 .../src/main/java/accord/messages/ReadData.java    |  3 +-
 .../src/main/java/accord/primitives/RangeDeps.java | 45 ++++++++---
 .../main/java/accord/utils/RelationMultiMap.java   | 13 +++
 .../src/test/java/accord/impl/basic/Cluster.java   |  8 +-
 .../src/test/java/accord/impl/list/ListAgent.java  |  6 +-
 .../java/accord/local/cfk/CommandsForKeyTest.java  |  6 +-
 39 files changed, 515 insertions(+), 170 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ProgressLog.java 
b/accord-core/src/main/java/accord/api/ProgressLog.java
index 978315ed..3b4d8764 100644
--- a/accord-core/src/main/java/accord/api/ProgressLog.java
+++ b/accord-core/src/main/java/accord/api/ProgressLog.java
@@ -181,6 +181,11 @@ public interface ProgressLog
      */
     void update(SafeCommandStore safeStore, TxnId txnId, Command before, 
Command after, boolean force);
 
+    /**
+     * Record a remote notification that the command has been decided, so does 
not need to be recovered until ready to execute.
+     */
+    void decided(SafeCommandStore safeStore, TxnId txnId);
+
     /**
      * Process a remote asynchronous callback.
      */
@@ -237,6 +242,7 @@ public interface ProgressLog
     class NoOpProgressLog implements ProgressLog
     {
         @Override public void update(SafeCommandStore safeStore, TxnId txnId, 
Command before, Command after, boolean force) {}
+        @Override public void decided(SafeCommandStore safeStore, TxnId txnId) 
{}
         @Override public void remoteCallback(SafeCommandStore safeStore, 
SafeCommand safeCommand, SaveStatus remoteStatus, int callbackId, Node.Id from) 
{}
         @Override public void waiting(BlockedUntil blockedUntil, 
SafeCommandStore safeStore, SafeCommand blockedBy, Route<?> blockedOnRoute, 
Participants<?> blockedOnParticipants, StoreParticipants participants) {}
         @Override public void invalidIfUncommitted(TxnId txnId) {}
diff --git a/accord-core/src/main/java/accord/api/ProtocolModifiers.java 
b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
index 7cc815fc..92f61c4e 100644
--- a/accord-core/src/main/java/accord/api/ProtocolModifiers.java
+++ b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
@@ -195,8 +195,6 @@ public class ProtocolModifiers
 
     public static class Toggles
     {
-        public static final boolean artificiallyConstrainResources = 
Invariants.debug();
-
         private static FastPaths permittedFastPaths = new 
FastPaths(FastPath.values());
         public static boolean usePrivilegedCoordinator() { return 
permittedFastPaths.hasPrivilegedCoordinator(); }
         public static void setPermittedFastPaths(FastPaths 
newPermittedFastPaths) { permittedFastPaths = newPermittedFastPaths; }
diff --git 
a/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
index d42c548f..dfd39c69 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java
@@ -223,14 +223,13 @@ public class ExecuteEphemeralRead extends 
ReadCoordinator<ReadReply>
         }
 
         @Override
-        protected boolean cancel()
+        public void timeout()
         {
             if (!super.cancel())
-                return false;
+                return;
 
             // TODO (desired): if we fail to commit locally we can submit a 
slow/medium path request
             callback.failure(node.id(), new Timeout(txnId, route.homeKey(), 
"Could not promptly read from local coordinator"));
-            return true;
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
index fe722a6d..dec14bd0 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
@@ -25,6 +25,8 @@ import accord.api.Result;
 import accord.api.Timeouts;
 import accord.coordinate.ExecuteFlag.CoordinationFlags;
 import accord.coordinate.ExecuteFlag.ExecuteFlags;
+import accord.coordinate.tracking.QuorumIdTracker;
+import accord.coordinate.tracking.RequestStatus;
 import accord.local.Commands;
 import accord.local.Commands.CommitOutcome;
 import accord.local.Node;
@@ -35,6 +37,7 @@ import accord.local.SequentialAsyncExecutor;
 import accord.local.StoreParticipants;
 import accord.messages.Accept;
 import accord.messages.Commit;
+import accord.messages.InformDecided;
 import accord.messages.MessageType;
 import accord.messages.ReadData;
 import accord.messages.ReadData.CommitOrReadNack;
@@ -90,10 +93,14 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
     final Topologies allTopologies;
     final CoordinationFlags flags;
     final BiConsumer<? super Result, Throwable> callback;
+    private final QuorumIdTracker stable;
 
     private final Participants<?> readScope;
+    private final boolean sendInitialStable;
     private Data data;
     private long uniqueHlc;
+    private boolean isPrivilegedVoteCommitting;
+    private boolean hasInformedDecidedOrSucceeded;
 
     ExecuteTxn(Node node, SequentialAsyncExecutor executor, Topologies 
topologies, FullRoute<?> route, Ballot ballot, ExecutePath path, 
CoordinationFlags flags, TxnId txnId, Txn txn, Timestamp executeAt, Deps 
stableDeps, Deps sendDeps, BiConsumer<? super Result, Throwable> callback)
     {
@@ -108,7 +115,9 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
         this.sendDeps = sendDeps;
         this.flags = flags;
         this.callback = callback;
+        this.stable = new QuorumIdTracker(topologies);
         this.readScope = txn == null ? route : route.intersecting(txn.keys());
+        this.sendInitialStable = sendOnlyReadStableMessages() && path != 
RECOVER;
         Invariants.require(!txnId.awaitsOnlyDeps());
         Invariants.require(!txnId.awaitsPreviouslyOwned());
     }
@@ -120,6 +129,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
         Node.Id self = node.id();
         if (permitLocalExecution() && tryIfUniversal(self))
         {
+            isPrivilegedVoteCommitting = true;
             new LocalExecute(txnId, flags.get(self)).process(node, 
node.agent().selfExpiresAt(txnId, Execute, MICROSECONDS));
         }
         else if (path == FAST && txnId.hasPrivilegedCoordinator())
@@ -142,7 +152,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
         IntHashSet readSet = new IntHashSet();
         to.forEach(i -> readSet.add(i.id));
         // TODO (desired): if READY_TO_EXECUTE send a simple read (skip 
setting Stable)
-        Commit.stableAndRead(node, executor, allTopologies, commitKind(), 
txnId, txn, route, readScope, executeAt, sendDeps, readSet, flags, 
sendOnlyReadStableMessages() && path != RECOVER, this);
+        Commit.stableAndRead(node, executor, allTopologies, commitKind(), 
txnId, txn, route, readScope, executeAt, sendDeps, readSet, flags, 
sendInitialStable, this);
     }
 
     private Commit.Kind commitKind()
@@ -161,8 +171,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
     public void contact(Id to)
     {
         ExecuteFlags flags = this.flags.get(to);
-        boolean alreadySentStable = !(sendOnlyReadStableMessages() && path != 
RECOVER);
-        Request request = Commit.requestTo(to, true, allTopologies, 
commitKind(), Ballot.ZERO, txnId, txn, route, readScope, executeAt, sendDeps, 
flags, alreadySentStable, false);
+        Request request = Commit.requestTo(to, true, allTopologies, 
commitKind(), Ballot.ZERO, txnId, txn, route, readScope, executeAt, sendDeps, 
flags, sendInitialStable, false);
         // we are always sending to a replica in the latest epoch and 
requesting a read, so onlyContactOldAndReadSet is a redundant parameter
         node.send(to, request, executor, this);
     }
@@ -173,11 +182,22 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
         return ((ReadOk)reply).unavailable;
     }
 
+    @Override
+    protected void onSuccessAfterDone(Id from, ReadReply reply)
+    {
+        if (!hasInformedDecidedOrSucceeded && (reply.isOk() || reply == 
Waiting))
+        {
+            if (RequestStatus.Success == stable.recordSuccess(from))
+                informDecided();
+        }
+    }
+
     @Override
     protected Action process(Id from, ReadReply reply)
     {
         if (reply.isOk())
         {
+            stable.recordSuccess(from);
             ReadOk ok = ((ReadOk) reply);
             Data next = ok.data;
             if (next != null)
@@ -196,6 +216,9 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
         {
             default: throw new UnhandledEnum(nack);
             case Waiting:
+                if (from.id == node.id().id)
+                    isPrivilegedVoteCommitting = false;
+                stable.recordSuccess(from);
                 return Action.None;
 
             case Redundant:
@@ -217,6 +240,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
         // TODO (expected): if we fail on the fast path and we haven't sent 
any Stable messages, we should send them now to make recovery easier
         if (failure == null)
         {
+            hasInformedDecidedOrSucceeded = true;
             Timestamp executeAt = this.executeAt;
             if (txnId.is(Txn.Kind.Write) && uniqueHlc != 0)
             {
@@ -233,10 +257,37 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
         }
         else
         {
+            if (!hasInformedDecidedOrSucceeded && stable.hasReachedQuorum())
+                informDecided();
             callback.accept(null, failure);
         }
     }
 
+    @Override
+    public void onSlowResponse(Id from)
+    {
+        // send stable messages to everyone not yet contacted, and then inform 
decided, to avoid unnecessary recoveries
+        if (!hasInformedDecidedOrSucceeded && stable.hasReachedQuorum())
+            informDecided();
+        super.onSlowResponse(from);
+    }
+
+    @Override
+    public void onFailure(Id from, Throwable failure)
+    {
+        super.onFailure(from, failure);
+        if (isPrivilegedVoteCommitting && from.id == node.id().id)
+            tryFinishOnFailure();
+    }
+
+    private void informDecided()
+    {
+        Invariants.require(stable.hasReachedQuorum());
+        hasInformedDecidedOrSucceeded = true;
+        InformDecided.informHome(node, topologies, txnId, route);
+    }
+
+
     protected CoordinationAdapter<Result> adapter()
     {
         return node.coordinationAdapter(txnId, Standard);
@@ -323,14 +374,17 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply>
         }
 
         @Override
-        protected boolean cancel()
+        public void timeout()
         {
             if (!super.cancel())
-                return false;
+                return;
 
-            // TODO (desired): if we fail to commit locally we can submit a 
slow/medium path request
-            callback.failure(node.id(), new Timeout(txnId, route.homeKey(), 
"Could not promptly " + (committed ? "commit to" : "read from") + " local 
coordinator"));
-            return true;
+            if (committed) reply(null, new Timeout(txnId, route.homeKey(), 
"Could not promptly read from local coordinator"));
+            else
+            {
+                // TODO (desired): if we fail to commit locally we can submit 
a slow/medium path request
+                callback.failure(node.id(), new Timeout(txnId, 
route.homeKey(), "Could not promptly commit to local coordinator"));
+            }
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java 
b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
index 629f5041..ade7351e 100644
--- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -122,6 +122,8 @@ public abstract class ReadCoordinator<Reply extends 
accord.messages.Reply> exten
     // TODO (desired): this isn't very clean way of integrating these responses
     protected Ranges unavailable(Reply reply) { throw new 
UnsupportedOperationException(); }
 
+    protected void onSuccessAfterDone(Id from, Reply reply) {}
+
     @Override
     public void onSuccess(Id from, Reply reply)
     {
@@ -129,7 +131,10 @@ public abstract class ReadCoordinator<Reply extends 
accord.messages.Reply> exten
             debug.merge(from, reply, (a, b) -> a instanceof List<?> ? 
((List<Object>) a).add(b) : Lists.newArrayList(a, b));
 
         if (isDone)
+        {
+            onSuccessAfterDone(from, reply);
             return;
+        }
 
         Action action = process(from, reply);
         switch (action)
@@ -182,8 +187,7 @@ public abstract class ReadCoordinator<Reply extends 
accord.messages.Reply> exten
         if (this.failure == null) this.failure = failure;
         else this.failure.addSuppressed(failure);
 
-        if (txnId.hasPrivilegedCoordinator() && from.id == node.id().id) 
finishOnFailure();
-        else handle(recordFailure(from));
+        handle(recordFailure(from));
     }
 
     @Override
@@ -215,6 +219,12 @@ public abstract class ReadCoordinator<Reply extends 
accord.messages.Reply> exten
         finishOnFailure();
     }
 
+    protected void tryFinishOnFailure()
+    {
+        if (!isDone)
+            finishOnFailure();
+    }
+
     protected void finishOnFailure()
     {
         Invariants.require(!isDone);
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java 
b/accord-core/src/main/java/accord/coordinate/Recover.java
index 0ea7d252..4783f8fa 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -399,6 +399,7 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
                                            .intersecting(route, id -> 
!recoverOks.containsKey(id.node));
         InferredFastPath fastPath;
         if (txnId.hasPrivilegedCoordinator() && coordinatorInRecoveryQuorum) 
fastPath = Reject;
+        else if (txnId.is(Txn.Kind.ExclusiveSyncPoint)) fastPath = Reject;
         else fastPath = merge(
             supersedingRejects(recoverOkList) ? Reject : Unknown,
             tracker.inferFastPathDecision(txnId, extraCoordVotes, extraRejects)
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
index e846f8cd..e9120202 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java
@@ -18,12 +18,10 @@
 
 package accord.coordinate.tracking;
 
-import java.util.Set;
-
 import accord.local.Node;
 import accord.topology.Shard;
 import accord.topology.Topologies;
-import org.agrona.collections.ObjectHashSet;
+import accord.utils.SortedListSet;
 
 import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
 import static 
accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange;
@@ -33,12 +31,13 @@ public class QuorumIdTracker extends 
SimpleTracker<QuorumIdTracker.QuorumIdShard
 {
     public static class QuorumIdShardTracker extends ShardTracker
     {
-        protected final Set<Node.Id> successes = new ObjectHashSet<>();
-        protected Set<Node.Id> failures;
+        protected final SortedListSet<Node.Id> successes;
+        protected SortedListSet<Node.Id> failures;
 
         public QuorumIdShardTracker(Shard shard)
         {
             super(shard);
+            successes = SortedListSet.noneOf(shard.nodes);
         }
 
         public ShardOutcomes onSuccess(Node.Id from)
@@ -50,7 +49,7 @@ public class QuorumIdTracker extends 
SimpleTracker<QuorumIdTracker.QuorumIdShard
         public ShardOutcomes onFailure(Node.Id from)
         {
             if (failures == null)
-                failures = new ObjectHashSet<>();
+                failures = SortedListSet.noneOf(shard.nodes);
             return failures.add(from) && failures.size() == 1 + 
shard.maxFailures ? Fail : NoChange;
         }
 
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java 
b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index 3063e468..fa3d3310 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -60,6 +60,7 @@ import javax.annotation.Nullable;
 import static accord.messages.MessageType.StandardMessage.FETCH_DATA_REQ;
 import static accord.messages.MessageType.StandardMessage.FETCH_DATA_RSP;
 import static accord.messages.ReadData.CommitOrReadNack.Redundant;
+import static accord.messages.ReadData.CommitOrReadNack.Waiting;
 import static accord.messages.ReadEphemeralTxnData.retryInLaterEpoch;
 import static accord.primitives.SaveStatus.Applied;
 import static accord.primitives.SaveStatus.TruncatedApply;
@@ -154,15 +155,17 @@ public abstract class AbstractFetchCoordinator extends 
FetchCoordinator
                     {
                         CoordinateSyncPoint.sendApply(node, from, syncPoint);
                     }
-                    else
+                    else if (reply == Redundant)
                     {
                         fail(to, new RuntimeException(reply.toString()));
                         inflight.remove(key).cancel();
-                        if (reply != Redundant)
-                            throw new UnhandledEnum((CommitOrReadNack)reply);
                         // too late, sync point has been erased
                         // TODO (desired): stop fetch sync points from garbage 
collecting too quickly
                     }
+                    else if (reply != Waiting)
+                    {
+                        throw new UnhandledEnum((CommitOrReadNack)reply);
+                    }
                     return;
                 }
 
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java 
b/accord-core/src/main/java/accord/impl/CommandChange.java
index 0d3cc73f..e0896b39 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -806,6 +806,15 @@ public class CommandChange
         }
 
         // make sure we have enough information to decide whether to expunge 
timestamps (for unique ApplyAt HLC guarantees)
+        if (isChanged(SAVE_STATUS, flags) || isChanged(EXECUTE_AT, flags))
+        {
+            // to ensure we don't allow a later SaveStatus to be combined with 
a stale executeAt
+            // (which can be caused by expunging, and can briefly prevent 
further expunging if it was a higher timestamp),
+            // for now we ensure we always save the two together if either 
changes
+            flags = setChanged(SAVE_STATUS, flags);
+            if (after.executeAt() != null) flags = setChanged(EXECUTE_AT, 
flags);
+            else flags = setIsNullAndChanged(EXECUTE_AT, flags);
+        }
         if (saveStatus.known.is(ApplyAtKnown) && (before == null || 
!before.saveStatus().known.is(ApplyAtKnown)))
         {
             flags = setChanged(EXECUTE_AT, flags);
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java 
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index cacc703c..ad166bed 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -61,7 +61,7 @@ public class DefaultLocalListeners implements LocalListeners
         }
 
         @Override
-        public LocalListeners create(CommandStore store)
+        public LocalListeners create(CommandStore commandStore)
         {
             return new DefaultLocalListeners(remoteListeners, notifySink);
         }
@@ -520,6 +520,14 @@ public class DefaultLocalListeners implements 
LocalListeners
     public void clear()
     {
         txnListeners = BTree.empty();
-        complexListeners.clear();
+        complexListeners.forEach((key, value) -> {
+            // the listener registration needs to be invalidated so that a 
caller does not try to cancel it
+            RegisteredComplexListeners listeners = 
complexListeners.remove(key);
+            for (int i = 0 ; i < listeners.length ; i++)
+            {
+                if (listeners.listeners[i] != null)
+                    listeners.listeners[i].index = -1;
+            }
+        });
     }
 }
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/CoordinatePhase.java 
b/accord-core/src/main/java/accord/impl/progresslog/CoordinatePhase.java
index 9e74eb82..1eff3f7d 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/CoordinatePhase.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/CoordinatePhase.java
@@ -32,6 +32,11 @@ public enum CoordinatePhase
      */
     Undecided,
 
+    /**
+     * durably decided but not ready to execute locally; no progress expected
+     */
+    Decided,
+
     /**
      * durably decided, but replicas may not be ready to execute; should wait 
until we can expect to successfully
      * execute the transaction before attempting recovery
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java 
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index be43381c..450ff5fc 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -36,6 +36,7 @@ import accord.local.Node;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
+import accord.primitives.Ballot;
 import accord.primitives.SaveStatus;
 import accord.local.StoreParticipants;
 import accord.primitives.Participants;
@@ -55,7 +56,7 @@ import org.agrona.collections.ObjectHashSet;
 
 import static accord.api.ProgressLog.BlockedUntil.CanApply;
 import static accord.api.ProgressLog.BlockedUntil.NotBlocked;
-import static accord.impl.progresslog.CoordinatePhase.AwaitReadyToExecute;
+import static accord.impl.progresslog.CoordinatePhase.Decided;
 import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute;
 import static accord.impl.progresslog.CoordinatePhase.Undecided;
 import static accord.impl.progresslog.Progress.Awaiting;
@@ -196,7 +197,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         Route<?> beforeRoute = before.route();
         Route<?> afterRoute = after.route();
         if (force || (afterRoute != null && beforeRoute == null) || 
(after.durability().isDurableOrInvalidated() && 
!before.durability().isDurableOrInvalidated()))
-            state = updateHomeState(safeStore, after, get(txnId));
+            state = updateOrInitialiseHomeState(safeStore, after, get(txnId));
 
         SaveStatus beforeSaveStatus = before.saveStatus();
         SaveStatus afterSaveStatus = after.saveStatus();
@@ -211,23 +212,40 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
 
         state.waiting().record(this, afterSaveStatus);
         if (state.isHomeInitialised())
+            updateHomeState(safeStore, state, before, after);
+    }
+
+    @Override
+    public void decided(SafeCommandStore safeStore, TxnId txnId)
+    {
+        TxnState state = get(txnId);
+        if (state != null && state.isHomeInitialised())
+            state.home().atLeast(safeStore, this, Decided, NoneExpected);
+    }
+
+    private void updateHomeState(SafeCommandStore safeStore, TxnState state, 
Command before, Command after)
+    {
+        switch (after.saveStatus())
         {
-            switch (afterSaveStatus)
-            {
-                case Stable:
-                    state.home().atLeast(safeStore, this, Undecided, 
NoneExpected);
-                    break;
-                case ReadyToExecute:
-                    state.home().atLeast(safeStore, this, AwaitReadyToExecute, 
Queued);
-                    break;
-                case PreApplied:
-                    state.home().atLeast(safeStore, this, ReadyToExecute, 
Queued);
-                    break;
-            }
+            case Stable:
+                if (!after.acceptedOrCommitted().equals(Ballot.ZERO) || 
(before != null && before.saveStatus() == SaveStatus.Committed))
+                    state.home().atLeast(safeStore, this, Decided, 
NoneExpected);
+            default:
+                // fall-through to default handler, which simply postpones any 
scheduled coordination attempt if we witness another coordination attempt in 
the meantime
+                if (state.homeProgress() == Queued && (before == null ? 
after.promised().compareTo(Ballot.ZERO) > 0 : 
(after.promised().compareTo(before.promised()) > 0) || 
after.acceptedOrCommitted().compareTo(before.acceptedOrCommitted()) > 0))
+                {
+                    clearPending(Home, state.txnId);
+                    state.home().set(safeStore, this, state.phase(), Queued);
+                }
+                break;
+            case ReadyToExecute:
+            case PreApplied:
+                state.home().atLeast(safeStore, this, ReadyToExecute, Queued);
+                break;
         }
     }
 
-    private TxnState updateHomeState(SafeCommandStore safeStore, Command 
after, @Nullable TxnState state)
+    private TxnState updateOrInitialiseHomeState(SafeCommandStore safeStore, 
Command after, @Nullable TxnState state)
     {
         Route<?> route = after.route();
         if (after.durability().isDurableOrInvalidated())
@@ -465,7 +483,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         state.waiting().setBlockedUntil(safeStore, this, blockedUntil);
         // in case progress log hasn't been updated (e.g. bug on replay), 
force an update to the command's state since we're about to wait on it
         if (!state.isHomeInitialised() && command.route() != null)
-            updateHomeState(safeStore, command, state);
+            updateOrInitialiseHomeState(safeStore, command, state);
     }
 
     @Override
@@ -676,14 +694,14 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         @Override
         public void accept(SafeCommandStore safeStore)
         {
-            if (!complete(safeStore, runKind, id, this))
-                return; // we've been cancelled
-
             // we have to read safeCommand first as it may become truncated on 
load, which may clear the progress log and invalidate us
             SafeCommand safeCommand = safeStore.ifInitialised(run.txnId);
             if (safeCommand == null)
                 return;
 
+            if (!complete(safeStore, runKind, id, this))
+                return; // we've been cancelled
+
             // check this after fetching SafeCommand, as doing so can erase 
the command (and invalidate our state)
             if (run.isDone(runKind))
                 return;
diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java 
b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
index 28190c25..381207b2 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
@@ -155,7 +155,7 @@ abstract class HomeState extends WaitingState
             tracing.trace(safeStore.commandStore(), "Invoking MaybeRecover 
with progress token %s", maxProgressToken);
 
         instance.start(invoker, MaybeRecover.maybeRecover(instance.node(), 
txnId, invalidIf(), command.route(), maxProgressToken, reportTo, invoker));
-        set(safeStore, instance, ReadyToExecute, Querying);
+        set(safeStore, instance, phase(), Querying);
     }
 
     static void recoverCallback(SafeCommandStore safeStore, SafeCommand 
safeCommand, DefaultProgressLog instance, TxnId txnId, @Nullable ProgressToken 
prevProgressToken, Outcome success, Throwable fail)
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java 
b/accord-core/src/main/java/accord/local/Cleanup.java
index abfd175f..6d1c3be4 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -40,6 +40,7 @@ import static 
accord.local.RedundantStatus.Property.LOCALLY_DEFUNCT;
 import static 
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STORE;
 import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
 import static accord.local.RedundantStatus.Property.NOT_OWNED;
+import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP_OR_STALE;
 import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
 import static accord.local.RedundantStatus.Property.TRUNCATE_BEFORE;
 import static accord.primitives.Known.KnownExecuteAt.ApplyAtKnown;
@@ -198,7 +199,7 @@ public enum Cleanup
         Invariants.paranoid(redundant.all(SHARD_APPLIED));
 
         if (!redundant.all(LOCALLY_DURABLE_TO_DATA_STORE))
-            return truncateWithOutcome(txnId, participants, min);
+            return truncateWithOutcome(txnId, redundant, participants, min);
 
         if (saveStatus.compareTo(Vestigial) >= 0)
         {
@@ -220,12 +221,9 @@ public enum Cleanup
             case ShardUniversal:
                 // TODO (required): consider how we guarantee not to break 
recovery of other shards if a majority on this shard are PRE_BOOTSTRAP
                 //   (if the condition is false and we fall through to 
removing Outcome)
-                if (input != FULL)
-                    return truncateWithOutcome(txnId, participants, min);
-
             case MajorityOrInvalidated:
             case Majority:
-                return truncateWithOutcome(txnId, participants, min);
+                return truncateWithOutcome(txnId, redundant, participants, 
min);
 
             case UniversalOrInvalidated:
             case Universal:
@@ -309,9 +307,9 @@ public enum Cleanup
         return INVALIDATE;
     }
 
-    private static Cleanup truncateWithOutcome(TxnId txnId, StoreParticipants 
participants, Cleanup atLeast)
+    private static Cleanup truncateWithOutcome(TxnId txnId, RedundantStatus 
status, StoreParticipants participants, Cleanup atLeast)
     {
-        return atLeast.compareTo(TRUNCATE_WITH_OUTCOME) > 0 ? atLeast : 
participants.executes() == null || !participants.stillExecutes().isEmpty()
+        return atLeast.compareTo(TRUNCATE_WITH_OUTCOME) > 0 ? atLeast : 
(participants.executes() == null || !participants.stillExecutes().isEmpty()) && 
!status.all(PRE_BOOTSTRAP_OR_STALE)
                                                                         ? 
TRUNCATE_WITH_OUTCOME : TRUNCATE;
     }
 
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index 2f843fd7..8b621aa9 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -885,7 +885,7 @@ public class Commands
         Update waitingOn = new Update(command);
         if (updateWaitingOn(safeStore, command, command.executeAt(), 
waitingOn, predecessor))
         {
-            safeCommand.updateWaitingOn(waitingOn);
+            safeCommand.updateWaitingOn(safeStore, waitingOn);
             // don't bother invoking maybeExecute if we weren't already 
blocked on the updated command
             if (waitingOn.hasUpdatedDirectDependency(command.waitingOn()))
                 maybeExecute(safeStore, safeCommand, false, notifyWaitingOn);
@@ -926,7 +926,7 @@ public class Commands
         waitingOn.removeWaitingOnKey(keyIndex);
         if (uniqueHlc > 0)
             waitingOn.updateUniqueHlc(committed.executeAt(), uniqueHlc);
-        safeCommand.updateWaitingOn(waitingOn);
+        safeCommand.updateWaitingOn(safeStore, waitingOn);
         if (!waitingOn.isWaiting())
             maybeExecute(safeStore, safeCommand, false, true);
     }
@@ -1355,7 +1355,7 @@ public class Commands
         // if we are a range transaction, being redundant for this transaction 
does not imply we are redundant for all transactions
         if (redundant != null)
             update.removeWaitingOn(redundant);
-        return safeCommand.updateWaitingOn(update);
+        return safeCommand.updateWaitingOn(safeStore, update);
     }
 
     static Command removeNoLongerOwnedDependency(SafeCommandStore safeStore, 
SafeCommand safeCommand, @Nonnull TxnId wasOwned)
@@ -1366,7 +1366,7 @@ public class Commands
 
         Update update = new Update(current.waitingOn);
         update.removeWaitingOn(wasOwned);
-        return safeCommand.updateWaitingOn(update);
+        return safeCommand.updateWaitingOn(safeStore, update);
     }
 
     public static Command supplementParticipants(Command command, 
StoreParticipants participants)
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index a5bd0b57..07653fd7 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -108,28 +108,43 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
         public final long startEpoch, endEpoch;
         public final TxnId bootstrappedAt;
         public final TxnId gcBefore;
+        public final TxnId locallyAppliedBefore;
 
-        public QuickBounds(long startEpoch, long endEpoch, TxnId 
bootstrappedAt, TxnId gcBefore)
+        public QuickBounds(long startEpoch, long endEpoch, TxnId 
bootstrappedAt, TxnId gcBefore, TxnId locallyAppliedBefore)
         {
             this.startEpoch = startEpoch;
             this.endEpoch = endEpoch;
             this.bootstrappedAt = bootstrappedAt;
             this.gcBefore = gcBefore;
+            this.locallyAppliedBefore = locallyAppliedBefore;
         }
 
         public QuickBounds withEpochs(long startEpoch, long endEpoch)
         {
-            return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, 
gcBefore);
+            if (startEpoch == this.startEpoch && endEpoch == this.endEpoch)
+                return this;
+            return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, 
gcBefore, locallyAppliedBefore);
         }
 
         public QuickBounds withGcBeforeBeforeAtLeast(TxnId newGcBefore)
         {
-            return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, 
newGcBefore);
+            if (newGcBefore.equals(this.gcBefore))
+                return this;
+            return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, 
newGcBefore, locallyAppliedBefore);
         }
 
         public QuickBounds withBootstrappedAtLeast(TxnId newBootstrappedAt)
         {
-            return new QuickBounds(startEpoch, endEpoch, newBootstrappedAt, 
gcBefore);
+            if (newBootstrappedAt.equals(bootstrappedAt))
+                return this;
+            return new QuickBounds(startEpoch, endEpoch, newBootstrappedAt, 
gcBefore, locallyAppliedBefore);
+        }
+
+        public QuickBounds withLocallyAppliedAtLeast(TxnId 
newLocallyAppliedBefore)
+        {
+            if (newLocallyAppliedBefore.equals(locallyAppliedBefore))
+                return this;
+            return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, 
gcBefore, newLocallyAppliedBefore);
         }
     }
 
@@ -162,7 +177,8 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
 
         public Bounds(Range range, long startEpoch, long endEpoch, TxnId[] 
bounds, short[] statuses, @Nullable Timestamp staleUntilAtLeast)
         {
-            super(startEpoch, endEpoch, maxBound(bounds, statuses, 
PRE_BOOTSTRAP), maxBound(bounds, statuses, GC_BEFORE));
+            super(startEpoch, endEpoch, maxBound(bounds, statuses, 
PRE_BOOTSTRAP), maxBound(bounds, statuses, GC_BEFORE),
+                  maxBound(bounds, statuses, LOCALLY_APPLIED));
             this.range = range;
             this.bounds = bounds;
             this.statuses = statuses;
diff --git a/accord-core/src/main/java/accord/local/SafeCommand.java 
b/accord-core/src/main/java/accord/local/SafeCommand.java
index d0cb5dae..aea21803 100644
--- a/accord-core/src/main/java/accord/local/SafeCommand.java
+++ b/accord-core/src/main/java/accord/local/SafeCommand.java
@@ -86,9 +86,9 @@ public abstract class SafeCommand
         return update;
     }
 
-    public Command.Committed updateWaitingOn(Command.WaitingOn.Update 
waitingOn)
+    public Command.Committed updateWaitingOn(SafeCommandStore safeStore, 
Command.WaitingOn.Update waitingOn)
     {
-        return 
incidentalUpdate(Command.updateWaitingOn(current().asCommitted(), waitingOn));
+        return update(safeStore, 
Command.updateWaitingOn(current().asCommitted(), waitingOn));
     }
 
     public Command updateParticipants(SafeCommandStore safeStore, 
StoreParticipants participants)
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index da892a7a..626cb918 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -364,7 +364,7 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
             return;
 
         TxnId txnId = next.txnId();
-        if (CommandsForKey.manages(txnId)) updateManagedCommandsForKey(this, 
prev, next);
+        if (CommandsForKey.manages(txnId)) updateManagedCommandsForKey(this, 
prev, next, force);
         if (!CommandsForKey.managesExecution(txnId) && 
next.hasBeen(Status.Stable) && !next.hasBeen(Status.Truncated) && (force || 
!prev.hasBeen(Status.Stable)))
             updateUnmanagedCommandsForKey(this, next, REGISTER);
         // TODO (expected): register deps during Accept phase to more quickly 
sync epochs
@@ -374,7 +374,7 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
 
     abstract protected void persistFieldUpdates();
 
-    private static void updateManagedCommandsForKey(SafeCommandStore 
safeStore, Command prev, Command next)
+    private static void updateManagedCommandsForKey(SafeCommandStore 
safeStore, Command prev, Command next, boolean forceNotify)
     {
         StoreParticipants participants = 
next.participants().supplement(prev.participants());
         Participants<?> update = next.hasBeen(Status.Committed) ? 
participants.hasTouched() : participants.stillTouches();
@@ -386,7 +386,7 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
         PreLoadContext execute = safeStore.canExecute(context);
         if (execute != null)
         {
-            updateManagedCommandsForKey(safeStore, execute.keys(), 
next.txnId());
+            updateManagedCommandsForKey(safeStore, execute.keys(), 
next.txnId(), forceNotify);
         }
         if (execute != context)
         {
@@ -399,12 +399,12 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
                 PreLoadContext ctx = safeStore0.context();
                 TxnId txnId = ctx.primaryTxnId();
                 Unseekables<?> keys = ctx.keys();
-                updateManagedCommandsForKey(safeStore0, keys, txnId);
+                updateManagedCommandsForKey(safeStore0, keys, txnId, 
forceNotify);
             }, safeStore.commandStore().agent);
         }
     }
 
-    private static void updateManagedCommandsForKey(SafeCommandStore 
safeStore, Unseekables<?> update, TxnId txnId)
+    private static void updateManagedCommandsForKey(SafeCommandStore 
safeStore, Unseekables<?> update, TxnId txnId, boolean forceNotify)
     {
         // TODO (expected): avoid reentrancy / recursion
         SafeCommand safeCommand = safeStore.get(txnId);
@@ -412,7 +412,7 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
         {
             // we use callback and re-fetch current to guard against 
reentrancy causing
             // us to interact with "future" or stale information (respectively)
-            safeStore.get(key).callback(safeStore, safeCommand.current());
+            safeStore.get(key).callback(safeStore, safeCommand.current(), 
forceNotify);
         }
     }
 
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index 869c51f6..552a22aa 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -235,7 +235,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
 
     private static boolean reportLinearizabilityViolations = true;
 
-    public static final QuickBounds NO_BOUNDS_INFO = new QuickBounds(0, 
Long.MAX_VALUE, TxnId.NONE, TxnId.NONE);
+    public static final QuickBounds NO_BOUNDS_INFO = new QuickBounds(0, 
Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE);
     public static final TxnInfo NO_INFO = TxnInfo.create(TxnId.NONE, 
TRANSITIVE, false, TxnId.NONE, Ballot.ZERO);
     public static final TxnInfo[] NO_INFOS = new TxnInfo[0];
     static final TxnId[] NOT_LOADING_PRUNED = new TxnId[0];
@@ -1202,6 +1202,17 @@ public class CommandsForKey extends CommandsForKeyUpdate
         return bounds.gcBefore;
     }
 
+    public TxnId appliedBefore()
+    {
+        return appliedBefore(bounds);
+    }
+
+    static TxnId appliedBefore(QuickBounds bounds)
+    {
+        // TODO (expected): this can be weakened to 
shardAppliedOrInvalidatedBefore
+        return bounds.locallyAppliedBefore;
+    }
+
     public boolean isPostBootstrapAndOwned(TxnId txnId)
     {
         return isPostBootstrapAndOwned(txnId, bounds);
@@ -1833,19 +1844,19 @@ public class CommandsForKey extends CommandsForKeyUpdate
         return new CommandsForKey(key, bounds, byId, minUndecidedById, 
maxAppliedPreBootstrapWriteById, committedByExecuteAt, 
maxAppliedWriteByExecuteAt, maxUniqueHlc, newLoadingPruned, prunedBeforeById, 
unmanageds);
     }
 
-    CommandsForKeyUpdate registerUnmanaged(SafeCommand safeCommand, 
UpdateUnmanagedMode mode)
+    CommandsForKeyUpdate registerUnmanaged(SafeCommandStore safeStore, 
SafeCommand safeCommand, UpdateUnmanagedMode mode)
     {
         Invariants.require(mode != UPDATE);
-        return Updating.updateUnmanaged(this, safeCommand, mode, null);
+        return Updating.updateUnmanaged(this, safeStore, safeCommand, mode, 
null);
     }
 
-    void postProcess(SafeCommandStore safeStore, CommandsForKey prevCfk, 
@Nullable Command updated, NotifySink notifySink)
+    void postProcess(SafeCommandStore safeStore, CommandsForKey prevCfk, 
@Nullable Command updated, NotifySink notifySink, boolean forceNotify)
     {
         TxnInfo minUndecided = minUndecided();
-        if (minUndecided != null && 
!minUndecided.equals(prevCfk.minUndecided()))
+        if (minUndecided != null && (forceNotify || 
!minUndecided.equals(prevCfk.minUndecided())))
             notifySink.waitingOn(safeStore, minUndecided, key, 
SaveStatus.Stable, HasStableDeps, true);
 
-        if (updated == null)
+        if (updated == null || forceNotify)
         {
             notifyManaged(safeStore, AnyGloballyVisible, 0, 
committedByExecuteAt.length, -1, notifySink);
             return;
@@ -2031,10 +2042,11 @@ public class CommandsForKey extends CommandsForKeyUpdate
         // we can't let HLC epoch go backwards as this breaks assumptions 
around maxUniqueHlc tracking
         if (newBounds.gcBefore.hlc() < bounds.gcBefore.hlc())
         {
-            if (newBounds.endEpoch != bounds.endEpoch || 
!newBounds.bootstrappedAt.equals(bounds.bootstrappedAt))
+            if (newBounds.endEpoch != bounds.endEpoch || 
!newBounds.bootstrappedAt.equals(bounds.bootstrappedAt) || 
!newBounds.locallyAppliedBefore.equals(bounds.locallyAppliedBefore))
             {
                 newBounds = bounds.withEpochs(bounds.startEpoch, 
newBounds.endEpoch)
-                                  
.withBootstrappedAtLeast(newBounds.bootstrappedAt);
+                                  
.withBootstrappedAtLeast(newBounds.bootstrappedAt)
+                                  
.withLocallyAppliedAtLeast(newBounds.locallyAppliedBefore); // adopt the incoming bound; re-applying bounds' own value is a no-op
             }
             else
             {
@@ -2043,6 +2055,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
         }
         else if (newBounds.gcBefore.equals(bounds.gcBefore)
                  && newBounds.bootstrappedAt.equals(bounds.bootstrappedAt)
+                 && 
newBounds.locallyAppliedBefore.equals(bounds.locallyAppliedBefore)
                  && newBounds.endEpoch == bounds.endEpoch)
         {
             return this;
diff --git 
a/accord-core/src/main/java/accord/local/cfk/CommandsForKeyUpdate.java 
b/accord-core/src/main/java/accord/local/cfk/CommandsForKeyUpdate.java
index 9f9fa9b8..eba5d1e8 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKeyUpdate.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKeyUpdate.java
@@ -36,7 +36,7 @@ public abstract class CommandsForKeyUpdate
     @VisibleForTesting
     public abstract CommandsForKey cfk();
     abstract PostProcess postProcess();
-    abstract void postProcess(SafeCommandStore safeStore, @Nullable 
CommandsForKey prevCfk, @Nullable Command command, NotifySink notifySink);
+    abstract void postProcess(SafeCommandStore safeStore, @Nullable 
CommandsForKey prevCfk, @Nullable Command command, NotifySink notifySink, 
boolean forceNotify);
 
     static class CommandsForKeyUpdateWithPostProcess extends 
CommandsForKeyUpdate
     {
@@ -62,9 +62,9 @@ public abstract class CommandsForKeyUpdate
         }
 
         @Override
-        void postProcess(SafeCommandStore safeStore, @Nullable CommandsForKey 
prevCfk, @Nullable Command command, NotifySink notifySink)
+        void postProcess(SafeCommandStore safeStore, @Nullable CommandsForKey 
prevCfk, @Nullable Command command, NotifySink notifySink, boolean forceNotify)
         {
-            cfk.postProcess(safeStore, prevCfk, command, notifySink);
+            cfk.postProcess(safeStore, prevCfk, command, notifySink, 
forceNotify);
             postProcess.postProcess(safeStore, cfk.key(), notifySink);
         }
     }
diff --git a/accord-core/src/main/java/accord/local/cfk/NotifySink.java 
b/accord-core/src/main/java/accord/local/cfk/NotifySink.java
index cfd0f9e0..38b417f6 100644
--- a/accord-core/src/main/java/accord/local/cfk/NotifySink.java
+++ b/accord-core/src/main/java/accord/local/cfk/NotifySink.java
@@ -125,7 +125,7 @@ interface NotifySink
             SafeCommandsForKey update = safeStore.ifLoadedAndInitialised(key);
             if (update != null && safeStore.tryRecurse())
             {
-                try { update.callback(safeStore, 
safeStore.unsafeGet(txnId).current()); }
+                try { update.callback(safeStore, 
safeStore.unsafeGet(txnId).current(), false); }
                 finally { safeStore.unrecurse(); }
             }
             else
diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java 
b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
index 5c08f8b4..b37ffb56 100644
--- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java
+++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java
@@ -115,7 +115,7 @@ abstract class PostProcess
             StoreParticipants participants = command.participants();
             if (participants.owns().domain() == Routable.Domain.Key && 
!participants.hasTouched(safeCfk.key()))
                 command = safeCommand.updateParticipants(safeStore, 
participants.supplementHasTouched(RoutingKeys.of(safeCfk.key())));
-            safeCfk.callback(safeStore, command, notifySink);
+            safeCfk.callback(safeStore, command, notifySink, false);
         }
 
         static CommandsForKeyUpdate load(TxnId[] txnIds, CommandsForKeyUpdate 
result)
@@ -214,7 +214,7 @@ abstract class PostProcess
                 {
                     try
                     {
-                        CommandsForKeyUpdate update = updateUnmanaged(cfk, 
safeCommand, UPDATE, addUnmanageds);
+                        CommandsForKeyUpdate update = updateUnmanaged(cfk, 
safeStore, safeCommand, UPDATE, addUnmanageds);
                         if (update != cfk)
                         {
                             Invariants.require(update.cfk() == cfk);
diff --git a/accord-core/src/main/java/accord/local/cfk/Pruning.java 
b/accord-core/src/main/java/accord/local/cfk/Pruning.java
index 5d6ccf8e..68b4ad1d 100644
--- a/accord-core/src/main/java/accord/local/cfk/Pruning.java
+++ b/accord-core/src/main/java/accord/local/cfk/Pruning.java
@@ -39,6 +39,8 @@ import static 
accord.api.ProtocolModifiers.Toggles.isTransitiveDependencyVisible
 import static accord.local.CommandSummaries.SummaryStatus.APPLIED;
 import static accord.local.cfk.CommandsForKey.InternalStatus.COMMITTED;
 import static accord.local.cfk.CommandsForKey.InternalStatus.PRUNED;
+import static accord.local.cfk.CommandsForKey.InternalStatus.STABLE;
+import static accord.local.cfk.CommandsForKey.appliedBefore;
 import static accord.local.cfk.CommandsForKey.bootstrappedAt;
 import static accord.local.cfk.CommandsForKey.insertPos;
 import static accord.local.cfk.CommandsForKey.managesExecution;
@@ -530,6 +532,7 @@ public class Pruning
 
     static TxnInfo[] removeRedundantById(TxnInfo[] byId, boolean 
hasRedundantLoadingPruned, QuickBounds prevBounds, QuickBounds newBounds)
     {
+        TxnId newAppliedBefore = appliedBefore(newBounds);
         TxnId newRedundantBefore = redundantBefore(newBounds);
         TxnId newBootstrappedAt = bootstrappedAt(newBounds);
         TxnId prevRedundantBefore = redundantBefore(prevBounds);
@@ -539,22 +542,70 @@ public class Pruning
 
         TxnInfo[] newById = byId;
         int pos = insertPos(byId, newRedundantBefore);
-        if (pos != 0 || hasRedundantLoadingPruned)
+        int appliedPos = Arrays.binarySearch(byId, pos, byId.length, 
newAppliedBefore);
+        if (appliedPos < 0) appliedPos = -1 - appliedPos;
+        if (pos != 0 || appliedPos != 0 || hasRedundantLoadingPruned)
         {
             if (Invariants.isParanoid() && testParanoia(LINEAR, NONE, LOW))
             {
                 int startPos = prevBootstrappedAt == null ? 0 : 
insertPos(byId, prevBootstrappedAt);
                 for (int i = startPos ; i < pos ; ++i)
-                    Invariants.require(byId[i].isNot(COMMITTED) || 
!byId[i].mayExecute() || !reportLinearizabilityViolations(), "%s redundant; 
expected to be applied, undecided or to execute in a future epoch", byId[i]);
+                    Invariants.require((byId[i].isNot(COMMITTED) && 
byId[i].isNot(STABLE)) || !byId[i].mayExecute() || 
!reportLinearizabilityViolations(), "%s redundant; expected to be applied, 
undecided or to execute in a future epoch", byId[i]);
+            }
+
+            int removeUnappliedCount = 0;
+            if (appliedPos > pos)
+            {
+                // we apply additional filtering to remove any transactions that 
would apply locally but have not executed,
+                // since we then know they will never execute. note: we cannot do this 
safely for any transactions we don't execute locally!
+                // this is used to handle consistent restore on replay, where 
we may have some transaction that is logically
+                // invalidated, but whose invalidation we only record after 
the snapshot is created, because the RX doesn't
+                // record it as a dependency (and so it doesn't have to be 
decided for the RX to execute).
+                // We may then later invalidate and update the CFK, but since 
this is not reflected in the snapshot,
+                // after restoring from snapshot we may have an inconsistency 
between the command state and the CFK state,
+                // and won't know that we should update the CFK because the 
command is already invalidated.
+                // So, to avoid having to read every CFK on startup and process 
any potentially invalidated transactions,
+                // we instead apply this filtering aggressively whenever we 
know the transaction cannot apply,
+                // and rely on the applied RX to correctly reject recovery of 
the transaction.
+                for (int i = pos ; i < appliedPos ; ++i)
+                {
+                    if (byId[i].compareTo(APPLIED) < 0)
+                    {
+                        if (byId[i].mayExecute())
+                        {
+                            Invariants.require((byId[i].isNot(COMMITTED) && 
byId[i].isNot(STABLE)) || !reportLinearizabilityViolations(), "%s redundant; 
expected to be applied, undecided or to execute in a future epoch", byId[i]);
+                            // we only filter those that would apply locally
+                            removeUnappliedCount++;
+                        }
+                    }
+                }
+            }
+
+            int newAppliedBeforeIndex;
+            if (removeUnappliedCount > 0)
+            {
+                newAppliedBeforeIndex = (appliedPos - pos) - 
removeUnappliedCount;
+                newById = new TxnInfo[(byId.length - appliedPos) + 
newAppliedBeforeIndex];
+                removeUnappliedCount = 0;
+                for (int i = pos ; i < appliedPos ; ++i)
+                {
+                    if (byId[i].compareTo(APPLIED) < 0 && 
byId[i].mayExecute()) ++removeUnappliedCount;
+                    else newById[i - (pos + removeUnappliedCount)] = byId[i];
+                }
+                System.arraycopy(byId, appliedPos, newById, 
newAppliedBeforeIndex, newById.length - newAppliedBeforeIndex);
+            }
+            else
+            {
+                newAppliedBeforeIndex = -1;
+                newById = Arrays.copyOfRange(byId, pos, byId.length);
             }
 
-            newById = Arrays.copyOfRange(byId, pos, byId.length);
             for (int i = 0 ; i < newById.length ; ++i)
             {
                 TxnInfo txn = newById[i];
                 TxnId[] missing = txn.missing();
                 if (missing == NO_TXNIDS) continue;
-                missing = removeRedundantMissing(missing, newRedundantBefore);
+                missing = removeRedundantMissing(missing, newRedundantBefore, 
newById, newAppliedBeforeIndex);
                 newById[i] = txn.withMissing(missing);
             }
         }
diff --git a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
index 90367c2d..9c7158a5 100644
--- a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java
@@ -47,38 +47,32 @@ public abstract class SafeCommandsForKey implements 
SafeState<CommandsForKey>
         return key;
     }
 
-    public void update(SafeCommandStore safeStore, Command nextCommand)
-    {
-        CommandsForKey prevCfk = current();
-        update(safeStore, nextCommand, prevCfk, prevCfk.update(safeStore, 
nextCommand));
-    }
-
     public void updateUniqueHlc(SafeCommandStore safeStore, long uniqueHlc)
     {
         CommandsForKey prevCfk = current();
-        update(safeStore, null, prevCfk, prevCfk.updateUniqueHlc(uniqueHlc));
+        update(safeStore, null, prevCfk, prevCfk.updateUniqueHlc(uniqueHlc), 
false);
     }
 
     // equivalent to update, but for async callbacks with additional 
validation around pruning
-    public void callback(SafeCommandStore safeStore, Command nextCommand)
+    public void callback(SafeCommandStore safeStore, Command nextCommand, 
boolean forceNotify)
     {
-        callback(safeStore, nextCommand, DefaultNotifySink.INSTANCE);
+        callback(safeStore, nextCommand, DefaultNotifySink.INSTANCE, 
forceNotify);
     }
 
-    public void callback(SafeCommandStore safeStore, Command nextCommand, 
NotifySink notifySink)
+    public void callback(SafeCommandStore safeStore, Command nextCommand, 
NotifySink notifySink, boolean forceNotify)
     {
         CommandsForKey prevCfk = current();
-        update(safeStore, nextCommand, prevCfk, prevCfk.callback(safeStore, 
nextCommand), notifySink);
+        update(safeStore, nextCommand, prevCfk, prevCfk.callback(safeStore, 
nextCommand), notifySink, forceNotify);
     }
 
-    private void update(SafeCommandStore safeStore, @Nullable Command command, 
CommandsForKey prevCfk, CommandsForKeyUpdate updateCfk)
+    private void update(SafeCommandStore safeStore, @Nullable Command command, 
CommandsForKey prevCfk, CommandsForKeyUpdate updateCfk, boolean forceNotify)
     {
-        update(safeStore, command, prevCfk, updateCfk, 
DefaultNotifySink.INSTANCE);
+        update(safeStore, command, prevCfk, updateCfk, 
DefaultNotifySink.INSTANCE, forceNotify);
     }
 
-    private void update(SafeCommandStore safeStore, @Nullable Command command, 
CommandsForKey prevCfk, CommandsForKeyUpdate updateCfk, NotifySink notifySink)
+    private void update(SafeCommandStore safeStore, @Nullable Command command, 
CommandsForKey prevCfk, CommandsForKeyUpdate updateCfk, NotifySink notifySink, 
boolean forceNotify)
     {
-        if (updateCfk == prevCfk)
+        if (updateCfk == prevCfk && !forceNotify)
             return;
 
         CommandsForKey nextCfk = updateCfk.cfk();
@@ -92,19 +86,19 @@ public abstract class SafeCommandsForKey implements 
SafeState<CommandsForKey>
             set(nextCfk);
         }
 
-        updateCfk.postProcess(safeStore, prevCfk, command, notifySink);
+        updateCfk.postProcess(safeStore, prevCfk, command, notifySink, 
forceNotify);
     }
 
     public void registerUnmanaged(SafeCommandStore safeStore, SafeCommand 
unmanaged, UpdateUnmanagedMode mode)
     {
         CommandsForKey prevCfk = current();
-        update(safeStore, null, prevCfk, prevCfk.registerUnmanaged(unmanaged, 
mode));
+        update(safeStore, null, prevCfk, prevCfk.registerUnmanaged(safeStore, 
unmanaged, mode), false);
     }
 
     public void updateRedundantBefore(SafeCommandStore safeStore, 
RedundantBefore.Bounds redundantBefore)
     {
         CommandsForKey prevCfk = current();
-        update(safeStore, null, prevCfk, 
prevCfk.withRedundantBeforeAtLeast(redundantBefore));
+        update(safeStore, null, prevCfk, 
prevCfk.withRedundantBeforeAtLeast(redundantBefore), false);
     }
 
     public void initialize()
diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java 
b/accord-core/src/main/java/accord/local/cfk/Updating.java
index 4bddc48e..c38c86a2 100644
--- a/accord-core/src/main/java/accord/local/cfk/Updating.java
+++ b/accord-core/src/main/java/accord/local/cfk/Updating.java
@@ -35,6 +35,7 @@ import accord.local.CommandStore;
 import accord.local.PreLoadContext;
 import accord.local.RedundantBefore.QuickBounds;
 import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
 import accord.local.cfk.CommandsForKey.InternalStatus;
 import 
accord.local.cfk.CommandsForKeyUpdate.CommandsForKeyUpdateWithPostProcess;
 import accord.local.cfk.PostProcess.LoadPruned;
@@ -850,7 +851,7 @@ class Updating
         commandStore.execute(context, safeStore -> {
             SafeCommandsForKey safeCommandsForKey = safeStore.get(key);
             CommandsForKey cur = safeCommandsForKey.current();
-            CommandsForKeyUpdate next = Updating.updateUnmanaged(cur, 
safeStore.unsafeGet(txnId));
+            CommandsForKeyUpdate next = Updating.updateUnmanaged(cur, 
safeStore, safeStore.unsafeGet(txnId));
             if (cur != next)
             {
                 if (cur != next.cfk())
@@ -863,14 +864,14 @@ class Updating
         }, commandStore.agent());
     }
 
-    static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, 
SafeCommand safeCommand)
+    static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, 
SafeCommandStore safeStore, SafeCommand safeCommand)
     {
-        return Updating.updateUnmanaged(cfk, safeCommand, UPDATE, null);
+        return Updating.updateUnmanaged(cfk, safeStore, safeCommand, UPDATE, 
null);
     }
 
-    static CommandsForKeyUpdate registerDependencies(CommandsForKey cfk, 
SafeCommand safeCommand)
+    static CommandsForKeyUpdate registerDependencies(CommandsForKey cfk, 
SafeCommandStore safeStore, SafeCommand safeCommand)
     {
-        return Updating.updateUnmanaged(cfk, safeCommand, REGISTER_DEPS_ONLY, 
null);
+        return Updating.updateUnmanaged(cfk, safeStore, safeCommand, 
REGISTER_DEPS_ONLY, null);
     }
 
     /**
@@ -880,7 +881,7 @@ class Updating
      *  - {@code UPDATE, update == null}: fails if any dependencies are missing; always returns a CommandsForKey
      *  - {@code UPDATE && update != null}: fails if any dependencies are missing; always returns the original CommandsForKey, and maybe adds a new Unmanaged to {@code update}
      */
-    static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, 
SafeCommand safeCommand, UpdateUnmanagedMode mode, @Nullable 
List<CommandsForKey.Unmanaged> update)
+    static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, 
SafeCommandStore safeStore, SafeCommand safeCommand, UpdateUnmanagedMode mode, 
@Nullable List<CommandsForKey.Unmanaged> update)
     {
         boolean register = mode != UPDATE;
         Invariants.requireArgument(mode == UPDATE || update == null);
@@ -1017,7 +1018,8 @@ class Updating
                 }
                 else
                 {
-                    
Invariants.require(txnIds.get(i++).compareTo(cfk.prunedBefore()) < 0);
+                    
Invariants.require(txnIds.get(i).compareTo(TxnId.max(cfk.bounds.locallyAppliedBefore,
 cfk.prunedBefore())) < 0);
+                    ++i;
                 }
             }
 
@@ -1052,7 +1054,7 @@ class Updating
                 }
             }
 
-            waitingToExecuteAt = updateExecuteAtLeast(waitingToExecuteAt, 
effectiveExecutesAt, safeCommand);
+            waitingToExecuteAt = updateExecuteAtLeast(waitingToExecuteAt, 
effectiveExecutesAt, safeStore, safeCommand);
             if (!readyToApply || missingCount > 0 || newById != null)
             {
                 if (newById == null) newById = byId;
@@ -1137,7 +1139,7 @@ class Updating
         return new CommandsForKeyUpdateWithPostProcess(cfk, new 
NotifyNotWaiting(null, new TxnId[] { safeCommand.txnId() }));
     }
 
-    private static Timestamp updateExecuteAtLeast(Timestamp 
waitingToExecuteAt, Timestamp effectiveExecutesAt, SafeCommand safeCommand)
+    private static Timestamp updateExecuteAtLeast(Timestamp 
waitingToExecuteAt, Timestamp effectiveExecutesAt, SafeCommandStore safeStore, 
SafeCommand safeCommand)
     {
         if (waitingToExecuteAt instanceof TxnInfo)
             waitingToExecuteAt = ((TxnInfo) 
waitingToExecuteAt).plainExecuteAt();
@@ -1152,7 +1154,7 @@ class Updating
                 if (effectiveExecutesAt instanceof TxnInfo)
                     effectiveExecutesAt = ((TxnInfo) 
effectiveExecutesAt).plainExecuteAt();
                 waitingOn.updateExecuteAtLeast(txnId, effectiveExecutesAt);
-                safeCommand.updateWaitingOn(waitingOn);
+                safeCommand.updateWaitingOn(safeStore, waitingOn);
             }
         }
 
diff --git a/accord-core/src/main/java/accord/local/cfk/Utils.java 
b/accord-core/src/main/java/accord/local/cfk/Utils.java
index 695e8f88..c55ae13c 100644
--- a/accord-core/src/main/java/accord/local/cfk/Utils.java
+++ b/accord-core/src/main/java/accord/local/cfk/Utils.java
@@ -350,16 +350,45 @@ class Utils
         return newMissing;
     }
 
-    static TxnId[] removeRedundantMissing(TxnId[] missing, TxnId removeBefore)
+    static TxnId[] removeRedundantMissing(TxnId[] missing, TxnId removeBefore, 
TxnInfo[] newById, int appliedBeforeIndex)
     {
         if (missing == NO_TXNIDS)
             return NO_TXNIDS;
 
         int j = Arrays.binarySearch(missing, removeBefore);
         if (j < 0) j = -1 - j;
-        if (j <= 0) return missing;
-        if (j == missing.length) return NO_TXNIDS;
-        return Arrays.copyOfRange(missing, j, missing.length);
+        if (j > 0)
+        {
+            if (j == missing.length) return NO_TXNIDS;
+            missing = Arrays.copyOfRange(missing, j, missing.length);
+        }
+        if (appliedBeforeIndex < 0)
+            return missing;
+
+        int removed = 0;
+        j = SortedArrays.binarySearch(newById, 0, appliedBeforeIndex, 
missing[0], TxnId::compareTo, FAST);
+        if (j >= 0) ++j;
+        else
+        {
+            ++removed;
+            j = -1 - j;
+        }
+        for (int i = 1 ; i < missing.length ; ++i)
+        {
+            j = SortedArrays.exponentialSearch(newById, j, appliedBeforeIndex, 
missing[i], TxnId::compareTo, FAST);
+            if (j < 0)
+            {
+                ++removed;
+                j = -1 - j;
+            }
+            else if (removed > 0)
+            {
+                missing[i - removed] = missing[i];
+            }
+        }
+        if (removed == 0) return missing;
+        else if (removed == missing.length) return NO_TXNIDS;
+        else return Arrays.copyOf(missing, missing.length - removed);
     }
 
     static TxnId[] ensureOneMissing(TxnId txnId, TxnId[] oneMissing)
diff --git 
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java 
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index 01f90a53..d49fb129 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -423,8 +423,8 @@ public class ShardDurability
     /*
      * In each cycle, attempt to split the range into this many pieces; if we fail, we increase the number of pieces
      */
-    private int targetShardSplits = 64;
-    private int maxShardSplits = 1 << 10;
+    private int targetShardSplits = 8;
+    private int maxShardSplits = 64;
 
     /*
      * Target for how often the entire ring should be processed in microseconds. Every node will start at an offset in the current round that is based
diff --git a/accord-core/src/main/java/accord/messages/Accept.java 
b/accord-core/src/main/java/accord/messages/Accept.java
index fa8753aa..c0e8013f 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -182,7 +182,7 @@ public class Accept extends 
TxnRequest.WithUnsynced<Accept.AcceptReply>
 
                 Invariants.require(deps.maxTxnId(txnId).epoch() <= 
executeAt.epoch());
                 if (filterDuplicateDependenciesFromAcceptReply())
-                    deps = deps.without(this.partialDeps);
+                    deps = deps.without(partialDeps);
 
                 Participants<?> successful = isPartialAccept ? 
participants.touches() : null;
                 return new AcceptReply(successful, deps, flags);
diff --git 
a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java 
b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
index 8a582291..64465149 100644
--- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -141,15 +141,6 @@ public class ApplyThenWaitUntilApplied extends 
WaitUntilApplied
     public void accept(CommitOrReadNack reply, Throwable failure)
     {
         super.accept(reply, failure);
-
-        boolean waiting;
-        synchronized (this)
-        {
-            waiting = waitingOnCount >= 0;
-        }
-        if (waiting && reply == null && failure == null)
-            node.reply(replyTo, replyContext, CommitOrReadNack.Waiting, null);
-
         txn = null;
         deps = null;
         writes = null;
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java 
b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index 15e3d883..233cd542 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -146,9 +146,9 @@ public class BeginRecovery extends 
TxnRequest.WithUnsynced<BeginRecovery.Recover
         boolean supersedingRejects;
         Deps earlierNoWait, earlierWait;
         Deps laterCoordRejects;
-        if (command.hasBeen(AcceptedMedium))
+        if (command.hasBeen(AcceptedMedium) || txnId.is(ExclusiveSyncPoint))
         {
-            supersedingRejects = false;
+            supersedingRejects = !command.hasBeen(AcceptedMedium);
             earlierNoWait = earlierWait = Deps.NONE;
             laterCoordRejects = Deps.NONE;
         }
@@ -230,6 +230,8 @@ public class BeginRecovery extends 
TxnRequest.WithUnsynced<BeginRecovery.Recover
     @Override
     public LoadKeysFor loadKeysFor()
     {
+        if (txnId.is(ExclusiveSyncPoint))
+            return LoadKeysFor.READ_WRITE;
         return LoadKeysFor.RECOVERY;
     }
 
diff --git a/accord-core/src/main/java/accord/messages/InformDecided.java 
b/accord-core/src/main/java/accord/messages/InformDecided.java
new file mode 100644
index 00000000..6bc6d136
--- /dev/null
+++ b/accord-core/src/main/java/accord/messages/InformDecided.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.messages;
+
+import accord.api.RoutingKey;
+import accord.local.Node;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
+import accord.primitives.Route;
+import accord.primitives.TxnId;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+import accord.utils.async.Cancellable;
+
+import static accord.messages.MessageType.StandardMessage.INFORM_DECIDED_REQ;
+
+public class InformDecided extends AbstractRequest<Reply>
+{
+    public static class SerializationSupport
+    {
+        public static InformDecided create(TxnId txnId, RoutingKey homeKey)
+        {
+            return new InformDecided(txnId, homeKey);
+        }
+    }
+
+    public final RoutingKey homeKey;
+    public InformDecided(TxnId txnId, RoutingKey homeKey)
+    {
+        super(txnId);
+        this.homeKey = homeKey;
+    }
+
+    public static void informHome(Node node, Topologies any, TxnId txnId, 
Route<?> route)
+    {
+        Shard homeShard = InformDurable.homeShard(node, any, txnId, 
route.homeKey());
+        node.send(homeShard.nodes, to -> new InformDecided(txnId, 
route.homeKey()));
+    }
+
+    @Override
+    public Cancellable submit()
+    {
+        // TODO (expected): do not load from disk to perform this update, just write a delta to any journal
+        return node.mapReduceConsumeLocal(this, homeKey, txnId.epoch(), this);
+    }
+
+    @Override
+    public Reply apply(SafeCommandStore safeStore)
+    {
+        safeStore.progressLog().decided(safeStore, txnId);
+        return null;
+    }
+
+    @Override
+    public TxnId primaryTxnId()
+    {
+        // we don't need the transaction loaded to update the progress log
+        return null;
+    }
+
+    @Override
+    protected void acceptInternal(Reply reply, Throwable failure)
+    {
+    }
+
+    @Override
+    public String toString()
+    {
+        return "InformDecided{" +
+               "txnId:" + txnId +
+               '}';
+    }
+
+    @Override
+    public MessageType type()
+    {
+        return INFORM_DECIDED_REQ;
+    }
+}
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java 
b/accord-core/src/main/java/accord/messages/InformDurable.java
index 773c2dcc..1a4df25f 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -19,6 +19,7 @@ package accord.messages;
 
 import javax.annotation.Nullable;
 
+import accord.api.RoutingKey;
 import accord.local.Commands;
 import accord.local.LoadKeys;
 import accord.local.Node;
@@ -89,17 +90,8 @@ public class InformDurable extends TxnRequest<Reply> 
implements PreLoadContext
 
     public static void informHome(Node node, Topologies any, TxnId txnId, 
Route<?> route, Timestamp executeAt, Durability durability)
     {
-        long homeEpoch = txnId.epoch();
-        Topology homeEpochTopology = any.getEpoch(homeEpoch);
-        int homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
-        if (homeShardIndex < 0)
-        {
-            homeEpochTopology = node.topology().globalForEpoch(homeEpoch);
-            homeShardIndex = homeEpochTopology.indexForKey(route.homeKey());
-        }
-
-        Shard homeShard = homeEpochTopology.get(homeShardIndex);
-        Topologies homeTopology = new Topologies.Single(any, new 
Topology(homeEpoch, homeShard));
+        Shard homeShard = homeShard(node, any, txnId, route.homeKey());
+        Topologies homeTopology = new Topologies.Single(any, new 
Topology(txnId.epoch(), homeShard));
         node.send(homeShard.nodes, to -> new InformDurable(to, homeTopology, 
route.homeKeyOnlyRoute(), txnId, executeAt, txnId.epoch(), txnId.epoch(), 
durability));
     }
 
@@ -108,6 +100,25 @@ public class InformDurable extends TxnRequest<Reply> 
implements PreLoadContext
         node.send(inform.nodes(), to -> new InformDurable(to, inform, route, 
txnId, executeAt, inform.oldestEpoch(), inform.currentEpoch(), durability));
     }
 
+    static Shard homeShard(Node node, Topologies any, TxnId txnId, RoutingKey 
homeKey)
+    {
+        long homeEpoch = txnId.epoch();
+        int homeShardIndex = -1;
+        Topology homeEpochTopology = null;
+        if (any.containsEpoch(homeEpoch))
+        {
+            homeEpochTopology = any.getEpoch(homeEpoch);
+            homeShardIndex = homeEpochTopology.indexForKey(homeKey);
+        }
+        if (homeShardIndex < 0)
+        {
+            homeEpochTopology = node.topology().globalForEpoch(homeEpoch);
+            homeShardIndex = homeEpochTopology.indexForKey(homeKey);
+        }
+
+        return homeEpochTopology.get(homeShardIndex);
+    }
+
     @Override
     public Cancellable submit()
     {
@@ -142,7 +153,7 @@ public class InformDurable extends TxnRequest<Reply> 
implements PreLoadContext
     @Override
     public String toString()
     {
-        return "InformOfPersistence{" +
+        return "InformDurable{" +
                "txnId:" + txnId +
                '}';
     }
diff --git a/accord-core/src/main/java/accord/messages/MessageType.java 
b/accord-core/src/main/java/accord/messages/MessageType.java
index a75cb966..8c1d3566 100644
--- a/accord-core/src/main/java/accord/messages/MessageType.java
+++ b/accord-core/src/main/java/accord/messages/MessageType.java
@@ -42,6 +42,7 @@ public interface MessageType
         CHECK_STATUS_REQ, CHECK_STATUS_RSP,
         FETCH_DATA_REQ, FETCH_DATA_RSP,
         INFORM_DURABLE_REQ,
+        INFORM_DECIDED_REQ,
         GET_LATEST_DEPS_REQ, GET_LATEST_DEPS_RSP,
         GET_MAX_CONFLICT_REQ, GET_MAX_CONFLICT_RSP,
         GET_DURABLE_BEFORE_REQ, GET_DURABLE_BEFORE_RSP,
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java 
b/accord-core/src/main/java/accord/messages/ReadData.java
index e021d527..c89ce71b 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -68,6 +68,7 @@ import static accord.coordinate.ExecuteFlag.READY_TO_EXECUTE;
 import static accord.messages.MessageType.StandardMessage.READ_RSP;
 import static accord.messages.ReadData.CommitOrReadNack.Insufficient;
 import static accord.messages.ReadData.CommitOrReadNack.Redundant;
+import static accord.messages.ReadData.CommitOrReadNack.Waiting;
 import static accord.messages.TxnRequest.latestRelevantEpochIndex;
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Txn.Kind.EphemeralRead;
@@ -313,7 +314,7 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
                     if (c < 0) safeStore.progressLog().waiting(HasStableDeps, 
safeStore, safeCommand, null, null, participants);
                     else if (c > 0 && status.compareTo(executeOn().min) >= 0 
&& status.compareTo(SaveStatus.PreApplied) < 0) 
safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null, scope, 
null);
                     node.agent().localEvents().onReadWaiting(safeStore, 
command);
-                    return status.compareTo(SaveStatus.Stable) >= 0 ? null : 
Insufficient;
+                    return status.compareTo(SaveStatus.Stable) >= 0 ? Waiting 
: Insufficient;
 
                 case OBSOLETE:
                     state = State.PENDING_OBSOLETE;
diff --git a/accord-core/src/main/java/accord/primitives/RangeDeps.java 
b/accord-core/src/main/java/accord/primitives/RangeDeps.java
index e60ee1ad..34eecda6 100644
--- a/accord-core/src/main/java/accord/primitives/RangeDeps.java
+++ b/accord-core/src/main/java/accord/primitives/RangeDeps.java
@@ -720,18 +720,22 @@ public class RangeDeps implements 
Iterable<Map.Entry<Range, TxnId>>, KeyOrRangeD
             if (!RelationMultiMap.removeWithPartialMatches(txnIds, ranges, 
txnIdsToRanges,
                                                            remove.txnIds, 
remove.ranges, remove.txnIdsToRanges,
                                                            TxnId::compareTo, 
Range::compareIntersecting, builder, (b, id, kr, rr) -> {
-                Range remainder = null;
-                if (rr != null)
+                if (rr == null)
                 {
-                    int compareStarts = rr.start().compareTo(kr.start());
-                    if (rr.end().compareTo(kr.end()) < 0)
-                        remainder = rr.newRange(compareStarts >= 0 ? 
rr.start() : kr.start(), rr.end());
-                    if (compareStarts <= 0)
-                        return remainder;
-                    kr = kr.newRange(kr.start(), rr.start());
+                    b.add(id, kr);
+                    return null;
+                }
+                int compareStarts = rr.start().compareTo(kr.start());
+                int compareEnds = rr.end().compareTo(kr.end());
+                if (compareStarts <= 0 && compareEnds >= 0) return null;
+                else if (compareStarts <= 0) return rr.newRange(rr.end(), 
kr.end());
+                else
+                {
+                    b.add(rr.newRange(kr.start(), rr.start()), id);
+                    if (compareEnds >= 0)
+                        return null;
+                    return rr.newRange(rr.end(), kr.end());
                 }
-                b.add(id, kr);
-                return remainder;
             }))
             {
                 return this;
@@ -1160,10 +1164,25 @@ public class RangeDeps implements 
Iterable<Map.Entry<Range, TxnId>>, KeyOrRangeD
                 if (range.compareIntersecting(last) == 0)
                 {
                     RoutingKey rstart = range.start(), lstart = last.start();
-                    Invariants.require(rstart.compareTo(lstart) >= 0);
+                    RoutingKey start = rstart.compareTo(lstart) < 0 ? rstart : 
lstart;
                     RoutingKey rend = range.end(), lend = last.end();
-                    if (rend.compareTo(lend) > 0)
-                        updateLast(last.newRange(lstart, rend));
+                    RoutingKey end = rend.compareTo(lend) > 0 ? rend : lend;
+                    if (start != lstart || end != lend)
+                    {
+                        Range newRange = last.newRange(start, end);
+                        if (start != lstart)
+                        {
+                            Range prev = penultimateKeyValue();
+                            while (prev != null && 
prev.compareIntersecting(newRange) == 0)
+                            {
+                                removeLastKeyValue();
+                                if (prev.start().compareTo(start) < 0)
+                                    newRange = newRange.newRange(prev.start(), 
end);
+                                prev = penultimateKeyValue();
+                            }
+                        }
+                        updateLast(newRange);
+                    }
                     return;
                 }
             }
diff --git a/accord-core/src/main/java/accord/utils/RelationMultiMap.java 
b/accord-core/src/main/java/accord/utils/RelationMultiMap.java
index 5e09ee1a..1c549908 100644
--- a/accord-core/src/main/java/accord/utils/RelationMultiMap.java
+++ b/accord-core/src/main/java/accord/utils/RelationMultiMap.java
@@ -212,6 +212,19 @@ public class RelationMultiMap
             keysToValues[totalCount - 1] = value;
         }
 
+        protected V penultimateKeyValue()
+        {
+            if (totalCount - keyOffset < 2)
+                return null;
+            return keysToValues[totalCount - 2];
+        }
+
+        protected void removeLastKeyValue()
+        {
+            Invariants.require(totalCount - keyOffset > 0);
+            keysToValues[--totalCount] = null;
+        }
+
         /**
          * Add this command as a dependency for each intersecting key
          */
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index a0acf419..3bc9ae49 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -700,11 +700,7 @@ public class Cluster
                                      TestProgressLogs::new, 
DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, 
cacheLoading), new CoordinationAdapter.DefaultFactory(),
                                      journal.durableBeforePersister(), 
journal);
                 journal.start(node);
-                DurabilityService durability = node.durability();
-                // TODO (desired): randomise
-                durability.shards().setShardCycleTime(30, SECONDS);
-                durability.global().setGlobalCycleTime(180, SECONDS);
-                durabilityServices.add(durability);
+                durabilityServices.add(node.durability());
                 nodeMap.put(id, node);
                 durabilityServices.add(new DurabilityService(node));
             }
@@ -730,7 +726,7 @@ public class Cluster
             Runnable updateProgressLogConcurrency;
             {
                 updateProgressLogConcurrency = () -> {
-                    nodeMap.values().forEach(node -> 
node.commandStores().forEachCommandStore(cs -> 
((TestProgressLog)cs.unsafeProgressLog()).setMaxConcurrency(random.nextInt(1, 
16))));
+                    nodeMap.values().forEach(node -> 
node.commandStores().forEachCommandStore(cs -> 
((TestProgressLog)cs.unsafeProgressLog()).setMaxConcurrency(random.nextInt(8, 
32))));
                 };
             }
             updateProgressLogConcurrency.run();
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java 
b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 4c20684e..515d8104 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.IntSupplier;
 import java.util.function.LongSupplier;
 
@@ -47,6 +48,7 @@ import accord.impl.basic.SimulatedFault;
 import accord.impl.mock.Network;
 import accord.local.Command;
 import accord.local.Node;
+import accord.local.PreLoadContext;
 import accord.local.SafeCommandStore;
 import accord.local.TimeService;
 import accord.messages.ReplyContext;
@@ -290,7 +292,9 @@ public class ListAgent implements InMemoryAgent, 
CoordinatorEventListener
     public AsyncResult<Void> snapshot(InMemoryCommandStore commandStore)
     {
         Snapshotter<Snapshot> snapshotter = 
snapshotters.computeIfAbsent(commandStore.id(), ignore -> new 
Snapshotter<>(scheduler, rnd));
-        return snapshotter.snapshot(false, Snapshot.snapshot(commandStore));
+        return commandStore.submit((PreLoadContext.Empty)() -> "Snapshot", 
safeStore -> snapshotter.snapshot(false, Snapshot.snapshot(commandStore)))
+                           .flatMap(Function.identity())
+                           .beginAsResult();
     }
 
     public void restore(InMemoryCommandStore commandStore)
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java 
b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 6d1a84d3..e90c205f 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -678,15 +678,15 @@ public class CommandsForKeyTest
                     safeCfk.set(result.cfk());
                     if (rnd.decide(pruneChance))
                         safeCfk.set(safeCfk.current.maybePrune(pruneInterval, 
pruneHlcDelta));
-                    result.postProcess(safeStore, prev, update.next, canon);
+                    result.postProcess(safeStore, prev, update.next, canon, 
false);
                 }
 
                 if (!CommandsForKey.managesExecution(update.next.txnId()) && 
update.next.hasBeen(Status.Stable) && !update.next.hasBeen(Status.Truncated))
                 {
                     CommandsForKey prev = safeCfk.current();
-                    result = prev.registerUnmanaged(safeCommand, REGISTER);
+                    result = prev.registerUnmanaged(safeStore, safeCommand, 
REGISTER);
                     safeCfk.set(result.cfk());
-                    result.postProcess(safeStore, prev, null, canon);
+                    result.postProcess(safeStore, prev, null, canon, false);
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to