This is an automated email from the ASF dual-hosted git repository.

belliottsmith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 66cab49f1c9af471795339a5fdf9522e6e73b691
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Jun 4 19:20:56 2026 +0100

    Expunged records may be resurrected:
      CommandChange's shouldCleanup short-circuits to NO if there is no data, 
but this is incorrect: Cleanup.EXPUNGE may have dropped the data, in which 
case the record must still receive cleanup EXPUNGE and be reported as ERASED.
    Also fix:
     - ActiveEpochs.withNewEpochs should handle transition from 0 -> more than 1
     - RedundantBefore.minGcBefore should be NONE if empty
     - Update RangesForEpoch directly, so that we cannot have race conditions 
where the ownership is unknown
     - Avoid reentrancy on local callbacks
     - Ensure ReadCoordinator callbacks are invoked on owning thread
     - Avoid deadlock when notifying ComplexListener(s)
     - Release IntrusivePriorityHeap memory from large capacity heaps when empty
     - Prevent SynchronousRecoverAwait reentrancy when invoking onDone (by 
exposing and invoking invokeOnDone, which first sets isDone)
     - maybeExecute must invoke either notWaiting or notifyWaiting to ensure 
tryExecuteListening terminates
    
    patch by Benedict; reviewed by Alan Wang and Alex Petrov for CASSANDRA-21440
---
 .../accord/coordinate/AbstractCoordination.java    |   9 +-
 .../main/java/accord/coordinate/ExecuteTxn.java    |  16 +-
 .../java/accord/coordinate/ReadCoordinator.java    |  45 ++-
 .../accord/coordinate/SynchronousRecoverAwait.java |   4 +-
 .../src/main/java/accord/impl/CommandChange.java   |  27 +-
 .../java/accord/impl/DefaultLocalListeners.java    | 133 ++++---
 .../java/accord/impl/InMemoryCommandStore.java     |  21 +-
 .../src/main/java/accord/local/CommandBuilder.java |   2 +
 .../src/main/java/accord/local/CommandStore.java   |  97 +----
 .../src/main/java/accord/local/CommandStores.java  |  66 ++--
 .../src/main/java/accord/local/Commands.java       |   4 +-
 .../main/java/accord/local/RedundantBefore.java    |  15 +-
 .../src/main/java/accord/messages/Callback.java    |  46 ++-
 .../main/java/accord/topology/ActiveEpochs.java    |   9 +-
 .../java/accord/utils/IntrusivePriorityHeap.java   |  23 +-
 .../java/accord/utils/ReducingIntervalMap.java     |   1 +
 .../main/java/accord/utils/ReducingRangeMap.java   |   5 +
 .../test/java/accord/impl/RemoteListenersTest.java |   2 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |  16 +-
 .../accord/impl/basic/DelayedCommandStores.java    |  30 +-
 .../java/accord/impl/basic/InMemoryJournal.java    |  35 +-
 .../java/accord/local/MaybeExecuteAdapterTest.java | 408 +++++++++++++++++++++
 .../java/accord/local/cfk/CommandsForKeyTest.java  |   5 +-
 .../src/test/java/accord/utils/AccordGens.java     |   5 +-
 24 files changed, 719 insertions(+), 305 deletions(-)

diff --git 
a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java 
b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
index 59cbf720..626c64be 100644
--- a/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
+++ b/accord-core/src/main/java/accord/coordinate/AbstractCoordination.java
@@ -65,6 +65,7 @@ public abstract class AbstractCoordination<P extends 
Participants<?>, Result, Re
     private BiConsumer<? super Result, Throwable> callback;
     private Object[] replyState;
     private int replyCount;
+    private boolean unsafeToReplyImmediately;
 
     protected AbstractCoordination(Node node, SequentialAsyncExecutor 
executor, TxnId txnId, P scope, SortedArrayList<Node.Id> nodes, BiConsumer<? 
super Result, Throwable> callback)
     {
@@ -197,6 +198,7 @@ public abstract class AbstractCoordination<P extends 
Participants<?>, Result, Re
     void contact(Function<Node.Id, Request> request, @Nullable 
Predicate<Node.Id> include)
     {
         executor.executeMaybeImmediately(() -> {
+            unsafeToReplyImmediately = true;
             AbstractTracker<?> tracker = tracker();
             Topologies topologies = tracker.topologies();
             if (tracing != null)
@@ -227,6 +229,7 @@ public abstract class AbstractCoordination<P extends 
Participants<?>, Result, Re
                     }
                 }
             }
+            unsafeToReplyImmediately = false;
         });
     }
 
@@ -242,19 +245,19 @@ public abstract class AbstractCoordination<P extends 
Participants<?>, Result, Re
     @Override
     public final void onSuccess(Node.Id from, Reply reply)
     {
-        CallbackExclusive.onSuccess(executor, this, from, reply);
+        CallbackExclusive.onSuccess(executor, unsafeToReplyImmediately, this, 
from, reply);
     }
 
     @Override
     public final void onSlow(Node.Id from)
     {
-        CallbackExclusive.onSlow(executor, this, from);
+        CallbackExclusive.onSlow(executor, unsafeToReplyImmediately, this, 
from);
     }
 
     @Override
     public final void onFailure(Node.Id from, Throwable failure)
     {
-        CallbackExclusive.onFailure(executor, this, from, failure);
+        CallbackExclusive.onFailure(executor, unsafeToReplyImmediately, this, 
from, failure);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
index 9f813631..b042af07 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java
@@ -86,9 +86,7 @@ import static 
accord.coordinate.CoordinationAdapter.Factory.Kind.Standard;
 import static accord.coordinate.ExecuteFlag.READY_TO_EXECUTE;
 import static accord.coordinate.ExecutePath.EPHEMERAL;
 import static accord.coordinate.ExecutePath.FAST;
-import static accord.coordinate.ExecutePath.MEDIUM;
 import static accord.coordinate.ExecutePath.RECOVER;
-import static accord.coordinate.ExecutePath.SLOW;
 import static accord.coordinate.ReadCoordinator.Action.Approve;
 import static accord.coordinate.ReadCoordinator.Action.ApprovePartial;
 import static accord.local.CommandSummaries.SummaryStatus.STABLE;
@@ -124,19 +122,19 @@ public class ExecuteTxn extends ReadCoordinator<Result, 
ReadReply>
         @Override
         public final void onSuccess(Node.Id from, ReadReply reply)
         {
-            CallbackExclusive.onSuccess(executor, this, from, reply);
+            CallbackExclusive.onSuccess(executor, unsafeToReplyImmediately, 
this, from, reply);
         }
 
         @Override
         public final void onSlow(Node.Id from)
         {
-            CallbackExclusive.onSlow(executor, this, from);
+            CallbackExclusive.onSlow(executor, unsafeToReplyImmediately, this, 
from);
         }
 
         @Override
         public final void onFailure(Node.Id from, Throwable failure)
         {
-            CallbackExclusive.onFailure(executor, this, from, failure);
+            CallbackExclusive.onFailure(executor, unsafeToReplyImmediately, 
this, from, failure);
         }
 
         @Override
@@ -494,18 +492,18 @@ public class ExecuteTxn extends ReadCoordinator<Result, 
ReadReply>
     }
 
     @Override
-    public void exclusiveOnSlowResponse(Id from)
+    public void onSlowExclusive(Id from)
     {
         // send stable messages to everyone not yet contacted, and then inform 
decided, to avoid unnecessary recoveries
         stable.maybeInformStable();
-        super.exclusiveOnSlowResponse(from);
+        super.onSlowExclusive(from);
     }
 
     @Override
-    public void exclusiveOnFailure(Id from, Throwable failure)
+    public void onFailureExclusive(Id from, Throwable failure)
     {
         if (isPrivilegedVoteCommitting && from.id == node.id().id && 
!isDone()) finishWithFailure(failure);
-        else super.exclusiveOnFailure(from, failure);
+        else super.onFailureExclusive(from, failure);
     }
 
     protected CoordinationAdapter<Result> adapter()
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java 
b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
index 458ed2d9..f1783b33 100644
--- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -33,6 +33,7 @@ import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.SequentialAsyncExecutor;
 import accord.messages.Callback;
+import accord.messages.Callback.CallbackExclusive;
 import accord.primitives.Participants;
 import accord.primitives.Route;
 import accord.primitives.WithQuorum;
@@ -53,7 +54,7 @@ import static accord.utils.Invariants.debug;
 import static accord.utils.Invariants.illegalState;
 
 // TODO (expected): configure the number of initial requests we send
-public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Reply> extends ReadTracker implements Callback<Reply>, 
Coordination
+public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Reply> extends ReadTracker implements Callback<Reply>, 
CallbackExclusive<Reply>, Coordination
 {
     public enum Action
     {
@@ -119,6 +120,7 @@ public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Repl
     private BiConsumer<? super Result, Throwable> callback;
     private boolean isDone;
     private Throwable failure;
+    boolean unsafeToReplyImmediately;
 
     protected ReadCoordinator(Node node, SequentialAsyncExecutor executor, 
Topologies topologies, TxnId txnId, Participants<?> participants, BiConsumer<? 
super Result, Throwable> callback)
     {
@@ -142,24 +144,19 @@ public abstract class ReadCoordinator<Result, Reply 
extends accord.messages.Repl
     @Override
     public final void onSuccess(Node.Id from, Reply reply)
     {
-        executor.executeMaybeImmediately(() -> {
-            try { onSuccessExclusive(from, reply); }
-            catch (Throwable t) { exclusiveOnCallbackFailure(from, t); }
-        });
+        CallbackExclusive.onSuccess(executor, unsafeToReplyImmediately, this, 
from, reply);
     }
 
     @Override
     public final void onSlow(Node.Id from)
     {
-        try { exclusiveOnSlowResponse(from); }
-        catch (Throwable t) { exclusiveOnCallbackFailure(from, t); }
+        CallbackExclusive.onSlow(executor, unsafeToReplyImmediately, this, 
from);
     }
 
     @Override
     public final void onFailure(Node.Id from, Throwable failure)
     {
-        try { exclusiveOnFailure(from, failure); }
-        catch (Throwable t) { exclusiveOnCallbackFailure(from, t); }
+        CallbackExclusive.onFailure(executor, unsafeToReplyImmediately, this, 
from, failure);
     }
 
     @Override
@@ -169,7 +166,8 @@ public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Repl
         return true;
     }
 
-    protected void onSuccessExclusive(Id from, Reply reply)
+    @Override
+    public void onSuccessExclusive(Id from, Reply reply)
     {
         if (isDone)
             return;
@@ -184,7 +182,7 @@ public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Repl
         {
             default: throw new UnhandledEnum(action);
             case Aborted:
-                setDone();
+                trySetDone();
 
             case None:
                 break;
@@ -212,7 +210,8 @@ public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Repl
         }
     }
 
-    protected void exclusiveOnSlowResponse(Id from)
+    @Override
+    public void onSlowExclusive(Id from)
     {
         if (isDone)
             return;
@@ -225,11 +224,12 @@ public abstract class ReadCoordinator<Result, Reply 
extends accord.messages.Repl
         }
         catch (Throwable t)
         {
-            exclusiveOnCallbackFailure(from, t);
+            onCallbackFailureExclusive(from, t);
         }
     }
 
-    protected void exclusiveOnFailure(Id from, Throwable failure)
+    @Override
+    public void onFailureExclusive(Id from, Throwable failure)
     {
         if (isDone)
             return;
@@ -250,11 +250,12 @@ public abstract class ReadCoordinator<Result, Reply 
extends accord.messages.Repl
         }
         catch (Throwable t)
         {
-            exclusiveOnCallbackFailure(from, t);
+            onCallbackFailureExclusive(from, t);
         }
     }
 
-    protected void exclusiveOnCallbackFailure(Id from, Throwable failure)
+    @Override
+    public void onCallbackFailureExclusive(Id from, Throwable failure)
     {
         try
         {
@@ -319,12 +320,12 @@ public abstract class ReadCoordinator<Result, Reply 
extends accord.messages.Repl
         finishOnFailure();
     }
 
-    boolean isDone()
+    final boolean isDone()
     {
         return isDone;
     }
 
-    void setDone()
+    final void setDone()
     {
         Invariants.require(!isDone);
         isDone = true;
@@ -333,7 +334,7 @@ public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Repl
             Coordination.traceStop(tracing, this);
     }
 
-    boolean trySetDone()
+    final boolean trySetDone()
     {
         if (isDone)
             return false;
@@ -341,7 +342,7 @@ public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Repl
         return true;
     }
 
-    private void invokeOnDone(Success success, Throwable failure)
+    final void invokeOnDone(Success success, Throwable failure)
     {
         setDone();
         try { onDone(success, failure); }
@@ -374,7 +375,9 @@ public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Repl
 
     protected void start(List<Id> to)
     {
+        unsafeToReplyImmediately = true;
         to.forEach(this::contact);
+        unsafeToReplyImmediately = false;
     }
 
     public final void start()
@@ -407,7 +410,9 @@ public abstract class ReadCoordinator<Result, Reply extends 
accord.messages.Repl
         RequestStatus status = trySendMore(List::add, contact);
         if (tracing != null)
             tracing.trace(null, "contacting %s", contact);
+        unsafeToReplyImmediately = true;
         contact.forEach(this::contact);
+        unsafeToReplyImmediately = false;
         return status;
     }
 
diff --git 
a/accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java 
b/accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java
index 2a07ced9..33e23c73 100644
--- a/accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java
+++ b/accord-core/src/main/java/accord/coordinate/SynchronousRecoverAwait.java
@@ -97,7 +97,7 @@ public class SynchronousRecoverAwait extends 
ReadCoordinator<InferredFastPath, R
 
             case Reject:
                 outcome = Reject;
-                onDone(null, null);
+                invokeOnDone(null, null);
                 return Action.Aborted;
 
             case Accept:
@@ -105,7 +105,7 @@ public class SynchronousRecoverAwait extends 
ReadCoordinator<InferredFastPath, R
                 if (waitingOn.isEmpty())
                 {
                     outcome = Accept;
-                    onDone(null, null);
+                    invokeOnDone(null, null);
                     return Action.Aborted;
                 }
                 return Action.Approve;
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java 
b/accord-core/src/main/java/accord/impl/CommandChange.java
index a3a05960..8729eedb 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -357,11 +357,6 @@ public class CommandChange
 
         public Cleanup shouldCleanup(Input input, RedundantBefore 
redundantBefore, DurableBefore durableBefore)
         {
-            // Early return: No cleanup needed when no updates have been made 
to this command
-            if (!hasUpdate)
-                return NO;
-
-            // Honor previously set cleanup requirements - cleanup levels are 
ordered by aggressiveness
             if (cleanup != null)
             {
                 switch (cleanup)
@@ -381,16 +376,22 @@ public class CommandChange
             Durability durability = this.durability;
             if (durability == null) durability = NotDurable;
             StoreParticipants participants = this.participants;
+
             // TODO (expected): we need to filter participants to correctly 
compute doesStillExecute in Cleanup.shouldCleanup;
             //  would be better to break this dependency, or otherwise encode 
it better.
             //  In particular it would be nice to avoid doing this twice for 
each command on load, as we also do this in SafeCommandStore.
             //  Perhaps we can special-case loading, and simply update the 
participants here so we can avoid doing it again on access
+            SaveStatus saveStatus = this.saveStatus;
             if (input == Input.FULL)
             {
                 // During full compaction, commands without save status can be 
completely expunged
                 if (saveStatus == null)
-                    return EXPUNGE;
-                
+                {
+                    if (hasUpdate)
+                        return EXPUNGE;
+                    saveStatus = SaveStatus.NotDefined;
+                }
+
                 if (participants != null)
                     participants = participants.filter(LOAD, redundantBefore, 
txnId, saveStatus.known.isExecuteAtKnown() ? executeAt : null);
             }
@@ -401,6 +402,7 @@ public class CommandChange
             return cleanup;
         }
 
+        @SuppressWarnings("UnusedReturnValue") // used by implementation
         public Cleanup maybeCleanup(boolean clearFields, Input input, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
         {
             Cleanup cleanup = shouldCleanup(input, redundantBefore, 
durableBefore);
@@ -509,6 +511,7 @@ public class CommandChange
 
         // returns true if we made a material update to the Builder;
         // that is, if we cleared a non-null field or if we are already 
mask-only
+        @SuppressWarnings("UnusedReturnValue") // return value is used by 
implementation
         public boolean clearSuperseded(boolean clearFields, Builder 
superseding)
         {
             int unset = flags & setFieldsMask(superseding.flags & 
~setChanged(CLEANUP));
@@ -597,9 +600,7 @@ public class CommandChange
         // TODO (expected): we shouldn't need to filter participants here, we 
will do it anyway before using in SafeCommandStore
         public Command construct(RedundantBefore redundantBefore)
         {
-            if (!hasUpdate)
-                return null;
-
+            // we cannot short-circuit !hasUpdate, as we might have been 
expunged and should return Erased
             Invariants.require(txnId != null);
             if (cleanup != null)
             {
@@ -610,7 +611,7 @@ public class CommandChange
                 {
                     default: throw new UnhandledEnum(cleanup);
                     case NO: break;
-                    case EXPUNGE: return null;
+                    case EXPUNGE:
                     case ERASE: return Command.Truncated.erased(txnId);
                     case INVALIDATE: return 
Command.Truncated.invalidated(txnId, participants);
                     case VESTIGIAL: return Command.Truncated.vestigial(txnId, 
participants);
@@ -625,6 +626,9 @@ public class CommandChange
                 }
             }
 
+            if (!hasUpdate)
+                return null;
+
             // TODO (expected): bitset of expected known fields for cheap and 
comprehensive expunge check
             if (executeAt == null && saveStatus != null && 
saveStatus.known.isExecuteAtKnown())
                 return null;
@@ -680,7 +684,6 @@ public class CommandChange
                 case Vestigial:
                     return vestigial(txnId, participants);
                 case Erased:
-                    // TODO (expected): why are we saving Durability here for 
erased commands?
                     return erased(txnId);
                 case Invalidated:
                     return invalidated(txnId, participants);
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java 
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index ef5fd853..02bd617b 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -42,6 +42,7 @@ import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
+import accord.utils.ArrayBuffers.BufferList;
 import accord.utils.AsymmetricComparator;
 import accord.utils.Invariants;
 import accord.utils.btree.BTree;
@@ -292,7 +293,6 @@ public class DefaultLocalListeners implements LocalListeners
         static final RegisteredComplexListener[] NO_LISTENERS = new 
RegisteredComplexListener[0];
         RegisteredComplexListener[] listeners = NO_LISTENERS;
         int count, length;
-        boolean notifying;
 
         /**
          * Append to the end of the list; if we aren't reentering from notify 
then if the next position
@@ -307,10 +307,11 @@ public class DefaultLocalListeners implements 
LocalListeners
             Invariants.require(listeners[index] == remove);
             listeners[index] = null;
             remove.index = -1;
-            // we don't decrement length even if count==length so as to 
simplify reentry
-            --count;
-            if (Invariants.isParanoid() && !notifying) checkIntegrity();
-            return count > 0 || notifying ? this : null;
+            if (--count == 0)
+                return null;
+
+            if (Invariants.isParanoid()) checkIntegrity();
+            return this;
         }
 
         /**
@@ -323,10 +324,10 @@ public class DefaultLocalListeners implements 
LocalListeners
             if (listeners.length == length)
             {
                 RegisteredComplexListener[] oldListeners = listeners;
-                if (length >= count / 2 || notifying)
+                if (length >= count / 2)
                     listeners = new RegisteredComplexListener[Math.max(2, 
length * 2)];
 
-                if (count == length || notifying)
+                if (count == length)
                 {
                     // copy to same positions
                     System.arraycopy(oldListeners, 0, listeners, 0, length);
@@ -353,7 +354,7 @@ public class DefaultLocalListeners implements LocalListeners
             add.index = length;
             length++;
             count++;
-            if (Invariants.isParanoid() && !notifying) checkIntegrity();
+            if (Invariants.isParanoid()) checkIntegrity();
         }
 
         /**
@@ -362,61 +363,15 @@ public class DefaultLocalListeners implements 
LocalListeners
          * listeners that were present when we started. We compact the 
listener collection as we go, though given
          * reentry there is no guarantee the list at exit is compacted.
          */
-        RegisteredComplexListeners notify(SafeCommandStore safeStore, 
SafeCommand safeCommand, NotifySink notifySink)
+        void collect(List<RegisteredComplexListener> notify)
         {
-            int count = 0;
-            int length = this.length;
-
-            notifying = true;
             for (int i = 0 ; i < length ; ++i)
             {
                 RegisteredComplexListener next = listeners[i];
                 if (next == null) continue;
                 Invariants.require(next.index == i);
-                if (!notifySink.notify(safeStore, safeCommand, 
listeners[i].listener))
-                {
-                    if (next.index >= 0)
-                        --this.count;
-                    next.index = -1;
-                }
-                else if (next.index >= 0) // can be cancelled by notify, 
without notify return false
-                {
-                    Invariants.requireArgument(next.index == i);
-                    if (i != count)
-                    {
-                        listeners[count] = next;
-                        next.index = count;
-                    }
-                    ++count;
-                }
-                else Invariants.require(listeners[i] == null);
-            }
-            notifying = false;
-
-            if (length != this.length)
-            {
-                // we have had some concurrent insertions (concurrent removals 
do not alter length)
-                // we also have some empty slots, so compact the new entries
-                for (int i = length ; i < this.length ; ++i)
-                {
-                    RegisteredComplexListener next = listeners[i];
-                    if (next == null)
-                        continue;
-
-                    Invariants.require(next.index == i);
-                    listeners[count] = next;
-                    next.index = count;
-                    count++;
-                }
-                Invariants.require(this.count <= count); // we could have 
already removed some items from the compacted section
-                length = this.length;
+                notify.add(listeners[i]);
             }
-
-            Arrays.fill(listeners, count, length, null);
-            this.length = count;
-
-            if (Invariants.isParanoid()) checkIntegrity();
-            return count == 0 ? null : this;
         }
 
         private void checkIntegrity()
@@ -520,13 +475,59 @@ public class DefaultLocalListeners implements 
LocalListeners
         this.txnListeners = txnListeners;
     }
 
-    private void notifyComplexListeners(SafeCommandStore safeStore, 
SafeCommand safeCommand)
+    static class NotifyComplex extends BufferList<RegisteredComplexListener> 
implements BiFunction<TxnId, RegisteredComplexListeners, 
RegisteredComplexListeners>
     {
-        complexListeners.compute(safeCommand.txnId(), (id, cur) -> {
+        boolean remove;
+        @Override
+        public RegisteredComplexListeners apply(TxnId txnId, 
RegisteredComplexListeners cur)
+        {
             if (cur == null)
                 return null;
-            return cur.notify(safeStore, safeCommand, notifySink);
-        });
+
+            if (remove)
+            {
+                for (RegisteredComplexListener listener : this)
+                {
+                    if (listener != null && null == cur.remove(listener))
+                        return null; // can only return early if remaining 
listeners already removed
+                }
+            }
+            else
+            {
+                cur.collect(this);
+            }
+
+            return cur;
+        }
+    }
+
+    private void notifyComplexListeners(SafeCommandStore safeStore, 
SafeCommand safeCommand)
+    {
+        try (NotifyComplex notify = new NotifyComplex())
+        {
+            complexListeners.compute(safeCommand.txnId(), notify);
+
+            int size = notify.size(), count = size;
+            for (int i = 0 ; i < size ; ++i)
+            {
+                RegisteredComplexListener registered = notify.get(i);
+                if (registered.index < 0) continue;
+                boolean noChange = notifySink.notify(safeStore, safeCommand, 
registered.listener)
+                                   || registered.index < 0; // check we 
haven't removed ourselves during notify, to avoid wasted work
+
+                if (noChange)
+                {
+                    notify.set(i, null);
+                    --count;
+                }
+            }
+
+            if (count > 0)
+            {
+                notify.remove = true;
+                complexListeners.compute(safeCommand.txnId(), notify);
+            }
+        }
     }
 
     @Override
@@ -555,9 +556,9 @@ public class DefaultLocalListeners implements LocalListeners
         complexListeners.forEach((key, value) -> {
             // the listener registration needs to be invalidated so that a 
caller does not try to cancel it
             RegisteredComplexListeners listeners = 
complexListeners.remove(key);
-            for (int i = 0 ; i < listeners.length ; i++)
+            if (listeners != null)
             {
-                if (listeners.listeners[i] != null)
+                for (int i = 0 ; i < listeners.count ; i++)
                     listeners.listeners[i].index = -1;
             }
         });
@@ -716,13 +717,7 @@ public class DefaultLocalListeners implements 
LocalListeners
                                 maxBufferCount = bufferCount;
                             }
 
-                            int count = 0;
-                            for (int i = 0 ; i < cur.length ; ++i)
-                            {
-                                if (cur.listeners[i] != null)
-                                    buffer[count++] = cur.listeners[i];
-                            }
-                            Invariants.expect(count == bufferCount);
+                            System.arraycopy(cur.listeners, 0, buffer, 0, 
bufferCount);
                             return cur;
                         });
 
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index daa2c9f5..41be863d 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -41,6 +41,7 @@ import java.util.function.Predicate;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import accord.local.CommandStores.RangesForEpoch;
 import accord.local.cfk.NotifySink;
 import accord.primitives.*;
 import com.google.common.annotations.VisibleForTesting;
@@ -216,9 +217,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     private InMemorySafeStore current;
     private final Journal journal;
 
-    public InMemoryCommandStore(int id, NodeCommandStoreService node, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, 
Journal journal)
+    public InMemoryCommandStore(int id, NodeCommandStoreService node, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, RangesForEpoch rangesForEpoch, Journal 
journal)
     {
-        super(id, node, agent, store, progressLogFactory, listenersFactory, 
epochUpdateHolder);
+        super(id, node, agent, store, progressLogFactory, listenersFactory, 
rangesForEpoch);
         this.journal = journal;
         this.commandsForRanges = new InMemoryRangeSummaryIndex();
         progressLog.unsafeStart();
@@ -475,9 +476,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     {
         if (current != null)
             throw illegalState("Another operation is in progress or it's store 
was not cleared");
-        current = createSafeStore(context, cfrLoad);
-        updateRangesForEpoch(current);
-        return current;
+        return current = createSafeStore(context, cfrLoad);
     }
 
     public void completeOperation(SafeCommandStore store)
@@ -951,9 +950,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         Thread activeThread;
         final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
 
-        public Synchronized(int id, NodeCommandStoreService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal)
+        public Synchronized(int id, NodeCommandStoreService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, RangesForEpoch rangesForEpoch, Journal journal)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, journal);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, rangesForEpoch, journal);
         }
 
         private synchronized void maybeRun()
@@ -1026,9 +1025,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         private Thread thread; // when run in the executor this will be 
non-null, null implies not running in this store
         private final ExecutorService executor;
 
-        public SingleThread(int id, NodeCommandStoreService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal)
+        public SingleThread(int id, NodeCommandStoreService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, RangesForEpoch rangesForEpoch, Journal journal)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, journal);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, rangesForEpoch, journal);
             this.executor = Executors.newSingleThreadExecutor(r -> {
                 Thread thread = new Thread(r);
                 thread.setName(CommandStore.class.getSimpleName() + '[' + 
time.id() + ']');
@@ -1110,9 +1109,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             }
         }
 
-        public Debug(int id, NodeCommandStoreService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, EpochUpdateHolder epochUpdateHolder, Journal journal)
+        public Debug(int id, NodeCommandStoreService time, Agent agent, 
DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, RangesForEpoch rangesForEpoch, Journal journal)
         {
-            super(id, time, agent, store, progressLogFactory, 
listenersFactory, epochUpdateHolder, journal);
+            super(id, time, agent, store, progressLogFactory, 
listenersFactory, rangesForEpoch, journal);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/local/CommandBuilder.java 
b/accord-core/src/main/java/accord/local/CommandBuilder.java
index ecfc8cdd..44c71ac3 100644
--- a/accord-core/src/main/java/accord/local/CommandBuilder.java
+++ b/accord-core/src/main/java/accord/local/CommandBuilder.java
@@ -229,6 +229,8 @@ public class CommandBuilder
             case TruncatedApplyWithOutcome:
             case TruncatedApply:
             case TruncatedUnapplied:
+                if (txnId.awaitsOnlyDeps())
+                    return Command.Truncated.truncated(txnId, saveStatus, 
durability, participants, executeAt, partialDeps, writes, result, null);
                 return Command.Truncated.truncated(txnId, saveStatus, 
durability, participants, executeAt, partialDeps, writes, result);
             case Erased:
                 return Command.Truncated.erased(txnId);
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index d6a99a42..a07a6559 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -29,7 +29,6 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -104,43 +103,6 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CommandStore.class);
 
-    public static class EpochUpdate
-    {
-        public final RangesForEpoch newRangesForEpoch;
-        public final RedundantBefore addRedundantBefore;
-
-        EpochUpdate(RangesForEpoch newRangesForEpoch, RedundantBefore 
addRedundantBefore)
-        {
-            this.newRangesForEpoch = newRangesForEpoch;
-            this.addRedundantBefore = addRedundantBefore;
-        }
-    }
-
-    // TODO (required): we only REMOVE ranges now, so it should be possible to 
simplify this
-    public static class EpochUpdateHolder extends AtomicReference<EpochUpdate>
-    {
-        // TODO (desired): can better encapsulate by accepting only the 
newRangesForEpoch and deriving the add/remove ranges
-        public void add(long epoch, RangesForEpoch newRangesForEpoch, Ranges 
addRanges)
-        {
-            RedundantBefore addRedundantBefore = 
RedundantBefore.create(addRanges, epoch, Long.MAX_VALUE, 
TxnId.minForEpoch(epoch), UNREADY_ONLY);
-            update(newRangesForEpoch, addRedundantBefore);
-        }
-
-        public void remove(long epoch, RangesForEpoch newRangesForEpoch, 
Ranges removeRanges)
-        {
-            RedundantBefore addRedundantBefore = 
RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, 
SomeStatus.NONE);
-            update(newRangesForEpoch, addRedundantBefore);
-        }
-
-        private void update(RangesForEpoch newRangesForEpoch, RedundantBefore 
addRedundantBefore)
-        {
-            EpochUpdate baseUpdate = new EpochUpdate(newRangesForEpoch, 
addRedundantBefore);
-            EpochUpdate cur = get();
-            if (cur == null || !compareAndSet(cur, new 
EpochUpdate(newRangesForEpoch, RedundantBefore.merge(cur.addRedundantBefore, 
addRedundantBefore))))
-                set(baseUpdate);
-        }
-    }
-
     public interface Factory
     {
         CommandStore create(int id,
@@ -149,7 +111,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
                             DataStore store,
                             ProgressLog.Factory progressLogFactory,
                             LocalListeners.Factory listenersFactory,
-                            EpochUpdateHolder rangesForEpoch,
+                            RangesForEpoch rangesForEpoch,
                             Journal journal);
     }
 
@@ -159,7 +121,6 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
     protected final DataStore dataStore;
     protected final ProgressLog progressLog;
     protected final LocalListeners listeners;
-    protected final EpochUpdateHolder epochUpdateHolder;
 
     // Used in markShardStale to make sure the staleness includes in progress 
bootstraps
     // TODO (desired): migrate to BTree
@@ -214,7 +175,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
                            DataStore dataStore,
                            ProgressLog.Factory progressLogFactory,
                            LocalListeners.Factory listenersFactory,
-                           EpochUpdateHolder epochUpdateHolder)
+                           RangesForEpoch rangesForEpoch)
     {
         this.id = id;
         this.node = node;
@@ -222,7 +183,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         this.dataStore = dataStore;
         this.progressLog = progressLogFactory.create(this);
         this.listeners = listenersFactory.create(this);
-        this.epochUpdateHolder = epochUpdateHolder;
+        loadRangesForEpoch(rangesForEpoch);
     }
 
     public final int id()
@@ -255,34 +216,6 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         waitingOnVisibility.clear();
     }
 
-    public void updateRangesForEpoch(SafeCommandStore safeStore)
-    {
-        EpochUpdate update = epochUpdateHolder.get();
-        if (update == null)
-            return;
-
-        update = epochUpdateHolder.getAndSet(null);
-        if (update.addRedundantBefore.size() > 0)
-            safeStore.upsertRedundantBefore(update.addRedundantBefore);
-        if (update.newRangesForEpoch != null)
-            safeStore.setRangesForEpoch(update.newRangesForEpoch);
-
-        safeStore.persistFieldUpdates();
-    }
-
-    @VisibleForTesting
-    public void unsafeUpdateRangesForEpoch()
-    {
-        EpochUpdate update = epochUpdateHolder.getAndSet(null);
-        if (update == null)
-            return;
-
-        if (update.addRedundantBefore.size() > 0)
-            unsafeUpsertRedundantBefore(update.addRedundantBefore);
-        if (update.newRangesForEpoch != null)
-            unsafeSetRangesForEpoch(update.newRangesForEpoch);
-    }
-
     public RangesForEpoch unsafeGetRangesForEpoch()
     {
         return rangesForEpoch;
@@ -304,15 +237,15 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         rangesForEpoch = nonNull(newRangesForEpoch);
     }
 
-    protected final void unsafeClearRangesForEpoch()
-    {
-        rangesForEpoch = null;
-    }
-
     protected void loadRangesForEpoch(RangesForEpoch newRangesForEpoch)
     {
-        Invariants.require(this.rangesForEpoch == null);
+        Invariants.require(this.rangesForEpoch == null || 
rangesForEpoch.isPrefixOf(newRangesForEpoch));
         unsafeSetRangesForEpoch(newRangesForEpoch);
+        if (redundantBefore.isEmpty() && newRangesForEpoch.size() > 0)
+        {
+            long minEpoch = rangesForEpoch.epochAtIndex(0);
+            loadRedundantBefore(RedundantBefore.create(rangesForEpoch.all(), 
minEpoch, Long.MAX_VALUE, TxnId.minForEpoch(minEpoch), UNREADY_ONLY));
+        }
     }
 
     protected final void unsafeClearPermanentlyUnsafeToRead()
@@ -390,8 +323,8 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
 
     protected void loadRedundantBefore(RedundantBefore newRedundantBefore)
     {
-        Invariants.require(redundantBefore == null || 
redundantBefore.equals(RedundantBefore.EMPTY));
-        Invariants.require(newRedundantBefore != null);
+        Invariants.require(redundantBefore == null || 
redundantBefore.foldl((b, v) -> v && b.bounds.length == 1 && b.status(0) == 0 
&& b.status(1) == UNREADY_ONLY.encoded, true));
+        Invariants.require(newRedundantBefore != null && (redundantBefore == 
null || newRedundantBefore.isAtLeast(redundantBefore)));
         unsafeSetRedundantBefore(newRedundantBefore);
     }
 
@@ -850,16 +783,20 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
             });
     }
 
-    Supplier<EpochReady> unbootstrap(long epoch, Ranges removedRanges)
+    Supplier<EpochReady> unbootstrap(long epoch, RangesForEpoch 
newRangesForEpoch, Ranges removeRanges)
     {
         return () -> {
             AsyncResult<Void> done = submit((Empty) () -> "Unbootstrap", 
safeStore -> {
                 for (Bootstrap prev : bootstraps)
                 {
-                    Ranges abort = prev.allValid.slice(removedRanges, Minimal);
+                    Ranges abort = prev.allValid.slice(removeRanges, Minimal);
                     if (!abort.isEmpty())
                         prev.invalidate(abort);
                 }
+                
Invariants.require(rangesForEpoch.isPrefixOf(newRangesForEpoch));
+                RedundantBefore addRedundantBefore = 
RedundantBefore.create(removeRanges, Long.MIN_VALUE, epoch, TxnId.NONE, 
SomeStatus.NONE);
+                safeStore.setRangesForEpoch(newRangesForEpoch);
+                safeStore.upsertRedundantBefore(addRedundantBefore);
                 return null;
             });
 
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index dd2e353d..168b7df4 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -48,7 +48,6 @@ import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
-import accord.local.CommandStore.EpochUpdateHolder;
 import accord.primitives.AbstractRanges;
 import accord.primitives.AbstractUnseekableKeys;
 import accord.primitives.EpochSupplier;
@@ -127,7 +126,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
          */
         Ranges ranges(ShardHolder shard);
 
-        default @Nullable long minEpoch() { return -1L; };
+        default long minEpoch() { return -1L; };
     }
 
     public interface UnrestrictedStoreSelector extends StoreSelector
@@ -354,7 +353,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
             this.journal = journal;
         }
 
-        CommandStore create(int id, EpochUpdateHolder rangesForEpoch)
+        CommandStore create(int id, RangesForEpoch rangesForEpoch)
         {
             return shardFactory.create(id, node, agent, this.store, 
progressLogFactory, listenersFactory, rangesForEpoch, journal);
         }
@@ -721,6 +720,20 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
             }
             return removed;
         }
+
+        public boolean isPrefixOf(RangesForEpoch that)
+        {
+            if (this.size() > that.size())
+                return false;
+
+            for (int i = 0 ; i < size() ; ++i)
+            {
+                if (this.epochs[i] != that.epochs[i] || 
!this.ranges[i].equals(that.ranges[i]))
+                    return false;
+            }
+
+            return true;
+        }
     }
 
     protected void loadSnapshot(Snapshot toLoad)
@@ -927,9 +940,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
             {
                 // TODO (required): This is updating the a non-volatile field 
in the previous Snapshot, why modify it at all, even with volatile the 
guaranteed visibility is weak even with mutual exclusion
                 shard.ranges = shard.ranges().withRanges(newTopology.epoch(), 
current.without(subtracted));
-                shard.store.epochUpdateHolder.remove(epoch, shard.ranges, 
removeRanges);
-
-                bootstrapUpdates.add(shard.store.unbootstrap(epoch, 
removeRanges));
+                bootstrapUpdates.add(shard.store.unbootstrap(epoch, 
shard.ranges, removeRanges));
             }
 
             Ranges regainedRanges = shard.ranges().all().slice(added, Minimal);
@@ -953,11 +964,12 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
             logger.info("Epoch {} adding {} to local command stores", epoch, 
added);
             for (Ranges addRanges : shardDistributor.split(added))
             {
-                EpochUpdateHolder updateHolder = new EpochUpdateHolder();
                 RangesForEpoch rangesForEpoch = new RangesForEpoch(epoch, 
addRanges);
-                updateHolder.add(epoch, rangesForEpoch, addRanges);
-                ShardHolder shard = new ShardHolder(supplier.create(nextId++, 
updateHolder), previouslyOwned.regains(addRanges));
+                ShardHolder shard = new ShardHolder(supplier.create(nextId++, 
rangesForEpoch), previouslyOwned.regains(addRanges));
                 shard.ranges = rangesForEpoch;
+                bootstrapUpdates.add(() -> EpochReady.all(epoch, 
shard.store.execute((PreLoadContext.Empty)() -> "Saving RangesForEpoch to 
journal for " + shard.store, safeStore -> {
+                    safeStore.setRangesForEpoch(rangesForEpoch); // to persist 
it
+                })));
 
                 Map<BootstrapRangeAction, Ranges> partitioned = 
addRanges.partitioningBy(range -> shouldBootstrap(node, prev.global, 
newLocalTopology, range), BootstrapRangeAction.class);
                 for (Map.Entry<BootstrapRangeAction, Ranges> entry : 
partitioned.entrySet())
@@ -1176,11 +1188,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         {
             RangesForEpoch rfe = e.getValue();
             Invariants.require(rfe != null);
-            EpochUpdateHolder holder = new EpochUpdateHolder();
-            ShardHolder shard = new ShardHolder(supplier.create(e.getKey(), 
holder), rfe, update.previouslyOwned.regains(rfe.currentRanges()));
-            // TODO (required): if the add is necessary (highly unlikely) it 
needs to be done once journal is writeable so we NEED to move this
-            if (!shard.ranges.equals(shard.store.rangesForEpoch))
-                holder.add(1, e.getValue(), rfe.all());
+            ShardHolder shard = new ShardHolder(supplier.create(e.getKey(), 
rfe), rfe, update.previouslyOwned.regains(rfe.currentRanges()));
             maxId = Math.max(maxId, e.getKey());
             shards[i++] = shard;
         }
@@ -1202,35 +1210,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
             RangesForEpoch rfe = e.getValue();
             Invariants.require(rfe != null);
             ShardHolder shard = new ShardHolder(current.byId(storeId), rfe, 
update.previouslyOwned.regains(rfe.all()));
-            EpochUpdateHolder holder = shard.store.epochUpdateHolder;
-            rfe.forEach(new BiConsumer<>()
-            {
-                RangesForEpoch accumulator = null;
-                Ranges prev = null;
-                public void accept(Long epoch, Ranges ranges)
-                {
-                    if (accumulator == null)
-                        accumulator = new RangesForEpoch(epoch, ranges);
-                    else
-                        accumulator = accumulator.withRanges(epoch, ranges);
-
-                    Ranges additions = ranges;
-                    Ranges removals = Ranges.EMPTY;
-                    if (prev != null)
-                    {
-                        additions = ranges.without(prev);
-                        removals = prev.without(ranges);
-                    }
-
-                    if (!additions.isEmpty())
-                        holder.add(epoch, accumulator, additions);
-                    if (!removals.isEmpty())
-                        holder.remove(epoch, accumulator, removals);
-                    shard.store.unsafeUpdateRangesForEpoch();
-                    prev = ranges;
-                }
-            });
-
+            shard.store.unsafeSetRangesForEpoch(rfe);
             shards[storeId] = shard;
             maxId = Math.max(maxId, storeId);
         }
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index 3bfc31a9..78fa9f5f 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -838,13 +838,15 @@ public class Commands
         WaitingOn waitingOn = command.waitingOn();
         if (waitingOn.isWaiting())
         {
-            if (!removeRedundantDependencies(safeStore, safeCommand) || 
safeCommand.current().waitingOn().isWaiting())
+            if (!removeRedundantDependencies(safeStore, safeCommand) || 
(waitingOn = safeCommand.current().waitingOn()).isWaiting())
             {
                 if (alwaysNotifyListeners)
                     safeStore.notifyListeners(safeCommand, command);
 
                 if (notifyWaitingOn && waitingOn.isWaitingOnCommand())
                     adapter.notifyWaiting(safeStore, safeCommand);
+                else
+                    adapter.notWaiting(safeStore);
 
                 return false;
             }
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 56f35923..3cc16d08 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -87,6 +87,7 @@ import static accord.primitives.Timestamp.Flag.SHARD_BOUND;
 import static accord.utils.ArrayBuffers.cachedAny;
 import static accord.utils.ArrayBuffers.cachedInts;
 import static accord.utils.Functions.alwaysFalse;
+import static accord.utils.Functions.alwaysTrue;
 import static accord.utils.Invariants.illegalState;
 import static accord.utils.Invariants.require;
 import static accord.utils.Invariants.requireStrictlyOrdered;
@@ -874,7 +875,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
     {
         staleRanges = lostRanges = Ranges.EMPTY;
         maxStale = maxShardAppliedBefore = maxGcBefore = TxnId.NONE;
-        minShardAndLocallyAppliedBefore = minGcBefore = TxnId.MAX;
+        minShardAndLocallyAppliedBefore = minGcBefore = TxnId.NONE;
         minGcHlcBefore = 0L;
         maxStartEpoch = 0;
         minEndEpoch = Long.MAX_VALUE;
@@ -920,6 +921,8 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
             if (bounds.endEpoch < minEndEpoch)
                 minEndEpoch = bounds.endEpoch;
         }
+
+        Invariants.require(minGcHlcBefore < Long.MAX_VALUE);
         this.maxStale = maxUnready;
         this.maxShardAppliedBefore = maxShardAppliedBefore;
         this.maxGcBefore = maxGcBefore;
@@ -1129,8 +1132,14 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
 
     public boolean isAtLeast(RedundantBefore atLeast)
     {
+        if (!ranges().containsAll(atLeast.ranges()))
+            return false;
+
         return foldl((b, v, al, e) -> {
             return al.foldl(Ranges.of(b.range), (lb, v2, ub) -> {
+                if (!v2 || ub.endEpoch > lb.endEpoch || ub.startEpoch != 
lb.startEpoch)
+                    return false;
+
                 int j = 0;
                 for (int i = 0 ; i < lb.bounds.length ; ++i)
                 {
@@ -1142,9 +1151,9 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
                         || (lb.status(i*2+1) & ~ub.status(j*2+1)) != 0)
                         return false;
                 }
-                return v2;
+                return true;
             }, v, b);
-        }, true, atLeast, null, Functions.alwaysFalse());
+        }, true, atLeast, null, i -> !i);
     }
 
     /**
diff --git a/accord-core/src/main/java/accord/messages/Callback.java 
b/accord-core/src/main/java/accord/messages/Callback.java
index dd30288f..ca9b5230 100644
--- a/accord-core/src/main/java/accord/messages/Callback.java
+++ b/accord-core/src/main/java/accord/messages/Callback.java
@@ -39,17 +39,29 @@ public interface Callback<R>
 
     interface CallbackExclusive<R>
     {
-        static <R> void onSuccess(AsyncExecutor executor, CallbackExclusive<R> 
callback, Node.Id from, R success)
+        private static void replyMaybeImmediately(AsyncExecutor executor, 
boolean doNotReplyImmediately, Runnable run)
         {
-            executor.executeMaybeImmediately(() -> {
+            if (doNotReplyImmediately || !executor.tryExecuteImmediately(run))
+                executor.execute(run);
+        }
+
+        static <R> Runnable runOnSuccess(CallbackExclusive<R> callback, 
Node.Id from, R success)
+        {
+            return () -> {
                 try { callback.onSuccessExclusive(from, success); }
                 catch (Throwable t) { 
callback.onCallbackFailureExclusive(from, t); }
-            });
+            };
         }
 
-        static <R> void onFailure(AsyncExecutor executor, CallbackExclusive<R> 
callback, Node.Id from, Throwable failure)
+        static <R> void onSuccess(AsyncExecutor executor, boolean 
doNotReplyImmediately, CallbackExclusive<R> callback, Node.Id from, R success)
+        {
+            replyMaybeImmediately(executor, doNotReplyImmediately, 
runOnSuccess(callback, from, success));
+        }
+
+
+        static <R> Runnable runOnFailure(CallbackExclusive<R> callback, 
Node.Id from, Throwable failure)
         {
-            executor.executeMaybeImmediately(() -> {
+            return () -> {
                 try { callback.onFailureExclusive(from, failure); }
                 catch (Throwable t)
                 {
@@ -60,15 +72,25 @@ public interface Callback<R>
                     }
                     callback.onCallbackFailureExclusive(from, t);
                 }
-            });
+            };
         }
 
-        static <R> void onSlow(AsyncExecutor executor, CallbackExclusive<R> 
callback, Node.Id from)
+        static <R> void onFailure(AsyncExecutor executor, boolean 
doNotReplyImmediately, CallbackExclusive<R> callback, Node.Id from, Throwable 
failure)
         {
-            executor.executeMaybeImmediately(() -> {
+            replyMaybeImmediately(executor, doNotReplyImmediately, 
runOnFailure(callback, from, failure));
+        }
+
+        static <R> Runnable runOnSlow(CallbackExclusive<R> callback, Node.Id 
from)
+        {
+            return () -> {
                 try { callback.onSlowExclusive(from); }
                 catch (Throwable t) { 
callback.onCallbackFailureExclusive(from, t); }
-            });
+            };
+        }
+
+        static <R> void onSlow(AsyncExecutor executor, boolean 
unsafeToReplyImmediately, CallbackExclusive<R> callback, Node.Id from)
+        {
+            replyMaybeImmediately(executor, unsafeToReplyImmediately, 
runOnSlow(callback, from));
         }
 
         void onSuccessExclusive(Node.Id from, R reply);
@@ -89,19 +111,19 @@ public interface Callback<R>
         @Override
         public final void onSuccess(Node.Id from, R reply)
         {
-            CallbackExclusive.onSuccess(executor, this, from, reply);
+            CallbackExclusive.onSuccess(executor, true, this, from, reply);
         }
 
         @Override
         public final void onSlow(Node.Id from)
         {
-            CallbackExclusive.onSlow(executor, this, from);
+            CallbackExclusive.onSlow(executor, true, this, from);
         }
 
         @Override
         public final void onFailure(Node.Id from, Throwable failure)
         {
-            CallbackExclusive.onFailure(executor, this, from, failure);
+            CallbackExclusive.onFailure(executor, true, this, from, failure);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/topology/ActiveEpochs.java 
b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
index 228c7f9b..0831536d 100644
--- a/accord-core/src/main/java/accord/topology/ActiveEpochs.java
+++ b/accord-core/src/main/java/accord/topology/ActiveEpochs.java
@@ -79,10 +79,13 @@ public final class ActiveEpochs implements 
Iterable<ActiveEpoch>
     ActiveEpochs withNewEpochs(ActiveEpoch[] epochs)
     {
         long firstNonEmptyEpoch = this.firstNonEmptyEpoch;
-        if (firstNonEmptyEpoch == -1 && epochs.length > 0 && 
!epochs[0].all().isEmpty())
+        if (firstNonEmptyEpoch == -1)
         {
-            Invariants.require(epochs.length == 1);
-            firstNonEmptyEpoch = epochs[0].epoch();
+            for (int i = epochs.length - 1; firstNonEmptyEpoch == -1 && i >= 0 
; --i)
+            {
+                if (!epochs[i].all().isEmpty())
+                    firstNonEmptyEpoch = epochs[i].epoch();
+            }
         }
         return new ActiveEpochs(manager, epochs, firstNonEmptyEpoch);
     }
diff --git a/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java 
b/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
index 30ce2973..de000b48 100644
--- a/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
+++ b/accord-core/src/main/java/accord/utils/IntrusivePriorityHeap.java
@@ -33,6 +33,7 @@ import java.util.stream.Stream;
 public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node> implements Comparator<N>
 {
     private static final int NORMAL_MIN_SIZE = 8;
+    private static final int MAX_EMPTY_SIZE = 1024;
     private static final Node[] EMPTY = new Node[0];
     private static final Node[] TINY_EMPTY = new Node[0];
 
@@ -146,7 +147,11 @@ public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node
                 tail.setHeapIndex(i);
             }
         }
-        else size = heapifiedSize = 0;
+        else
+        {
+            size = heapifiedSize = 0;
+            maybeShrink();
+        }
 
         heap[size] = null;
         node.setHeapIndex(-1);
@@ -187,7 +192,8 @@ public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node
         --heapifiedSize;
         if (size == 0)
         {
-            heap[0] = null;
+            if (!maybeShrink())
+                heap[0] = null;
             return;
         }
 
@@ -196,6 +202,15 @@ public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node
         siftDown(siftDown, 0);
     }
 
+    private boolean maybeShrink()
+    {
+        if (heap.length <= MAX_EMPTY_SIZE)
+            return false;
+
+        heap = new Node[MAX_EMPTY_SIZE];
+        return true;
+    }
+
     /**
      * {@code i} is a free position in the heap, siftDown must be safely 
inserted at a position >= i
      */
@@ -303,6 +318,7 @@ public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node
     {
         Arrays.fill(heap, 0, size, null);
         heapifiedSize = size = 0;
+        maybeShrink();
     }
 
     protected <P> void drain(P param, BiConsumer<P, N> consumer)
@@ -315,6 +331,7 @@ public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node
         }
         Arrays.fill(heap, 0, size, null);
         heapifiedSize = size = 0;
+        maybeShrink();
     }
 
     /**
@@ -343,6 +360,8 @@ public abstract class IntrusivePriorityHeap<N extends 
IntrusivePriorityHeap.Node
         {
             Arrays.fill(heap, size - removedCount, size, null);
             size -= removedCount;
+            if (size == 0)
+                maybeShrink();
         }
     }
 
diff --git a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java 
b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
index 9bf506ff..1cc17d39 100644
--- a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java
@@ -70,6 +70,7 @@ public class ReducingIntervalMap<K extends Comparable<? super 
K>, V>
     ReducingIntervalMap(K[] starts, V[] values)
     {
         Invariants.requireArgument(starts.length == values.length + 1 || 
(starts.length + values.length) == 0);
+        Invariants.require(values.length != 1 || values[0] != null);
         this.starts = starts;
         this.values = values;
     }
diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java 
b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
index 3ae1ab0a..7b3135d5 100644
--- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
@@ -502,6 +502,11 @@ public class ReducingRangeMap<V> extends 
ReducingIntervalMap<RoutingKey, V>
         }
     }
 
+    public Ranges ranges()
+    {
+        return ranges(Objects::nonNull);
+    }
+
     public Ranges ranges(Predicate<V> include)
     {
         Range[] ranges = new Range[values.length];
diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java 
b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
index 17c785d8..85ccedc3 100644
--- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
@@ -399,7 +399,7 @@ public class RemoteListenersTest
                   ignore -> new ProgressLog.NoOpProgressLog(),
                   ignore -> new DefaultLocalListeners(null, new 
DefaultRemoteListeners((a, b, c, d, e)->{}),
                                                       
DefaultLocalListeners.DefaultNotifySink.INSTANCE),
-                  new EpochUpdateHolder());
+                  CommandStores.RangesForEpoch.EMPTY);
             this.storeId = id;
         }
 
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index ce54760e..43cf74c8 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -441,7 +441,7 @@ public class Cluster
         return stats;
     }
 
-    static class RandomLoader
+    public static class RandomLoader
     {
         private final BooleanSupplier cacheEmptyChance;
         private final BooleanSupplier cacheFullChance;
@@ -451,22 +451,27 @@ public class Cluster
 
         final BooleanSupplier cmdCheckChance;
         final BooleanSupplier cfkCheckChance;
+
+        public static float CMD_BASE_CHECK_CHANCE = 0.01f;
         static int cmdCounter, cfkCounter;
 
         RandomLoader(RandomSource random)
         {
-            this(random.nextBoolean() ? 1.0f : random.nextFloat(), random);
+            this(random.nextBoolean() ? 1.0f : random.nextFloat(),
+                 random.nextFloat() * CMD_BASE_CHECK_CHANCE,
+                 random.nextFloat() * 0.2f,
+                 random);
         }
 
-        RandomLoader(float presentChance, RandomSource random)
+        RandomLoader(float presentChance, float cmdCheckChance, float cfkCheckChance, RandomSource random)
         {
             this(Gens.supplier(Gens.bools().mixedDistribution().next(random), random),
                  Gens.supplier(Gens.bools().mixedDistribution().next(random), random),
                  random.biasedUniformBools(presentChance),
                  random.biasedUniformBools(presentChance),
                  random.biasedUniformBools(presentChance),
-                 Invariants.testParanoia(LINEAR, LINEAR, HIGH) ? Gens.supplier(Gens.bools().mixedDistribution().next(random), random) : () -> random.decide(0.001f),
-                 () -> random.decide(0.1f)
+                 random.biasedUniformBools(cmdCheckChance),
+                 random.biasedUniformBools(cfkCheckChance)
             );
         }
 
@@ -578,7 +583,6 @@ public class Cluster
                 }
             };
         }
-
     }
 
     public static Map<MessageType, Stats> run(Id[] nodes, int[] prefixes, MessageListener messageListener, Supplier<PendingQueue> queueSupplier,
diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index b80b9824..bdc52ee0 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -107,6 +107,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
                                ranges, Arrays.toString(shards))
                       .restore();
             Invariants.require(previouslyOwned.regains(ranges.currentRanges()).equals(regainingRanges));
+            commandStore.loadRangesForEpoch(ranges);
             ShardHolder shard = new ShardHolder(commandStore, ranges, previouslyOwned.regains(ranges.currentRanges()));
             shards[i++] = shard;
         }
@@ -224,26 +225,21 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
         private Task<?> active;
         private Thread activeThread;
 
-        public DelayedCommandStore(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder, SimulatedDelayedExecutorService executor, CacheLoading cacheLoading, Journal journal)
+        public DelayedCommandStore(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, RangesForEpoch rangesForEpoch, SimulatedDelayedExecutorService executor, CacheLoading cacheLoading, Journal journal)
         {
-            super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder, journal);
+            super(id, time, agent, store, progressLogFactory, listenersFactory, rangesForEpoch, journal);
             this.executor = executor;
             this.cacheLoading = cacheLoading;
             this.journal = journal;
             restore();
         }
 
-        protected void loadRedundantBefore(RedundantBefore redundantBefore)
+        protected void loadRedundantBefore(RedundantBefore newRedundantBefore)
         {
-            if (redundantBefore == null)
-            {
-                Invariants.require(unsafeGetRedundantBefore().size() == 0);
-            }
-            else
-            {
-                unsafeClearRedundantBefore();
-                super.loadRedundantBefore(redundantBefore);
-            }
+            if (newRedundantBefore == null || newRedundantBefore.isEmpty())
+                return;
+
+            super.loadRedundantBefore(newRedundantBefore);
         }
 
         protected void loadBootstrapBeganAt(NavigableMap<TxnId, Ranges> bootstrapBeganAt)
@@ -263,12 +259,10 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
         @Override
         protected void loadRangesForEpoch(RangesForEpoch newRangesForEpoch)
         {
-            if (newRangesForEpoch == null) Invariants.require(super.rangesForEpoch == null);
-            else
-            {
-                unsafeClearRangesForEpoch();
-                super.loadRangesForEpoch(newRangesForEpoch);
-            }
+            if (newRangesForEpoch == null)
+                return;
+
+            super.loadRangesForEpoch(newRangesForEpoch);
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index a4aa44f1..394cd12e 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -217,9 +217,6 @@ public class InMemoryJournal implements Journal
 
     private Builder reconstruct(Diffs files, Load load)
     {
-        if (files == null)
-            return null;
-
         Builder builder = null;
         List<Diff> saved = files.sorted(false);
         for (int i = saved.size() - 1; i >= 0; i--)
@@ -500,7 +497,19 @@ public class InMemoryJournal implements Journal
             for (Map.Entry<TxnId, Diffs> e2 : localJournal.entrySet())
             {
                 Diffs diffs = e2.getValue();
-                if (diffs.isEmpty()) continue;
+                if (diffs.isEmpty())
+                {
+                    if (diffs.flushed instanceof FinalList)
+                    {
+                        Builder builder = new Builder(e2.getKey(), ALL);
+                        Cleanup cleanup;
+                        try { cleanup = builder.shouldCleanup(FULL, store.unsafeGetRedundantBefore(), store.durableBefore()); }
+                        catch (LogUnavailableException ignore) { continue; }
+
+                        Invariants.require(cleanup.compareTo(((FinalList) diffs.flushed).cleanup()) >= 0);
+                    }
+                    continue;
+                }
 
                 Diffs subset = diffs;
                 {
@@ -562,7 +571,7 @@ public class InMemoryJournal implements Journal
                 ++counter;
 
                 Cleanup cleanup;
-                try {cleanup = builder.shouldCleanup(input, store.unsafeGetRedundantBefore(), store.durableBefore()); }
+                try { cleanup = builder.shouldCleanup(input, store.unsafeGetRedundantBefore(), store.durableBefore()); }
                 catch (LogUnavailableException ignore) {cleanup = ERASE; }
 
                 cleanup = builder.maybeCleanup(true, cleanup);
@@ -682,7 +691,7 @@ public class InMemoryJournal implements Journal
 
     private static abstract class FinalList extends AbstractList<Diff>
     {
-
+        abstract Cleanup cleanup();
     }
 
     private static class ErasedList extends FinalList
@@ -728,10 +737,18 @@ public class InMemoryJournal implements Journal
             }
             return super.set(index, diff);
         }
+
+        @Override
+        Cleanup cleanup()
+        {
+            return ERASE;
+        }
     }
 
     private static class PurgedList extends FinalList
     {
+        PurgedList() {}
+
         @Override
         public Diff get(int index)
         {
@@ -752,6 +769,12 @@ public class InMemoryJournal implements Journal
                 return false;
             throw illegalState();
         }
+
+        @Override
+        Cleanup cleanup()
+        {
+            return EXPUNGE;
+        }
     }
 
     private static Diff toDiff(@Nonnull CommandUpdate update)
diff --git a/accord-core/src/test/java/accord/local/MaybeExecuteAdapterTest.java b/accord-core/src/test/java/accord/local/MaybeExecuteAdapterTest.java
new file mode 100644
index 00000000..b0ea9bd3
--- /dev/null
+++ b/accord-core/src/test/java/accord/local/MaybeExecuteAdapterTest.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import accord.api.Agent;
+import accord.api.ProgressLog.NoOpProgressLog;
+import accord.api.RoutingKey;
+import accord.api.Scheduler;
+import accord.coordinate.CoordinationAdapter;
+import accord.impl.DefaultLocalListeners;
+import accord.impl.DefaultRemoteListeners;
+import accord.impl.DefaultTimeouts;
+import accord.impl.InMemoryCommandStore;
+import accord.impl.InMemoryCommandStores;
+import accord.impl.IntKey;
+import accord.impl.SizeOfIntersectionSorter;
+import accord.impl.TestAgent;
+import accord.impl.TopologyFactory;
+import accord.impl.basic.InMemoryJournal;
+import accord.impl.mock.MockCluster;
+import accord.impl.mock.MockStore;
+import accord.impl.mock.MockTopologyService;
+import accord.local.Node.Id;
+import accord.local.UniqueTimeService.AtomicUniqueTime;
+import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullKeyRoute;
+import accord.primitives.Keys;
+import accord.primitives.Range;
+import accord.primitives.RangeDeps;
+import accord.primitives.Ranges;
+import accord.primitives.RoutingKeys;
+import accord.primitives.SaveStatus;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topology;
+import accord.utils.DefaultRandom;
+import accord.utils.ImmutableBitSet;
+import accord.utils.LargeBitSet;
+import accord.utils.RandomSource;
+import accord.utils.async.AsyncChainUtils;
+
+import static accord.Utils.id;
+import static accord.Utils.writeTxn;
+import static accord.primitives.Status.Durability.NotDurable;
+
+/**
+ * Locks down the {@link Commands.MaybeExecuteAdapter} contract:
+ *
+ * <em>Every invocation of
+ * {@link Commands#maybeExecute(SafeCommandStore, SafeCommand, Command, boolean, boolean, Commands.MaybeExecuteAdapter)}
+ * must invoke exactly one of {@code adapter.notifyWaiting} or {@code adapter.notWaiting} before
+ * returning.</em>
+ *
+ * <p>Some adapters (notably {@code NotifyWaitingOnPlus.adapter(continuation,...)} used by
+ * {@link CommandStore#tryToExecuteListeningTxns(boolean)}) chain a continuation off of
+ * {@code notWaiting}; a return path that fires neither callback silently abandons the
+ * continuation and can deadlock the {@code AsyncResult} returned by
+ * {@code tryToExecuteListeningTxns}.  That bug used to manifest on startup of nodes with a large
+ * backlog of registered listeners whose target txns were {@code Stable}/{@code PreApplied} with
+ * only outstanding *key* waits (no command waits), because
+ * {@code waitingOn.isWaitingOnCommand()} was the only condition under which the adapter was
+ * contacted on that return path.
+ *
+ * <p>This test intentionally bypasses the
+ * {@link SafeCommand#update(SafeCommandStore, Command, boolean)} machinery (and the
+ * {@code CommandsForKey}, progress log, etc. side effects it triggers) by installing the
+ * synthetic command directly into the underlying {@link InMemoryCommandStore.GlobalCommand}.
+ * That keeps the test scoped to the branching inside {@code Commands.maybeExecute}.
+ */
+public class MaybeExecuteAdapterTest
+{
+    // Topology / route / key plumbing.
+    private static final Id ID1 = id(1);
+    private static final Id ID2 = id(2);
+    private static final Id ID3 = id(3);
+    private static final List<Id> IDS = Lists.newArrayList(ID1, ID2, ID3);
+    private static final Range FULL_RANGE = IntKey.range(0, 100);
+    private static final Ranges FULL_RANGES = Ranges.single(FULL_RANGE);
+    private static final Topology TOPOLOGY = TopologyFactory.toTopology(IDS, 3, FULL_RANGE);
+    private static final IntKey.Raw KEY = IntKey.key(10);
+    private static final RoutingKey HOME_KEY = KEY.toUnseekable();
+    private static final FullKeyRoute ROUTE = RoutingKeys.of(HOME_KEY).toRoute(HOME_KEY);
+    private static final Keys KEYS = Keys.of(KEY);
+
+    /** Counts adapter callbacks, ignoring their inputs.  We are only verifying the contract. */
+    private static class Recorder implements Commands.MaybeExecuteAdapter
+    {
+        int notifyWaiting;
+        int notWaiting;
+
+        @Override public void notifyWaiting(SafeCommandStore safeStore, SafeCommand safeCommand) { ++notifyWaiting; }
+        @Override public void notWaiting(SafeCommandStore safeStore)                             { ++notWaiting;    }
+    }
+
+
+    private static class CommandStoreSupport
+    {
+        final AtomicReference<Topology> local = new AtomicReference<>(TOPOLOGY);
+        final MockStore data = new MockStore();
+    }
+
+    private static InMemoryCommandStore.Synchronized createStore(CommandStoreSupport support)
+    {
+        MockCluster.Clock clock = new MockCluster.Clock(100);
+        Agent agent = new TestAgent(clock);
+        RandomSource random = new DefaultRandom();
+        Node node = new Node(ID1, null, new MockTopologyService(ignore -> null, support.local.get()),
+                             clock, new AtomicUniqueTime(clock),
+                             () -> support.data,
+                             new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()),
+                             agent, random.fork(), Scheduler.NEVER_RUN_SCHEDULED,
+                             SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new,
+                             time -> new DefaultTimeouts(time, Runnable::run),
+                             ignore -> ignore2 -> new NoOpProgressLog(),
+                             DefaultLocalListeners.Factory::new,
+                             InMemoryCommandStores.Synchronized::new,
+                             new CoordinationAdapter.DefaultFactory(),
+                             DurableBefore.NOOP_PERSISTER,
+                             new InMemoryJournal(ID1, random.fork()));
+        AsyncChainUtils.awaitUninterruptibly(node.unsafeStart().chain());
+        return (InMemoryCommandStore.Synchronized) node.unsafeByIndex(0);
+    }
+
+    /**
+     * Build a {@code Stable} {@link Command} with the supplied {@link Command.WaitingOn}.  The
+     * {@code waitingOn} can be any shape (key-only, command-only, mixed, empty) regardless of
+     * whether it is semantically consistent with the rest of the command: {@link Command#validate}
+     * only requires {@code waitingOn != null} for stable/committed statuses, and we are testing
+     * pure {@code maybeExecute} branching here.
+     */
+    private static Command buildStable(TxnId txnId, Command.WaitingOn waitingOn)
+    {
+        Txn txn = writeTxn(KEYS);
+        return new CommandBuilder(txnId)
+                       .durability(NotDurable)
+                       .participants(StoreParticipants.all(ROUTE))
+                       .promised(Ballot.ZERO)
+                       .acceptedOrCommitted(Ballot.ZERO)
+                       .partialTxn(txn.slice(FULL_RANGES, true))
+                       .partialDeps(Deps.NONE.intersecting(ROUTE))
+                       .executeAt(txnId)
+                       .waitingOn(waitingOn)
+                       .build(SaveStatus.Stable);
+    }
+
+    private static Command buildPreAccepted(TxnId txnId)
+    {
+        Txn txn = writeTxn(KEYS);
+        return new CommandBuilder(txnId)
+                       .durability(NotDurable)
+                       .participants(StoreParticipants.all(ROUTE))
+                       .promised(Ballot.ZERO)
+                       .acceptedOrCommitted(Ballot.ZERO)
+                       .partialTxn(txn.slice(FULL_RANGES, true))
+                       .executeAt(txnId)
+                       .build(SaveStatus.PreAccepted);
+    }
+
+    private static Command.WaitingOn waitingOnKeysOnly(int numKeyBits)
+    {
+        // 0 txn waits + N key waits => bitset of size numKeyBits; all bits live in the key region
+        // (because txnIdCount() = directRangeDeps.txnIdCount() = 0).  This is the data shape that
+        // tripped the old maybeExecute key-only branch.
+        RoutingKey[] keys = new RoutingKey[numKeyBits];
+        for (int i = 0; i < numKeyBits; ++i)
+            keys[i] = IntKey.routing(20 + i);
+        RoutingKeys waitKeys = RoutingKeys.of(keys);
+        LargeBitSet bits = new LargeBitSet(waitKeys.size());
+        bits.setRange(0, waitKeys.size());
+        return new Command.WaitingOn(waitKeys, RangeDeps.NONE,
+                                     new ImmutableBitSet(bits),
+                                     new ImmutableBitSet(0));
+    }
+
+    private static Command.WaitingOn waitingOnCommandsOnly(TxnId... depTxnIds)
+    {
+        Range[] ranges = new Range[]{ IntKey.range(0, 5) };
+        // SerializerSupport's create() takes a flat int[] of [range0_end, range1_end, ..., dep0_idx, dep1_idx, ...]
+        // (per the existing AccordCommandStoreTryExecuteListeningTest helper).
+        int[] rangesToTxnIds = new int[ranges.length + depTxnIds.length];
+        rangesToTxnIds[0] = rangesToTxnIds.length; // a single range covers all dep ids
+        for (int i = 0; i < depTxnIds.length; ++i)
+            rangesToTxnIds[ranges.length + i] = i;
+        RangeDeps directRangeDeps = RangeDeps.SerializerSupport.create(ranges, depTxnIds, rangesToTxnIds, null);
+
+        // bitset size = directRangeDeps.txnIdCount() + 0 keys = depTxnIds.length; set all command bits
+        LargeBitSet bits = new LargeBitSet(depTxnIds.length);
+        bits.setRange(0, depTxnIds.length);
+        return new Command.WaitingOn(RoutingKeys.EMPTY, directRangeDeps,
+                                     new ImmutableBitSet(bits),
+                                     new ImmutableBitSet(0));
+    }
+
+    private static Command.WaitingOn waitingOnMixed(TxnId depTxnId, RoutingKey... keys)
+    {
+        Range[] ranges = new Range[]{ IntKey.range(0, 5) };
+        TxnId[] depTxnIds = new TxnId[]{ depTxnId };
+        int[] rangesToTxnIds = new int[]{ 2, 0 };
+        RangeDeps directRangeDeps = RangeDeps.SerializerSupport.create(ranges, depTxnIds, rangesToTxnIds, null);
+
+        RoutingKeys waitKeys = RoutingKeys.of(keys);
+        int size = directRangeDeps.txnIdCount() + waitKeys.size();
+        LargeBitSet bits = new LargeBitSet(size);
+        bits.setRange(0, size);
+        return new Command.WaitingOn(waitKeys, directRangeDeps,
+                                     new ImmutableBitSet(bits),
+                                     new ImmutableBitSet(0));
+    }
+
+    private static TxnId nextTxnId(MockCluster.Clock clock, int n)
+    {
+        return clock.idForNode(1, n);
+    }
+
+    /** Directly inject {@code command} into the store as the current state of {@code txnId}. */
+    private static void installCommand(InMemoryCommandStore.Synchronized commands, TxnId txnId, Command command)
+    {
+        commands.command(txnId).value(command);
+    }
+
+    /**
+     * Open a SafeCommandStore against {@code txnId} and invoke
+     * {@link Commands#maybeExecute(SafeCommandStore, SafeCommand, Command, boolean, boolean, Commands.MaybeExecuteAdapter)}
+     * with the supplied command and recorder.
+     */
+    private static void runMaybeExecute(InMemoryCommandStore.Synchronized commands,
+                                        TxnId txnId,
+                                        Command command,
+                                        boolean alwaysNotifyListeners,
+                                        boolean notifyWaitingOn,
+                                        Recorder rec)
+    {
+        commands.execute(() -> {
+            SafeCommandStore safeStore = commands.beginOperation(PreLoadContext.contextFor(txnId, "Test"), null);
+            try
+            {
+                SafeCommand safeCommand = safeStore.unsafeGet(txnId);
+                Commands.maybeExecute(safeStore, safeCommand, command, alwaysNotifyListeners, notifyWaitingOn, rec);
+            }
+            finally
+            {
+                commands.completeOperation(safeStore);
+            }
+        });
+    }
+
+    // -------- The tests --------
+
+    /**
+     * Sanity: confirm the helpers produce {@code WaitingOn} instances with the bit-shape we
+     * intend to drive {@code maybeExecute} into.  If this ever stops holding (e.g. because of an
+     * encoding change to {@code WaitingOn}), the dependent tests below would silently fail to
+     * exercise the buggy branch.
+     */
+    @Test
+    void sanityHelpersProduceExpectedShapes()
+    {
+        Command.WaitingOn keyOnly = waitingOnKeysOnly(2);
+        Assertions.assertTrue(keyOnly.isWaiting(),         "keyOnly should be waiting");
+        Assertions.assertFalse(keyOnly.isWaitingOnCommand(),"keyOnly should NOT be waiting on a command");
+        Assertions.assertTrue(keyOnly.isWaitingOnKey(),    "keyOnly should be waiting on a key");
+
+        MockCluster.Clock clock = new MockCluster.Clock(100);
+        TxnId dep = nextTxnId(clock, 2);
+        Command.WaitingOn cmdOnly = waitingOnCommandsOnly(dep);
+        Assertions.assertTrue(cmdOnly.isWaiting());
+        Assertions.assertTrue(cmdOnly.isWaitingOnCommand());
+        Assertions.assertFalse(cmdOnly.isWaitingOnKey());
+
+        Command.WaitingOn mixed = waitingOnMixed(dep, IntKey.routing(20));
+        Assertions.assertTrue(mixed.isWaiting());
+        Assertions.assertTrue(mixed.isWaitingOnCommand());
+        Assertions.assertTrue(mixed.isWaitingOnKey());
+    }
+
+    /**
+     * Regression test for the deadlock: a {@code Stable} target with only key waits used to fire
+     * neither adapter callback, silently dropping any continuation chained off of {@code notWaiting}.
+     */
+    @Test
+    void notWaitingFiresForStableWithOnlyKeyWaits()
+    {
+        InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport());
+        MockCluster.Clock clock = new MockCluster.Clock(100);
+        TxnId txnId = nextTxnId(clock, 1);
+
+        Command stable = buildStable(txnId, waitingOnKeysOnly(1));
+        installCommand(commands, txnId, stable);
+
+        Recorder rec = new Recorder();
+        runMaybeExecute(commands, txnId, stable, false, true, rec);
+
+        Assertions.assertEquals(0, rec.notifyWaiting,
+                                "notifyWaiting must NOT be called when waitingOn has no command bits");
+        Assertions.assertEquals(1, rec.notWaiting,
+                                "notWaiting must be called exactly once when waitingOn has no command bits");
+    }
+
+    /** {@code Stable} with only command waits should report via {@code notifyWaiting}. */
+    @Test
+    void notifyWaitingFiresForStableWithOnlyCommandWaits()
+    {
+        InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport());
+        MockCluster.Clock clock = new MockCluster.Clock(100);
+        TxnId txnId = nextTxnId(clock, 1);
+        TxnId depTxnId = nextTxnId(clock, 2);
+
+        Command stable = buildStable(txnId, waitingOnCommandsOnly(depTxnId));
+        installCommand(commands, txnId, stable);
+
+        Recorder rec = new Recorder();
+        runMaybeExecute(commands, txnId, stable, false, true, rec);
+
+        Assertions.assertEquals(1, rec.notifyWaiting,
+                                "notifyWaiting must be called exactly once when waitingOn still has command bits");
+        Assertions.assertEquals(0, rec.notWaiting,
+                                "notWaiting must NOT be called when we are still waiting on a command");
+    }
+
+    /** {@code Stable} with both command and key waits is still "waiting on a command" overall. */
+    @Test
+    void notifyWaitingFiresForStableWithMixedWaits()
+    {
+        InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport());
+        MockCluster.Clock clock = new MockCluster.Clock(100);
+        TxnId txnId = nextTxnId(clock, 1);
+        TxnId depTxnId = nextTxnId(clock, 2);
+
+        Command stable = buildStable(txnId, waitingOnMixed(depTxnId, IntKey.routing(20)));
+        installCommand(commands, txnId, stable);
+
+        Recorder rec = new Recorder();
+        runMaybeExecute(commands, txnId, stable, false, true, rec);
+
+        Assertions.assertEquals(1, rec.notifyWaiting);
+        Assertions.assertEquals(0, rec.notWaiting);
+    }
+
+    /**
+     * When the caller asks for "do not propagate to dependencies" ({@code notifyWaitingOn = false}),
+     * we still owe the adapter a {@code notWaiting} signal on the waiting path; otherwise any
+     * adapter that chains a continuation off of {@code notWaiting} would silently lose it.
+     */
+    @Test
+    void notWaitingFiresWhenNotifyWaitingOnIsFalse()
+    {
+        InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport());
+        MockCluster.Clock clock = new MockCluster.Clock(100);
+        TxnId txnId = nextTxnId(clock, 1);
+        TxnId depTxnId = nextTxnId(clock, 2);
+
+        // notifyWaitingOn = false suppresses notifyWaiting; the symmetric notWaiting still must fire.
+        Command stable = buildStable(txnId, waitingOnCommandsOnly(depTxnId));
+        installCommand(commands, txnId, stable);
+
+        Recorder rec = new Recorder();
+        runMaybeExecute(commands, txnId, stable, false, false, rec);
+
+        Assertions.assertEquals(0, rec.notifyWaiting);
+        Assertions.assertEquals(1, rec.notWaiting);
+    }
+
+    /**
+     * A non-{@code Stable}/{@code PreApplied} target hits the early-return branch; that branch
+     * must also fire {@code notWaiting} (existing behaviour; protect against regression).
+     */
+    @Test
+    void notWaitingFiresForNonExecutingSaveStatus()
+    {
+        InMemoryCommandStore.Synchronized commands = createStore(new CommandStoreSupport());
+        MockCluster.Clock clock = new MockCluster.Clock(100);
+        TxnId txnId = nextTxnId(clock, 1);
+
+        Command preaccepted = buildPreAccepted(txnId);
+        installCommand(commands, txnId, preaccepted);
+
+        Recorder rec = new Recorder();
+        runMaybeExecute(commands, txnId, preaccepted, false, true, rec);
+
+        Assertions.assertEquals(0, rec.notifyWaiting);
+        Assertions.assertEquals(1, rec.notWaiting,
+                                "notWaiting must fire from the non-Stable/PreApplied early-return branch");
+    }
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index b48128e5..5836a44b 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -59,6 +59,7 @@ import accord.local.CommandBuilder;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores;
+import accord.local.CommandStores.RangesForEpoch;
 import accord.local.Node;
 import accord.local.NodeCommandStoreService;
 import accord.local.PreLoadContext;
@@ -898,7 +899,7 @@ public class CommandsForKeyTest
         }
 
         @Override
-        public CommandStores.RangesForEpoch ranges()
+        public RangesForEpoch ranges()
         {
             throw new UnsupportedOperationException();
         }
@@ -980,7 +981,7 @@ public class CommandsForKeyTest
                   null,
                   ignore -> new ProgressLog.NoOpProgressLog(),
                   ignore -> new DefaultLocalListeners(null, new DefaultRemoteListeners((a, b, c, d, e)->{}), DefaultNotifySink.INSTANCE),
-                  new EpochUpdateHolder());
+                  RangesForEpoch.EMPTY);
             this.pruneInterval = pruneInterval;
             this.pruneHlcDelta = pruneHlcDelta;
             this.maxConflictsHlcDelta = maxConflictsHlcDelta;
diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java b/accord-core/src/test/java/accord/utils/AccordGens.java
index d4040a74..81cc93c1 100644
--- a/accord-core/src/test/java/accord/utils/AccordGens.java
+++ b/accord-core/src/test/java/accord/utils/AccordGens.java
@@ -151,9 +151,10 @@ public class AccordGens
     public static Gen<TxnId> txnIds(Gen.LongGen epochs, Gen.LongGen hlcs, Gen.IntGen nodes, Gen<Txn.Kind> kinds, Gen<Domain> domains, Gen<Cardinality> cardinalities)
     {
         return rs -> {
-            Domain domain = domains.next(rs);
+            Txn.Kind kind = kinds.next(rs);
+            Domain domain = kind.isSyncPoint() ? Domain.Range : domains.next(rs);
             Cardinality cardinality = domain == Domain.Range ? Any : cardinalities.next(rs);
-            return new TxnId(epochs.nextLong(rs), hlcs.nextLong(rs), 0, kinds.next(rs), domain, cardinality, new Node.Id(nodes.nextInt(rs)));
+            return new TxnId(epochs.nextLong(rs), hlcs.nextLong(rs), 0, kind, domain, cardinality, new Node.Id(nodes.nextInt(rs)));
         };
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to