This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch fixes-260226
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 7d15d7fd629384da88f5eaf5cbd7435d6fa56ad5
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Dec 22 14:08:16 2025 +0000

    safe shutdown
    snapshot/restore listeners and progressLog
---
 .../src/main/java/accord/api/AsyncExecutor.java    |   2 +
 accord-core/src/main/java/accord/api/Journal.java  |   3 +-
 .../src/main/java/accord/api/LocalListeners.java   |  17 +++
 .../main/java/accord/api/ProtocolModifiers.java    |   5 +-
 .../main/java/accord/impl/AbstractReplayer.java    |  75 ++++++++++--
 .../java/accord/impl/DefaultLocalListeners.java    |  41 +++++++
 .../java/accord/impl/InMemoryCommandStore.java     |  61 ++++++----
 .../java/accord/impl/progresslog/BaseTxnState.java |  35 ++++--
 .../impl/progresslog/DefaultProgressLog.java       |  40 ++++++-
 .../java/accord/impl/progresslog/HomeState.java    |   9 +-
 .../java/accord/impl/progresslog/TxnState.java     |  32 +++++
 .../java/accord/impl/progresslog/WaitingState.java |  29 ++++-
 .../src/main/java/accord/local/Catchup.java        |  15 ++-
 .../src/main/java/accord/local/CommandStore.java   |  42 +++----
 .../src/main/java/accord/local/CommandStores.java  |  21 +++-
 .../src/main/java/accord/local/Commands.java       |   4 +-
 .../src/main/java/accord/local/DurableBefore.java  |  11 +-
 .../src/main/java/accord/local/MaxConflicts.java   |   4 +-
 .../src/main/java/accord/local/MaxDecidedRX.java   |  29 ++++-
 accord-core/src/main/java/accord/local/Node.java   |  12 +-
 .../main/java/accord/local/RedundantBefore.java    |  35 ++++--
 .../main/java/accord/local/RedundantStatus.java    |   4 +-
 .../src/main/java/accord/local/RejectBefore.java   |   4 +-
 .../main/java/accord/local/SafeCommandStore.java   |  31 +++--
 .../main/java/accord/messages/SetShardDurable.java |   1 +
 .../java/accord/primitives/AbstractRanges.java     |   2 +-
 .../src/main/java/accord/primitives/Timestamp.java |   2 -
 .../src/main/java/accord/primitives/TxnId.java     |   1 +
 .../src/main/java/accord/topology/Topology.java    |   2 +-
 .../main/java/accord/utils/PersistentField.java    | 132 ++++++++++++++++-----
 .../main/java/accord/utils/ReducingRangeMap.java   |  41 ++++++-
 .../java/accord/utils/async/AsyncCallbacks.java    |   1 +
 .../src/test/java/accord/impl/basic/Cluster.java   |   2 +-
 .../accord/impl/basic/DelayedCommandStores.java    |   4 +-
 .../java/accord/impl/basic/InMemoryJournal.java    |   7 +-
 .../java/accord/impl/basic/LoggingJournal.java     |   9 +-
 .../src/main/java/accord/maelstrom/Cluster.java    |   3 +-
 37 files changed, 588 insertions(+), 180 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/AsyncExecutor.java b/accord-core/src/main/java/accord/api/AsyncExecutor.java
index 731a2fef..f303f141 100644
--- a/accord-core/src/main/java/accord/api/AsyncExecutor.java
+++ b/accord-core/src/main/java/accord/api/AsyncExecutor.java
@@ -26,6 +26,7 @@ import accord.utils.async.AsyncCallbacks.RunOrFail;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.Cancellable;
 
+// TODO (required): consistent RejectedExecutionException handling
 public interface AsyncExecutor extends Executor
 {
     // unlike execute, throws no exceptions, nor will not wrap the runnable
@@ -38,6 +39,7 @@ public interface AsyncExecutor extends Executor
 
     // Depending on this implementation this method may queue-jump, i.e. task 
submission order is not guaranteed.
     // Make sure this is semantically safe at all call-sites.
+    // TODO (required): RejectedExecutionException?
     default void executeMaybeImmediately(Runnable run)
     {
         if (!tryExecuteImmediately(run))
diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java
index 381f1be2..d9533576 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -53,6 +53,7 @@ public interface Journal
         MINIMAL_WITH_DEPS
     }
 
+    void open(Node node);
     void start(Node node);
 
     Command loadCommand(int store, TxnId txnId, RedundantBefore 
redundantBefore, DurableBefore durableBefore);
@@ -70,7 +71,7 @@ public interface Journal
      * Replays all messages from journal to rehydrate CommandStores state. 
Returns whether it has seen (and ignored)
      * any exceptions during replay.
      */
-    boolean replay(CommandStores commandStores);
+    boolean replay(CommandStores commandStores, Object param);
 
     RedundantBefore loadRedundantBefore(int store);
     NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store);
diff --git a/accord-core/src/main/java/accord/api/LocalListeners.java b/accord-core/src/main/java/accord/api/LocalListeners.java
index caa51ec7..38cf4967 100644
--- a/accord-core/src/main/java/accord/api/LocalListeners.java
+++ b/accord-core/src/main/java/accord/api/LocalListeners.java
@@ -105,6 +105,23 @@ public interface LocalListeners
             this.waitingOn = waitingOn;
             this.awaitingStatus = awaitingStatus;
         }
+
+        @Override
+        public boolean equals(Object that)
+        {
+            return that instanceof TxnListener && equals((TxnListener) that);
+        }
+
+        public boolean equals(TxnListener that)
+        {
+            return this.waiter.equals(that.waiter) && 
this.waitingOn.equals(that.waitingOn) && this.awaitingStatus == 
that.awaitingStatus;
+        }
+
+        @Override
+        public int hashCode()
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
     Iterable<TxnListener> txnListeners();
diff --git a/accord-core/src/main/java/accord/api/ProtocolModifiers.java b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
index 447f4209..03f6fb15 100644
--- a/accord-core/src/main/java/accord/api/ProtocolModifiers.java
+++ b/accord-core/src/main/java/accord/api/ProtocolModifiers.java
@@ -235,8 +235,7 @@ public class ProtocolModifiers
         public static boolean recoveryAwaitsSupersedingSyncPoints() { return 
recoveryAwaitsSupersedingSyncPoints; }
         public static void setRecoveryAwaitsSupersedingSyncPoints(boolean 
newRecoveryAwaitsSupersedingSyncPoints) { recoveryAwaitsSupersedingSyncPoints = 
newRecoveryAwaitsSupersedingSyncPoints; }
 
-        // TODO (required): default this to false once released support via 
recoveryAwaitsSupersedingSyncPoints
-        private static boolean syncPointsTrackUnstableMediumPathDependencies = 
true;
+        private static boolean syncPointsTrackUnstableMediumPathDependencies = 
false;
         public static boolean syncPointsTrackUnstableMediumPathDependencies() 
{ return syncPointsTrackUnstableMediumPathDependencies; }
         public static void 
setSyncPointsTrackUnstableMediumPathDependencies(boolean 
newSyncPointsTrackUnstableMediumPathDependencies) { 
syncPointsTrackUnstableMediumPathDependencies = 
newSyncPointsTrackUnstableMediumPathDependencies; }
 
@@ -248,8 +247,6 @@ public class ProtocolModifiers
         public static boolean filterDuplicateDependenciesFromAcceptReply() { 
return filterDuplicateDependenciesFromAcceptReply; }
         public static void 
setFilterDuplicateDependenciesFromAcceptReply(boolean 
newFilterDuplicateDependenciesFromAcceptReply) { 
filterDuplicateDependenciesFromAcceptReply = 
newFilterDuplicateDependenciesFromAcceptReply; }
 
-
-
         public enum SendStableMessages { TO_ALL, FOR_READS, 
FOR_READS_OR_NONE_IF_FASTEXEC}
         private static SendStableMessages sendStableMessages = 
FOR_READS_OR_NONE_IF_FASTEXEC;
         public static void setSendStableMessages(SendStableMessages 
newSendStableMessages) { sendStableMessages = newSendStableMessages; }
diff --git a/accord-core/src/main/java/accord/impl/AbstractReplayer.java b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
index 8c5c0f90..5e97681e 100644
--- a/accord-core/src/main/java/accord/impl/AbstractReplayer.java
+++ b/accord-core/src/main/java/accord/impl/AbstractReplayer.java
@@ -34,7 +34,12 @@ import accord.primitives.Participants;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
+import accord.utils.UnhandledEnum;
 
+import static accord.impl.AbstractReplayer.Replay.NONE;
+import static accord.impl.AbstractReplayer.Replay.TO_BOTH;
+import static accord.impl.AbstractReplayer.Replay.TO_COMMAND_STORE;
+import static accord.impl.AbstractReplayer.Replay.TO_DATA_STORE;
 import static 
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_COMMAND_STORE;
 import static 
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STORE;
 import static accord.primitives.SaveStatus.Applying;
@@ -45,14 +50,41 @@ import static accord.primitives.Txn.Kind.Write;
 
 public abstract class AbstractReplayer implements Journal.Replayer
 {
+    // TODO (required): NON_DURABLE does not properly account for things like 
pre-bootstrap
+    public enum Mode { ALL, PART_NON_DURABLE, NON_DURABLE }
+    public enum Replay
+    {
+        // warning: behaviour depends on bit pattern of ordinals, so change 
with care
+        NONE, TO_COMMAND_STORE, TO_DATA_STORE, TO_BOTH;
+
+        private static final Replay[] lookup = values();
+
+        public boolean includes(Replay replay)
+        {
+            return (ordinal() & replay.ordinal()) == replay.ordinal();
+        }
+        public Replay atLeast(Replay that)
+        {
+            return lookup[ordinal() | that.ordinal()];
+        }
+        public Replay atMost(Replay that)
+        {
+            return lookup[ordinal() & that.ordinal()];
+        }
+    }
+
     public final RedundantBefore redundantBefore;
+    public final Mode mode;
     public final TxnId minReplay;
 
-    protected AbstractReplayer(CommandStore commandStore, @Nullable TxnId 
minReplay)
+    protected AbstractReplayer(CommandStore commandStore, Mode mode, @Nullable 
TxnId minReplay)
     {
         this.redundantBefore = commandStore.unsafeGetRedundantBefore();
+        this.mode = mode;
         
Invariants.require(redundantBefore.ranges(Objects::nonNull).containsAll(commandStore.unsafeGetRangesForEpoch().all()));
-        this.minReplay = TxnId.noneIfNull(redundantBefore.foldl((b, v) -> 
TxnId.nonNullOrMin(v, b.maxBoundBoth(LOCALLY_DURABLE_TO_DATA_STORE, 
LOCALLY_DURABLE_TO_COMMAND_STORE)), minReplay, ignore -> false));
+        if (mode != Mode.ALL)
+            minReplay = redundantBefore.foldl((b, v) -> TxnId.nonNullOrMin(v, 
replayBound(b)), minReplay, ignore -> false);
+        this.minReplay = TxnId.noneIfNull(minReplay);
     }
 
     protected boolean maybeShouldReplay(TxnId txnId)
@@ -60,25 +92,47 @@ public abstract class AbstractReplayer implements 
Journal.Replayer
         return txnId.compareTo(minReplay) >= 0;
     }
 
-    protected boolean shouldReplay(TxnId txnId, StoreParticipants participants)
+    protected Replay shouldReplay(TxnId txnId, StoreParticipants participants)
     {
         Participants<?> search = participants.route();
         if (search == null) search = participants.hasTouched();
-        return redundantBefore.foldl(search, (b, v, id) -> v || 
b.maxBoundBoth(LOCALLY_DURABLE_TO_COMMAND_STORE, 
LOCALLY_DURABLE_TO_DATA_STORE).compareTo(id) <= 0, false, txnId, i -> i);
+        switch (mode)
+        {
+            default: throw new UnhandledEnum(mode);
+            case ALL: return TO_BOTH;
+            case NON_DURABLE: return redundantBefore.foldl(search, (b, v, id) 
-> v.atMost(replay(b, id)), TO_BOTH, txnId, i -> false);
+            case PART_NON_DURABLE: return redundantBefore.foldl(search, (b, v, 
id) -> v.atLeast(replay(b, id)), NONE, txnId, i -> false);
+        }
+    }
+
+    private static TxnId replayBound(RedundantBefore.Bounds bounds)
+    {
+        return bounds.maxBoundBoth(LOCALLY_DURABLE_TO_COMMAND_STORE, 
LOCALLY_DURABLE_TO_DATA_STORE);
     }
 
-    protected void initialiseState(SafeCommandStore safeStore, TxnId txnId)
+    private static Replay replay(RedundantBefore.Bounds bounds, TxnId txnId)
+    {
+        Replay replay = NONE;
+        if (bounds.maxBound(LOCALLY_DURABLE_TO_COMMAND_STORE).compareTo(txnId) 
<= 0)
+            replay = TO_COMMAND_STORE;
+        if (bounds.maxBound(LOCALLY_DURABLE_TO_DATA_STORE).compareTo(txnId) <= 
0)
+            replay = replay.atLeast(TO_DATA_STORE);
+        return replay;
+    }
+
+    protected void replay(SafeCommandStore safeStore, TxnId txnId, Replay 
replay)
     {
         SafeCommand safeCommand = safeStore.unsafeGet(txnId);
         {
             Command command = safeCommand.current();
             if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && 
command.saveStatus().compareTo(PreApplied) <= 0)
             {
-                Commands.maybeExecute(safeStore, safeCommand, command, false, 
true);
+                if (replay.includes(TO_COMMAND_STORE))
+                    Commands.maybeExecute(safeStore, safeCommand, command, 
false, true);
             }
             else if (command.saveStatus().compareTo(Applying) >= 0 && 
command.saveStatus().compareTo(TruncatedApplyWithOutcome) <= 0)
             {
-                if (command.txnId().is(Write))
+                if (command.txnId().is(Write) && 
replay.includes(TO_DATA_STORE))
                 {
                     Commands.applyChain(safeStore, command)
                             .begin(safeStore.agent());
@@ -86,7 +140,10 @@ public abstract class AbstractReplayer implements 
Journal.Replayer
                 else Invariants.expect(command.hasBeen(Applied), "%s is 
Applying but is not a Write transaction", txnId);
             }
         }
-        safeCommand.update(safeStore, safeCommand.current(), true);
-        safeStore.notifyListeners(safeCommand, null);
+        if (replay.includes(Replay.TO_COMMAND_STORE))
+        {
+            safeCommand.update(safeStore, safeCommand.current(), true);
+            safeStore.notifyListeners(safeCommand, null);
+        }
     }
 }
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index 0e6fa3bf..d83a5502 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -18,10 +18,12 @@
 
 package accord.impl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
@@ -44,6 +46,8 @@ import accord.utils.AsymmetricComparator;
 import accord.utils.Invariants;
 import accord.utils.btree.BTree;
 import accord.utils.btree.BTreeRemoval;
+import accord.utils.btree.BulkIterator;
+import accord.utils.btree.UpdateFunction;
 
 import static accord.utils.ArrayBuffers.cachedAny;
 import static accord.utils.ArrayBuffers.cachedTxnIds;
@@ -737,4 +741,41 @@ public class DefaultLocalListeners implements 
LocalListeners
             };
         };
     }
+
+    public void restore(List<TxnListener> listeners)
+    {
+        if (listeners.isEmpty())
+            return;
+
+        if (!BTree.isEmpty(txnListeners))
+            throw new IllegalStateException("Restore only supported if 
uninitialised");
+
+        listeners.sort((a, b) -> {
+            int c = a.waitingOn.compareTo(b.waitingOn);
+            if (c == 0) c = a.awaitingStatus.compareTo(b.awaitingStatus);
+            if (c == 0) c = a.waiter.compareTo(b.waiter);
+            return c;
+        });
+
+        List<TxnListeners> build = new ArrayList<>();
+        int li = 0;
+        while (li < listeners.size())
+        {
+            TxnListener l = listeners.get(li);
+            TxnListeners ls = new TxnListeners(l.waitingOn, l.awaitingStatus);
+            build.add(ls);
+            ls.add(l.waiter);
+            while (++li < listeners.size() && (l = 
listeners.get(li)).waitingOn.equals(ls) && l.awaitingStatus == ls.await)
+                ls.add(l.waiter);
+        }
+        txnListeners = BTree.build(BulkIterator.of(build.iterator()), 
build.size(), UpdateFunction.noOp());
+    }
+
+    public List<TxnListener> snapshot()
+    {
+        List<TxnListener> snapshot = new ArrayList<>(BTree.size(txnListeners));
+        for (TxnListener listener : txnListeners())
+            snapshot.add(listener);
+        return snapshot;
+    }
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 8c80cb85..f0eb6611 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -41,6 +41,7 @@ import java.util.function.Predicate;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import accord.primitives.*;
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,11 +50,14 @@ import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.Journal;
 import accord.api.LocalListeners;
+import accord.api.LocalListeners.TxnListener;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
+import accord.impl.cfr.IdEntry;
 import accord.impl.cfr.InMemoryRangeSummaryIndex;
 import accord.impl.cfr.LoadListener;
 import accord.impl.progresslog.DefaultProgressLog;
+import accord.impl.progresslog.TxnState;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStore;
@@ -74,18 +78,8 @@ import accord.local.StoreParticipants;
 import accord.local.cfk.CommandsForKey;
 import accord.local.cfk.SafeCommandsForKey;
 import accord.local.cfk.Serialize;
-import accord.primitives.AbstractUnseekableKeys;
-import accord.primitives.PartialDeps;
-import accord.primitives.Participants;
-import accord.primitives.Ranges;
-import accord.primitives.RoutableKey;
-import accord.primitives.Route;
-import accord.primitives.Status;
 import accord.primitives.Status.Durability.HasOutcome;
-import accord.primitives.Timestamp;
 import accord.primitives.Txn.Kind.Kinds;
-import accord.primitives.TxnId;
-import accord.primitives.Unseekables;
 import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
@@ -106,6 +100,7 @@ import static accord.primitives.Routable.Domain.Key;
 import static accord.primitives.Routable.Domain.Range;
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.SaveStatus.Applying;
+import static accord.primitives.SaveStatus.Invalidated;
 import static accord.primitives.SaveStatus.ReadyToExecute;
 import static accord.primitives.Status.Applied;
 import static accord.primitives.Status.Committed;
@@ -127,6 +122,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
 
         private final long id = nextId.incrementAndGet();
         private final NavigableMap<RoutingKey, ByteBuffer> commandsForKey = 
new TreeMap<>();
+        private List<IdEntry> commandsForRanges;
+        private List<TxnListener> listeners;
+        private List<TxnState> progressLog;
         private int waitingForCfk;
 
         private Snapshot(){}
@@ -136,6 +134,10 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             for (Map.Entry<RoutingKey, ByteBuffer> e : 
commandsForKey.entrySet())
                 commandStore.commandsForKey.computeIfAbsent(e.getKey(), 
GlobalCommandsForKey::new).value(Serialize.fromBytes(e.getKey(), e.getValue()));
 
+            commandStore.progressLog.clear();
+            ((DefaultProgressLog)commandStore.progressLog).restore(null, 
progressLog);
+            ((DefaultLocalListeners)commandStore.listeners).restore(listeners);
+            commandStore.commandsForRanges.restore(commandsForRanges);
             commandStore.commandsForRanges.prune(commandStore);
         }
 
@@ -162,6 +164,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         {
             Snapshot snapshot = new Snapshot();
 
+            snapshot.listeners = 
((DefaultLocalListeners)commandStore.listeners).snapshot();
+            snapshot.progressLog = 
((DefaultProgressLog)commandStore.progressLog).snapshot();
+            snapshot.commandsForRanges = 
commandStore.commandsForRanges.snapshot();
             for (Map.Entry<RoutableKey, GlobalCommandsForKey> e : 
commandStore.commandsForKey.entrySet())
             {
                 GlobalCommandsForKey global = e.getValue();
@@ -366,7 +371,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     }
 
     @Override
-    protected void updatedRedundantBefore(SafeCommandStore safeStore, 
RedundantBefore added)
+    protected void upsertedRedundantBefore(SafeCommandStore safeStore, 
RedundantBefore added)
     {
         InMemorySafeStore inMemorySafeStore = (InMemorySafeStore) safeStore;
         for (int i = 0 ; i < added.size() ; ++i)
@@ -390,7 +395,8 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             for (TxnId txnId : clearing)
             {
                 GlobalCommand globalCommand = commands.get(txnId);
-                Invariants.require(globalCommand != null && 
!globalCommand.isEmpty());
+                if (globalCommand == null)
+                    continue; // now that we restore contents from a snapshot, we might repopulate older items
                 Command command = globalCommand.value();
                 StoreParticipants participants = 
command.participants().filter(LOAD, safeStore, txnId, 
command.executeAtIfKnown());
                 Cleanup cleanup = Cleanup.shouldCleanup(FULL, txnId, 
command.executeAtIfKnown(), command.saveStatus(), command.durability(), 
participants, unsafeGetRedundantBefore(), durableBefore());
@@ -401,18 +407,24 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                                           || 
!Route.isFullRoute(command.route()))));
             }
         }
-        super.updatedRedundantBefore(safeStore, added);
+        super.upsertedRedundantBefore(safeStore, added);
     }
 
     @Override
     public void markShardDurable(SafeCommandStore safeStore, TxnId syncId, 
Ranges ranges, HasOutcome level)
     {
         super.markShardDurable(safeStore, syncId, ranges, level);
-        // TODO (required): this should happen on markLocallyApplied
         if (level == Universal)
             commandsForRanges.prune(syncId, ranges, 
safeStore.redundantBefore());
     }
 
+    @Override
+    protected void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId syncId, Ranges ranges, SaveStatus prevStatus)
+    {
+        super.markExclusiveSyncPointLocallyApplied(safeStore, syncId, ranges, 
prevStatus);
+        commandsForRanges.prune(syncId, ranges, safeStore.redundantBefore());
+    }
+
     protected InMemorySafeStore createSafeStore(PreLoadContext context, 
CommandsForRangeLoad cfrLoad,
                                                 Map<TxnId, 
InMemorySafeCommand> commands,
                                                 Map<RoutableKey, 
InMemorySafeCommandsForKey> commandsForKeys)
@@ -925,8 +937,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             Invariants.require(loader.loadKeysFor() == RECOVERY);
             Command command = commands.get(txnId).value();
             Summary summary = loader.ifRelevant(command);
-            Invariants.require(summary != null);
-            loaded.put(summary.plainTxnId(), summary);
+            // TODO (expected): prune implied invalidations from index, so no 
need to special case
+            if (summary == null) Invariants.require(command.saveStatus() == 
Invalidated);
+            else loaded.put(summary.plainTxnId(), summary);
         });
         return new CommandsForRangeLoad(loader, loaded, 
commandsForRanges.registerListener(new LoadListener(loader, loaded)));
     }
@@ -1217,7 +1230,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         commandsForKey.clear();
         commandsForRanges.clear();
         progressLog.clear();
-        unsafeSetRejectBefore(new RejectBefore());
+        unsafeSetRejectBefore(RejectBefore.EMPTY);
         hasResumedBootstraps = false;
     }
 
@@ -1233,17 +1246,17 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         private CommandReplayer(InMemoryCommandStore commandStore)
         {
             // TODO (required): we shouldn't be providing TxnId.NONE here, we 
need to standardise on querying journal for data missing from 
InMemoryCommandStore
-            super(commandStore, TxnId.NONE);
+            super(commandStore, Mode.PART_NON_DURABLE, TxnId.NONE);
             this.commandStore = commandStore;
         }
 
-        private AsyncChain<Void> apply(Command command)
+        private AsyncChain<Void> apply(Command command, Replay replay)
         {
             return 
AsyncChains.success(commandStore.executeInContext(commandStore,
                                                                      
PreLoadContext.contextFor(command.txnId(), "Replay"),
                                                                      null,
                                                                      
(SafeCommandStore safeStore) -> {
-                                                                         
initialiseState(safeStore, command.txnId());
+                                                                         
super.replay(safeStore, command.txnId(), replay);
                                                                          
return null;
                                                                      }));
         }
@@ -1276,10 +1289,14 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                 }
             }
 
-            if (command == null || !maybeShouldReplay(txnId) || 
!shouldReplay(txnId, command.participants()))
+            if (command == null || !maybeShouldReplay(txnId))
+                return AsyncChains.success(null);
+
+            Replay replay = shouldReplay(txnId, command.participants());
+            if (replay == Replay.NONE)
                 return AsyncChains.success(null);
 
-            return apply(command);
+            return apply(command, replay);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/impl/progresslog/BaseTxnState.java b/accord-core/src/main/java/accord/impl/progresslog/BaseTxnState.java
index 57d6d803..e98d6f86 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/BaseTxnState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/BaseTxnState.java
@@ -46,7 +46,10 @@ import static accord.impl.progresslog.TxnStateKind.Waiting;
  */
 abstract class BaseTxnState extends LogGroupTimers.Timer implements 
Comparable<BaseTxnState>
 {
-    private static final int CONTACT_ALL_SHIFT = 63; // TODO (desired): have 
separate contact all flags for recovery and waiting states
+    private static final int RESTORED_SHIFT = 63;
+    static final long RESTORED_BIT = 1L << RESTORED_SHIFT;
+    private static final int CONTACT_ALL_SHIFT = RESTORED_SHIFT - 1;
+    private static final long CONTACT_ALL_BIT = 1L << CONTACT_ALL_SHIFT; // 
TODO (desired): have separate contact all flags for recovery and waiting states
     private static final int SCHEDULED_TIMER_SHIFT = CONTACT_ALL_SHIFT - 1;
     private static final int INVALID_IF_UNCOMMITTED_SHIFT = 
SCHEDULED_TIMER_SHIFT - 1;
     private static final int PENDING_TIMER_BITS = 9;
@@ -59,22 +62,25 @@ abstract class BaseTxnState extends LogGroupTimers.Timer 
implements Comparable<B
     public final TxnId txnId;
 
     /**
-     * bits [0..43) encode WaitingState
+     * bits [0..8) encode HomeState
      * 2 bits for Progress
-     * 3 bits for BlockedUntil target
-     * 3 bits for BlockedUntil that home shard can satisfy
-     * 32 bits for remote progress key counter [note: if we need to in future 
we can safely and easily reclaim bits here]
+     * 3 bits for CoordinatePhase
      * 3 bits for retry counter
      * <p>
-     * bits [43..51) encode HomeState
+     * bits [8..51) encode WaitingState
      * 2 bits for Progress
-     * 3 bits for CoordinatePhase
+     * 2 bits for BlockedUntil target
+     * 2 bits for BlockedUntil querying (<= target)
+     * 2 bits for BlockedUntil that home shard can satisfy
+     * 1 bit to query non-home shards (as home shard has been truncated)
+     * 24+5 bits for remote key progress tracking [note: if we need to in 
future we can safely and easily reclaim bits here]
      * 3 bits for retry counter
      * <p>
-     * bits [52..61) for pending timer delay
-     * bit 61 indicates if this transaction can be inferred invalid if a later 
quorum finds it not committed on any shard
-     * bit 62 for which kind of timer is scheduled
-     * bit 63 for whether we should contact all candidate replicas (rather 
than just our preferred group)
+     * bits [51..60) for pending timer delay
+     * bit 60 indicates if this transaction can be inferred invalid if a later 
quorum finds it not committed on any shard
+     * bit 61 for which kind of timer is scheduled
+     * bit 62 for whether we should contact all candidate replicas (rather 
than just our preferred group)
+     * bit 63 for whether the item was restored from a snapshot (so can modify 
certain invariants)
      */
     long encodedState;
 
@@ -89,9 +95,14 @@ abstract class BaseTxnState extends LogGroupTimers.Timer 
implements Comparable<B
         return this.txnId.compareTo(that.txnId);
     }
 
+    boolean isRestored()
+    {
+        return (encodedState & RESTORED_BIT) != 0L;
+    }
+
     boolean contactEveryone()
     {
-        return ((encodedState >>> CONTACT_ALL_SHIFT) & 1L) == 1L;
+        return (encodedState & CONTACT_ALL_BIT) != 0L;
     }
 
     void setContactEveryone(boolean newContactEveryone)
diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 33aa3021..fa0f9466 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -21,6 +21,7 @@ package accord.impl.progresslog;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +56,8 @@ import accord.utils.LogGroupTimers;
 import accord.utils.TinyEnumSet;
 import accord.utils.btree.BTree;
 import accord.utils.btree.BTreeRemoval;
+import accord.utils.btree.BulkIterator;
+import accord.utils.btree.UpdateFunction;
 import org.agrona.collections.Long2ObjectHashMap;
 import org.agrona.collections.Object2ObjectHashMap;
 
@@ -462,7 +465,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         if (command == null) command = uninitialised(blockedBy.txnId());
 
         SaveStatus saveStatus = command.saveStatus();
-        Invariants.require(saveStatus.compareTo(blockedUntil.unblockedFrom) < 
0);
+        Invariants.expect(saveStatus.compareTo(blockedUntil.unblockedFrom) < 
0);
 
         StoreParticipants blockedOnStoreParticipants2 = null;
         if (blockedOnParticipants != null || blockedOnRoute != null)
@@ -883,7 +886,6 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         }
     }
 
-    @VisibleForImplementation
     public void requeue(SafeCommandStore safeStore, TxnStateKind kind, TxnId 
txnId)
     {
         clearPendingAndActive(kind, txnId);
@@ -1072,7 +1074,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
 
         public boolean isWaitingUninitialised()
         {
-            return current.isUninitialised();
+            return current.isWaitingUninitialised();
         }
 
         @Nonnull
@@ -1122,4 +1124,36 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
             return current.homeRunCounter();
         }
     }
+
+    /**
+     * Rebuilds this progress log from a list of previously captured states.
+     *
+     * Only permitted while {@code stateMap} is empty. Each supplied state is copied with
+     * {@link TxnState#snapshot()} into a fresh list, so the caller's list is neither sorted
+     * nor retained. The copies are sorted by natural order, bulk-loaded into {@code stateMap},
+     * and then every home / waiting side that is neither done nor uninitialised and whose
+     * progress is not {@code NoneExpected} has its scheduling updated to {@code Queued}.
+     *
+     * @param safeStore forwarded to {@code updateScheduling} for each re-queued state
+     * @param states    the states to restore, in any order
+     * @throws IllegalStateException if {@code stateMap} already contains entries
+     */
+    public void restore(SafeCommandStore safeStore, List<TxnState> states)
+    {
+        if (!BTree.isEmpty(stateMap))
+            throw new IllegalStateException("Restore only supported if uninitialised");
+
+        // operate on private copies rather than on the instances handed in
+        List<TxnState> restored = new ArrayList<>(states.size());
+        for (TxnState source : states)
+            restored.add(source.snapshot());
+        restored.sort(Comparator.naturalOrder());
+
+        stateMap = BTree.build(BulkIterator.of(restored.iterator()), restored.size(), UpdateFunction.noOp());
+
+        for (TxnState state : restored)
+        {
+            // home side first; the waiting side is evaluated only afterwards
+            boolean requeueHome = !state.isHomeDoneOrUninitialised() && state.homeProgress() != NoneExpected;
+            if (requeueHome)
+                state.updateScheduling(safeStore, this, Home, null, Queued);
+
+            boolean requeueWaiting = !state.isWaitingDoneOrUninitialised() && state.waitingProgress() != NoneExpected;
+            if (requeueWaiting)
+                state.updateScheduling(safeStore, this, Waiting, null, Queued);
+        }
+    }
+
+    /**
+     * Captures the contents of {@code stateMap} as a new list holding one
+     * {@link TxnState#snapshot()} copy per entry, in the map's iteration order.
+     */
+    public List<TxnState> snapshot()
+    {
+        List<TxnState> copies = new ArrayList<>(BTree.size(stateMap));
+        for (TxnState original : BTree.<TxnState>iterable(stateMap))
+        {
+            TxnState copy = original.snapshot();
+            copies.add(copy);
+        }
+        return copies;
+    }
 }
diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java 
b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
index 54dee971..8262dce5 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
@@ -67,6 +67,7 @@ abstract class HomeState extends BaseTxnState
     private static final long SET_MASK = ~((PROGRESS_MASK << PROGRESS_SHIFT)
                                            | (STATUS_MASK << STATUS_SHIFT));
     static final int HOME_STATE_END_SHIFT = RUN_COUNTER_SHIFT + 3;
+    static final long SNAPSHOT_HOME_MASK = ~SET_MASK;
 
     static
     {
@@ -80,13 +81,13 @@ abstract class HomeState extends BaseTxnState
 
     void set(SafeCommandStore safeStore, DefaultProgressLog owner, HomePhase 
newHomePhase, Progress newProgress)
     {
-        setNoScheduling(newHomePhase, newProgress);
+        setWithoutScheduling(newHomePhase, newProgress);
         if (newProgress == NoneExpected)
             owner.clearProgressToken(txnId);
         updateScheduling(safeStore, owner, Home, null, newProgress);
     }
 
-    void setNoScheduling(HomePhase newHomePhase, Progress newProgress)
+    void setWithoutScheduling(HomePhase newHomePhase, Progress newProgress)
     {
         encodedState &= SET_MASK;
         encodedState |= ((long) newHomePhase.ordinal() << STATUS_SHIFT)
@@ -156,7 +157,7 @@ abstract class HomeState extends BaseTxnState
 
     HomePhase shouldUpdatePhase(DefaultProgressLog owner, Command command)
     {
-        if (command.saveStatus() == SaveStatus.Erased)
+        if (command.durability().isDurableOrInvalidated() || 
command.saveStatus() == SaveStatus.Erased)
             return Done;
 
         HomePhase phase = homePhase();
@@ -197,7 +198,7 @@ abstract class HomeState extends BaseTxnState
                     set(safeStore, owner, updatePhase, NoneExpected);
                     return;
                 }
-                setNoScheduling(updatePhase, Queued);
+                setWithoutScheduling(updatePhase, Queued);
             }
         }
 
diff --git a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java 
b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
index 1292061e..800b53d3 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
@@ -34,6 +34,16 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public final class TxnState extends WaitingState implements PreLoadContext
 {
+    /**
+     * Static factory for rebuilding a {@link TxnState} from a transaction id and a raw
+     * encoded state word. Holds no state of its own and is not meant to be instantiated.
+     */
+    public static final class SerializationSupport
+    {
+        private SerializationSupport() {}
+
+        /**
+         * Creates a state for {@code txnId} and assigns {@code encoded} to its
+         * {@code encodedState} as-is; this method applies no masking or validation.
+         */
+        public static TxnState create(TxnId txnId, long encoded)
+        {
+            TxnState result = new TxnState(txnId);
+            result.encodedState = encoded;
+            return result;
+        }
+    }
+
     TxnState(TxnId txnId)
     {
         super(txnId);
@@ -185,4 +195,26 @@ public final class TxnState extends WaitingState 
implements PreLoadContext
     {
         return "Progress";
     }
+
+    /**
+     * Returns a new {@code TxnState} for the same {@code txnId} that keeps only the bits of
+     * {@code encodedState} selected by {@code SNAPSHOT_HOME_MASK} and
+     * {@code SNAPSHOT_WAITING_MASK}, with {@code RESTORED_BIT} additionally set.
+     * This instance is not modified.
+     */
+    public TxnState snapshot()
+    {
+        long retained = encodedState & (SNAPSHOT_HOME_MASK | SNAPSHOT_WAITING_MASK);
+        TxnState copy = new TxnState(txnId);
+        copy.encodedState = retained | RESTORED_BIT;
+        return copy;
+    }
+
+    /** Returns the raw {@code encodedState} word, unmasked. */
+    public long encodedState()
+    {
+        return encodedState;
+    }
+
+    /**
+     * Two states are equal when {@code that} is a {@code TxnState} with the same
+     * {@code txnId} and the same {@code encodedState}.
+     */
+    // NOTE(review): no hashCode() override is visible alongside this equals - confirm instances
+    // are never used as keys in hash-based collections, or add a matching hashCode.
+    @Override
+    public boolean equals(Object that)
+    {
+        return that instanceof TxnState && equals((TxnState) that);
+    }
+
+    /**
+     * Typed comparison: true when both {@code txnId} and {@code encodedState} match.
+     * {@code that} must be non-null.
+     */
+    public boolean equals(TxnState that)
+    {
+        if (!this.txnId.equals(that.txnId))
+            return false;
+        return this.encodedState == that.encodedState;
+    }
 }
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java 
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index d6c9065c..8dfc765c 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -104,7 +104,7 @@ abstract class WaitingState extends HomeState
     private static final int AWAIT_STARTED_SHIFT = QUERY_SHARDS_NOT_HOME_SHIFT 
+ 1;
     private static final int AWAIT_STARTED_BIT = 1 << AWAIT_STARTED_SHIFT;
     private static final int AWAIT_SHIFT = AWAIT_STARTED_SHIFT + 1;
-    private static final int AWAIT_BITS = 26;
+    private static final int AWAIT_BITS = 24;
     private static final long AWAIT_MASK = (1L << AWAIT_BITS) - 1;
     private static final int AWAIT_EPOCH_SHIFT = AWAIT_SHIFT + AWAIT_BITS;
     private static final int AWAIT_EPOCH_BITS = 4;
@@ -114,6 +114,8 @@ abstract class WaitingState extends HomeState
     private static final int RETRY_COUNTER_SHIFT = AWAIT_EPOCH_SHIFT + 
AWAIT_EPOCH_BITS;
     private static final long RETRY_COUNTER_MASK = 0x7;
     static final int WAITING_STATE_END_SHIFT = RETRY_COUNTER_SHIFT + 3;
+    // Waiting-side bits of encodedState that TxnState.snapshot() carries over into the copy.
+    // Declared final like SNAPSHOT_HOME_MASK: it is a constant and must not be reassignable.
+    static final long SNAPSHOT_WAITING_MASK = INITIALISED_MASK | ~SET_MASK | QUERY_SHARDS_NOT_HOME_BIT;
+
     static
     {
         Invariants.require(BLOCKED_UNTIL_SHIFT == PROGRESS_SHIFT + 
Long.bitCount(PROGRESS_MASK));
@@ -153,7 +155,7 @@ abstract class WaitingState extends HomeState
         encodedState |= (long) homeStatus.ordinal() << HOME_SATISFIES_SHIFT;
     }
 
-    boolean isUninitialised()
+    boolean isWaitingUninitialised()
     {
         return 0 == (encodedState & INITIALISED_MASK);
     }
@@ -450,13 +452,16 @@ abstract class WaitingState extends HomeState
         incrementWaitingRunCounter();
         BlockedUntil blockedUntil = blockedUntil();
         Command command = safeCommand.current();
-        if (command.saveStatus().compareTo(SaveStatus.Erased) >= 0 // TODO 
(expected): improve progress log clearing to guarantee we don't encounter this 
status
-            || 
!Invariants.expect(command.saveStatus().compareTo(blockedUntil.unblockedFrom) < 
0,
-                           "Command has met desired criteria (%s) but progress 
log entry has not been cancelled: %s", blockedUntil.unblockedFrom, command))
+        if (command.saveStatus().compareTo(blockedUntil.unblockedFrom) >= 0)
         {
+            // TODO (expected): improve progress log clearing to guarantee we 
don't encounter Erased
+            Invariants.expect(command.saveStatus() == SaveStatus.Erased || 
isRestored(),
+                              "Command has met desired criteria (%s) but 
progress log entry has not been cancelled: %s",
+                              blockedUntil.unblockedFrom, command);
             setWaitingDone(owner);
             return;
         }
+
         TxnId txnId = safeCommand.txnId();
         // first make sure we have enough information to obtain the command 
locally
         Timestamp executeAt = command.executeAtIfKnown();
@@ -832,6 +837,13 @@ abstract class WaitingState extends HomeState
                 return;
             }
 
+            if (safeCommand != null && 
safeCommand.current().saveStatus().compareTo(querying.unblockedFrom) >= 0)
+            {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Received async callback 
%d with %s; local command already exceeds wait status");
+                return;
+            }
+
             callbackId >>= 1;
             Invariants.nonNull(safeCommand);
             Route<?> route = 
Route.castToRoute(safeCommand.current().maxParticipants());
@@ -924,7 +936,7 @@ abstract class WaitingState extends HomeState
 
     void awaitSlice(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId 
txnId, Timestamp executeAt, Route<?> route, int callbackId, @Nullable Tracing 
tracing)
     {
-        Invariants.require(blockedUntil.waitsOn == SHARD);
+        Invariants.require(blockedUntil.waitsOn == SHARD || 
queryShardsNotHome());
         // TODO (expected): special-case when this shard is home key to avoid 
remote messages
         await(owner, blockedUntil, txnId, executeAt, route, callbackId, 
WaitingState::synchronousAwaitSliceCallback, tracing);
     }
@@ -982,6 +994,11 @@ abstract class WaitingState extends HomeState
         return waitingProgress() == NoneExpected && blockedUntil() == CanApply;
     }
 
+    /** True when the waiting side is either done or has never been initialised. */
+    boolean isWaitingDoneOrUninitialised()
+    {
+        if (isWaitingDone())
+            return true;
+        return isWaitingUninitialised();
+    }
+
     enum CallbackKind
     {
         Fetch, FetchRoute, AwaitHome, AwaitSlice
diff --git a/accord-core/src/main/java/accord/local/Catchup.java 
b/accord-core/src/main/java/accord/local/Catchup.java
index ea2b7611..658321d7 100644
--- a/accord-core/src/main/java/accord/local/Catchup.java
+++ b/accord-core/src/main/java/accord/local/Catchup.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +32,7 @@ import accord.coordinate.FetchDurableBefore;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.SaveStatus;
+import accord.primitives.Status;
 import accord.primitives.TxnId;
 import accord.utils.Reduce;
 import accord.utils.async.AsyncChain;
@@ -58,7 +60,8 @@ public class Catchup
 
         boolean register(SafeCommandStore safeStore)
         {
-            waitingOn = 
safeStore.ranges().all().slice(durableBefore.ranges(Objects::nonNull), Minimal);
+            waitingOn = 
safeStore.ranges().all().slice(durableBefore.ranges(Objects::nonNull), 
Minimal).mergeTouching();
+            logger.debug("{}: Registering listener on {}, filtering by {}", 
safeStore.commandStore(), waitingOn, safeStore.redundantBefore().map(b -> b == 
null ? null : b.maxBound(LOCALLY_APPLIED), TxnId[]::new));
             updateWaitingOn(safeStore);
 
             if (!waitingOn.isEmpty())
@@ -94,8 +97,14 @@ public class Catchup
             //noinspection DataFlowIssue
             safeStore = safeStore;
             PreLoadContext ctx = PreLoadContext.contextFor(txnId, "Catchup");
-            if (safeStore.canExecuteWith(ctx)) 
safeStore.progressLog().waiting(CanApply, safeStore, safeStore.get(txnId), 
null, Ranges.of(range), null);
-            else safeStore.commandStore().execute(ctx, safeStore0 -> 
safeStore0.progressLog().waiting(CanApply, safeStore0, safeStore0.get(txnId), 
null, Ranges.of(range), null));
+            if (safeStore.canExecuteWith(ctx)) markWaiting(safeStore, 
safeStore.get(txnId), range);
+            else safeStore.commandStore().execute(ctx, (Consumer<? super 
SafeCommandStore>) safeStore0 -> markWaiting(safeStore0, safeStore0.get(txnId), 
range), safeStore.agent());
+        }
+
+        /**
+         * Registers the command with the progress log as waiting for {@code CanApply} over
+         * {@code range}, unless the command has already reached {@code PreApplied}.
+         */
+        private static void markWaiting(SafeCommandStore safeStore, SafeCommand safeCommand, Range range)
+        {
+            // nothing to wait for once the command is at or beyond PreApplied
+            if (safeCommand.current().hasBeen(Status.PreApplied))
+                return;
+
+            safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null, Ranges.of(range), null);
+        }
 
         private void done(SafeCommandStore safeStore)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 532f1ec8..ac9e7d98 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -35,6 +35,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
+import accord.primitives.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
@@ -55,14 +56,7 @@ import accord.local.Commands.NotifyWaitingOnPlus;
 import accord.local.PreLoadContext.Empty;
 import accord.local.RedundantBefore.Bounds;
 import accord.local.RedundantStatus.SomeStatus;
-import accord.primitives.Ranges;
-import accord.primitives.Routables;
-import accord.primitives.SaveStatus;
-import accord.primitives.Status;
 import accord.primitives.Status.Durability.HasOutcome;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import accord.primitives.Unseekables;
 import accord.utils.DeterministicIdentitySet;
 import accord.utils.Invariants;
 import accord.utils.Reduce;
@@ -190,7 +184,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
      */
     private NavigableMap<Timestamp, Ranges> safeToRead = emptySafeToRead();
     private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new 
DeterministicIdentitySet<>());
-    @Nullable private RejectBefore rejectBefore;
+    private RejectBefore rejectBefore = RejectBefore.EMPTY;
 
     static class WaitingOnVisibility
     {
@@ -228,8 +222,6 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         return id;
     }
 
-    public void restore() {};
-
     public abstract Journal.Replayer replayer();
     // expected to invoke safeStore.upsertRedundantBefore at some future 
point, when the commandStore state is durably persisted
     protected abstract void ensureDurable(Ranges ranges, RedundantBefore 
onCommandStoreDurable);
@@ -441,7 +433,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
     /**
      * To be overridden by implementations, to ensure the new state is 
persisted.
      */
-    protected void setMaxConflicts(MaxConflicts maxConflicts)
+    protected void unsafeSetMaxConflicts(MaxConflicts maxConflicts)
     {
         this.maxConflicts = maxConflicts;
     }
@@ -512,15 +504,14 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
 
             maxConflictsUpdates = 0;
         }
-        setMaxConflicts(updatedMaxConflicts);
+        unsafeSetMaxConflicts(updatedMaxConflicts);
     }
 
-    final void markExclusiveSyncPoint(SafeCommandStore safeStore, TxnId txnId, 
Ranges ranges)
+    final void upsertRejectBefore(SafeCommandStore safeStore, TxnId txnId, 
Ranges ranges)
     {
         // TODO (desired): narrow ranges to those that are owned
         Invariants.requireArgument(txnId.isSyncPoint());
-        RejectBefore newRejectBefore = rejectBefore != null ? rejectBefore : 
new RejectBefore();
-        newRejectBefore = RejectBefore.add(newRejectBefore, ranges, txnId);
+        RejectBefore newRejectBefore = RejectBefore.add(rejectBefore, ranges, 
txnId);
         unsafeSetRejectBefore(newRejectBefore);
     }
 
@@ -529,7 +520,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         unsafeSetMaxDecidedRX(maxDecidedRX.update(ranges, txnId));
     }
 
-    final void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId txnId, Ranges ranges, SaveStatus prevStatus)
+    protected void markExclusiveSyncPointLocallyApplied(SafeCommandStore 
safeStore, TxnId txnId, Ranges ranges, SaveStatus prevStatus)
     {
         // TODO (desired): narrow ranges to those that are owned
         if (prevStatus.compareTo(SaveStatus.Applied) < 0)
@@ -748,7 +739,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
             // If rebootstrap can grab a later timestamp for subsequent 
attempts, but this timestamp is enough for us
             // to establish which transactions, for which ranges the node can 
safely participate in).
             TxnId unreadyBefore = bootstrap.start(safeStore);
-            
safeStore.unsafeUpsertRedundantBefore(RedundantBefore.create(ranges, 
unreadyBefore, LOG_UNAVAILABLE_ONLY));
+            safeStore.upsertRedundantBefore(RedundantBefore.create(ranges, 
unreadyBefore, LOG_UNAVAILABLE_ONLY));
             updateMaxConflicts(ranges, unreadyBefore);
             // TODO (desired): we could start accepting non-dep requests here
             bootstrap.data.invoke((SettableByCallback<Void>)ready.data);
@@ -756,7 +747,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
             ready.coordinate.invokeIfSuccess(() -> {
                 execute((Empty)() -> "Accept Dependency Requests", safeStore0 
-> {
                     unsafeAcceptRequests(remaining);
-                });
+                }, agent);
             });
             return null;
         })).begin(agent);
@@ -811,7 +802,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
                         logger.info("Failed to close epoch {} for ranges {} on 
store {}, but some are retired; marking these as synced.", epoch, ranges, id, 
fail);
                         execute((Empty)() -> "Mark Retired Ranges Synced", 
safeStore -> {
                             markVisibleInternal(safeStore, epoch, retired, 
"(Retired)");
-                        });
+                        }, agent);
                     }
                     else if (remaining.isEmpty())
                     {
@@ -819,7 +810,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
                     }
                     if (!remaining.isEmpty())
                     {
-                        logger.error("Failed to close epoch {} for ranges {} 
on store {}. Retrying.", epoch, remaining, id, fail);
+                        logger.warn("Failed to close epoch {} for ranges {} on 
store {}. Retrying.", epoch, remaining, id, fail);
                         node.someExecutor().execute(() -> 
ensureReadyToCoordinate(epoch, remaining));
                     }
                 }
@@ -891,7 +882,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         }
     }
 
-    protected void updatedRedundantBefore(SafeCommandStore safeStore, 
RedundantBefore added)
+    protected void upsertedRedundantBefore(SafeCommandStore safeStore, 
RedundantBefore added)
     {
         TxnId clearWaitingBefore = 
redundantBefore.minShardAndLocallyAppliedBefore();
         TxnId clearAllBefore = TxnId.min(clearWaitingBefore, 
durableBefore().min.quorumBefore);
@@ -1076,7 +1067,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
         SettableResult<Void> done = new SettableResult<>();
         execute((Empty)() -> "Try Execute Listening", safeStore -> {
             tryExecuteListening(safeStore, 
listeners.txnsWaitingOn(SaveStatus.Applied).iterator(), done);
-        });
+        }, agent);
         return done;
     }
 
@@ -1096,7 +1087,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
             {
                 //noinspection DataFlowIssue
                 safeStore = safeStore;
-                execute(context, safeStore0 -> tryExecuteListening(safeStore0, 
waitingOn, iterator, done));
+                execute(context, safeStore0 -> { 
tryExecuteListening(safeStore0, waitingOn, iterator, done); }, agent);
             }
             else
             {
@@ -1140,9 +1131,6 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
 
     public final boolean isRejectedIfNotPreAccepted(TxnId txnId, 
Unseekables<?> participants)
     {
-        if (rejectBefore == null)
-            return false;
-
         return rejectBefore.rejects(txnId, participants);
     }
 
@@ -1267,7 +1255,7 @@ public abstract class CommandStore implements 
AbstractAsyncExecutor, SequentialA
     {
         Timestamp timestamp = 
Timestamp.fromValues(rangesForEpoch.epochs[rangesForEpoch.epochs.length - 1], 
minHlc, 0, node.id());
         MaxConflicts updated = maxConflicts.update(rangesForEpoch.all(), 
timestamp);
-        setMaxConflicts(updated);
+        unsafeSetMaxConflicts(updated);
     }
 
     public static NavigableMap<TxnId, Ranges> emptyBootstrapBeganAt()
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 9a6e2d29..29bc2867 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -567,6 +567,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
 
     protected void loadSnapshot(Snapshot toLoad)
     {
+        Invariants.require(!shuttingDown);
         current = toLoad;
     }
 
@@ -657,6 +658,7 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
     final ShardDistributor shardDistributor;
     final Journal journal;
     volatile Snapshot current;
+    boolean shuttingDown;
     int nextId;
 
     private CommandStores(StoreSupplier supplier, ShardDistributor 
shardDistributor, Journal journal)
@@ -962,10 +964,13 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         for (Map.Entry<Integer, RangesForEpoch> e : 
update.commandStores.entrySet())
         {
             Invariants.require(e.getValue() != null);
-            EpochUpdateHolder holder = new EpochUpdateHolder();
-            holder.add(1, e.getValue(), e.getValue().all());
-            shards[i++] = new ShardHolder(supplier.create(e.getKey(), holder), 
e.getValue());
+            EpochUpdateHolder epochUpdates = new EpochUpdateHolder();
+            ShardHolder shard = new ShardHolder(supplier.create(e.getKey(), 
epochUpdates), e.getValue());
+            // TODO (required): if the add is necessary (highly unlikely) it 
needs to be done once journal is writeable so we NEED to move this
+            if (!shard.ranges.equals(shard.store.rangesForEpoch))
+                epochUpdates.add(1, e.getValue(), e.getValue().all());
             maxId = Math.max(maxId, e.getKey());
+            shards[i++] = shard;
         }
         Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id));
 
@@ -975,7 +980,6 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
 
     public synchronized void resetTopology(Journal.TopologyUpdate update)
     {
-        // TODO: assert
         Snapshot current = this.current;
         Invariants.require(update.global.epoch() == current.local.epoch());
         ShardHolder[] shards = new ShardHolder[current.commandStores.size()];
@@ -1025,6 +1029,9 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
 
     public synchronized Supplier<EpochReady> updateTopology(Node node, 
Topology newTopology)
     {
+        if (shuttingDown)
+            throw new IllegalStateException("CommandStores are shutting down");
+
         TopologyUpdate update = updateTopology(node, current, newTopology);
         if (update.snapshot != current)
         {
@@ -1044,8 +1051,14 @@ public abstract class CommandStores implements 
AsyncExecutorFactory
         return update.bootstrap;
     }
 
+    /**
+     * Sets the {@code shuttingDown} flag. Once set, {@code updateTopology} throws
+     * IllegalStateException and the invariant check in {@code loadSnapshot} fails.
+     * Synchronized on this instance, as is {@code updateTopology}.
+     */
+    protected synchronized void markShuttingDown()
+    {
+        shuttingDown = true;
+    }
+
     public void shutdown()
     {
+        markShuttingDown();
         for (ShardHolder shard : current.shards)
             shard.store.shutdown();
     }
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index 9e234578..e7be67e6 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -1128,10 +1128,8 @@ public class Commands
         return false;
     }
 
-    static int counter = 0;
     public static boolean maybeCleanup(SafeCommandStore safeStore, SafeCommand 
safeCommand, Command command, @Nonnull StoreParticipants newParticipants)
     {
-        ++counter;
         StoreParticipants cleanupParticipants = newParticipants.filter(LOAD, 
safeStore, command.txnId(), command.executeAtIfKnown());
         Cleanup cleanup = shouldCleanup(FULL, safeStore, command, 
cleanupParticipants);
         if (cleanup == NO)
@@ -1224,7 +1222,7 @@ public class Commands
             }
             else
             {
-                safeStore.commandStore().execute(this, this);
+                safeStore.commandStore().execute(this, this, 
safeStore.agent());
             }
         }
 
diff --git a/accord-core/src/main/java/accord/local/DurableBefore.java 
b/accord-core/src/main/java/accord/local/DurableBefore.java
index 9e4368d9..e7856416 100644
--- a/accord-core/src/main/java/accord/local/DurableBefore.java
+++ b/accord-core/src/main/java/accord/local/DurableBefore.java
@@ -45,10 +45,7 @@ public class DurableBefore extends 
ReducingRangeMap<DurableBefore.Entry>
         public static DurableBefore create(RoutingKey[] ends, Entry[] values)
         {
             if (values.length == 0)
-            {
-                Invariants.require(ends.length == 1 && ends[0] == null);
                 return DurableBefore.EMPTY;
-            }
             return new DurableBefore(ends, values);
         }
     }
@@ -171,6 +168,14 @@ public class DurableBefore extends 
ReducingRangeMap<DurableBefore.Entry>
         return ReducingIntervalMap.merge(a, b, DurableBefore.Entry::max, 
Builder::new);
     }
 
+    /**
+     * Merges {@code add} into {@code prev}, handing back {@code prev} itself when the
+     * merge changes nothing.
+     *
+     * @param prev the existing value
+     * @param add  the value to merge in
+     * @return the {@code prev} instance if the merged result equals it, otherwise the merged result
+     */
+    public static DurableBefore mergeIfDifferent(DurableBefore prev, DurableBefore add)
+    {
+        DurableBefore next = DurableBefore.merge(prev, add);
+        // one comparison is enough: keep the existing instance when the merge is a no-op
+        return next.equals(prev) ? prev : next;
+    }
+
     public HasOutcome min(TxnId txnId, Unseekables<?> unseekables)
     {
         return notDurableIfNull(foldlWithDefault(unseekables, Entry::mergeMin, 
Entry.NONE, null, txnId, test -> test == None));
diff --git a/accord-core/src/main/java/accord/local/MaxConflicts.java 
b/accord-core/src/main/java/accord/local/MaxConflicts.java
index 74ef5bb5..21dd9e05 100644
--- a/accord-core/src/main/java/accord/local/MaxConflicts.java
+++ b/accord-core/src/main/java/accord/local/MaxConflicts.java
@@ -51,9 +51,9 @@ public class MaxConflicts extends 
BTreeReducingRangeMap<Timestamp>
         return update(this, keysOrRanges, maxConflict, Timestamp::mergeMax, 
MaxConflicts::new, Builder::new);
     }
 
-    private static class Builder extends AbstractBoundariesBuilder<RoutingKey, 
Timestamp, MaxConflicts>
+    public static class Builder extends AbstractBoundariesBuilder<RoutingKey, 
Timestamp, MaxConflicts>
     {
-        protected Builder(int capacity)
+        public Builder(int capacity)
         {
             super(capacity);
         }
diff --git a/accord-core/src/main/java/accord/local/MaxDecidedRX.java 
b/accord-core/src/main/java/accord/local/MaxDecidedRX.java
index 1ff475c0..50bd3f3c 100644
--- a/accord-core/src/main/java/accord/local/MaxDecidedRX.java
+++ b/accord-core/src/main/java/accord/local/MaxDecidedRX.java
@@ -49,6 +49,8 @@ public class MaxDecidedRX extends 
ReducingRangeMap<MaxDecidedRX.DecidedRX>
 
         public DecidedRX(TxnId any, TxnId hlcBound)
         {
+            Invariants.nonNull(any);
+            Invariants.nonNull(hlcBound);
             this.any = any;
             this.hlcBound = hlcBound;
         }
@@ -56,7 +58,7 @@ public class MaxDecidedRX extends 
ReducingRangeMap<MaxDecidedRX.DecidedRX>
         @Override
         public String toString()
         {
-            return "{any=" + any + ",hlcBound=" + hlcBound + "}";
+            return "{any=" + any + ",hlcBound=" + hlcBound + '}';
         }
 
         public boolean includeDecided(TxnId txnId)
@@ -117,6 +119,31 @@ public class MaxDecidedRX extends 
ReducingRangeMap<MaxDecidedRX.DecidedRX>
                 return b;
             return new DecidedRX(any, hlcBound);
         }
+
+        /** Equal only to another {@code DecidedRX} with equal {@code any} and {@code hlcBound}. */
+        @Override
+        public boolean equals(Object that)
+        {
+            if (!(that instanceof DecidedRX))
+                return false;
+            return equals((DecidedRX) that);
+        }
+
+        /** Typed comparison of both fields; {@code that} must be non-null. */
+        public boolean equals(DecidedRX that)
+        {
+            if (!this.any.equals(that.any))
+                return false;
+            return this.hlcBound.equals(that.hlcBound);
+        }
+
+        /**
+         * Always throws {@link UnsupportedOperationException}: no hash is defined for this
+         * type, so any attempt to hash an instance fails.
+         */
+        @Override
+        public int hashCode()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Static factory exposing construction of a {@link MaxDecidedRX} from raw arrays.
+     * Holds no state of its own and is not meant to be instantiated.
+     */
+    public static final class SerializerSupport
+    {
+        private SerializerSupport() {}
+
+        /** Builds a {@code MaxDecidedRX} directly from the given boundary keys and values. */
+        public static MaxDecidedRX create(RoutingKey[] starts, DecidedRX[] values)
+        {
+            return new MaxDecidedRX(starts, values);
+        }
     }
 
     public static final MaxDecidedRX EMPTY = new MaxDecidedRX();
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 9c4f1620..6b843698 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -205,14 +205,10 @@ public class Node implements NodeCommandStoreService
         this.agent = agent;
         this.random = random;
         this.persistDurableBefore = new PersistentField<>(() -> durableBefore,
-                                                          (input, prev) -> {
-                                                              DurableBefore 
next = DurableBefore.merge(input, prev);
-                                                              if 
(next.equals(prev))
-                                                                  return prev;
-                                                              return 
next.equals(prev) ? prev : next;
-                                                          },
+                                                          
DurableBefore::merge, DurableBefore::mergeIfDifferent,
                                                           
safeDurableBeforePersister(durableBeforePersister),
-                                                          
this::setPersistedDurableBefore);
+                                                          
this::setPersistedDurableBefore,
+                                                          run -> 
someExecutor().execute(run));
         this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), journal, shardDistributor, progressLogFactory.apply(this), 
localListenersFactory.apply(this));
         this.topology = new TopologyManager(topologySorter, this, 
topologyService, time, timeouts);
         this.durabilityService = new DurabilityService(this);
@@ -319,7 +315,7 @@ public class Node implements NodeCommandStoreService
 
     public AsyncResult<?> markDurable(DurableBefore addDurableBefore)
     {
-        return withEpochExact(addDurableBefore.maxEpoch(), 
(AsyncExecutor)null, () -> 
persistDurableBefore.mergeAndUpdate(addDurableBefore).chain())
+        return withEpochExact(addDurableBefore.maxEpoch(), 
(AsyncExecutor)null, () -> persistDurableBefore.save(addDurableBefore).chain())
                .beginAsResult();
     }
 
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 9ea4239e..76920d13 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -639,7 +639,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
             return notRetired.without(Ranges.of(bounds.range));
         }
 
-        RedundantStatus get(TxnId txnId, @Nullable Timestamp applyAtIfKnown)
+        public RedundantStatus get(TxnId txnId, @Nullable Timestamp 
applyAtIfKnown)
         {
             if (wasOwned(txnId))
             {
@@ -1277,17 +1277,26 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
     @Override
     public String toString()
     {
-        return "gc:" + toString(GC_BEFORE)
-               + "\nlocal:" + toString(LOCALLY_DURABLE_TO_DATA_STORE, 
LOCALLY_DURABLE_TO_COMMAND_STORE)
-               + "\nbootstrap:" + toString(UNREADY);
+        return toString(", ");
     }
 
-    private String toString(Property p1)
+    public String toString(String delimiter)
     {
-        return toString(p1, null);
+        StringBuilder sb = new StringBuilder();
+        append(sb, delimiter, "gc:", GC_BEFORE);
+        append(sb, delimiter, "applied:", LOCALLY_APPLIED);
+        append(sb, delimiter, "command_store:", 
LOCALLY_DURABLE_TO_COMMAND_STORE);
+        append(sb, delimiter, "data_store:", LOCALLY_DURABLE_TO_DATA_STORE);
+        append(sb, delimiter, "unready:", UNREADY);
+        return sb.toString();
     }
 
-    private String toString(Property p1, Property p2)
+    private void append(StringBuilder builder, String delimiter, String 
prefix, Property p1)
+    {
+        append(builder, delimiter, prefix, p1, null);
+    }
+
+    private void append(StringBuilder builder, String delimiter, String 
prefix, Property p1, Property p2)
     {
         TreeMap<TxnId, List<Range>> map = new TreeMap<>();
         foldl((e, m, pp1, pp2) -> {
@@ -1297,8 +1306,14 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
             return m;
         }, map, p1, p2, i -> false);
 
-        return map.descendingMap().entrySet().stream()
-                  .map(e -> (e.getKey().equals(TxnId.NONE) ? "none" : 
e.getKey().toString()) + ":" + Ranges.ofSorted(e.getValue().toArray(new 
Range[0])).mergeTouching())
-                  .collect(Collectors.joining(", ", "{", "}"));
+        if (map.size() == 0 || map.size() == 1 && 
map.firstKey().equals(TxnId.NONE))
+            return;
+
+        if (builder.length() > 0)
+            builder.append(delimiter);
+        builder.append(prefix);
+        builder.append(map.descendingMap().entrySet().stream()
+                  .map(e -> (e.getKey().equals(TxnId.NONE) ? "none" : 
e.getKey().toString()) + ':' + Ranges.ofSorted(e.getValue().toArray(new 
Range[0])).mergeTouching())
+                  .collect(Collectors.joining(", ", "{", "}")));
     }
 }
diff --git a/accord-core/src/main/java/accord/local/RedundantStatus.java 
b/accord-core/src/main/java/accord/local/RedundantStatus.java
index 4ac9c8e2..24d30268 100644
--- a/accord-core/src/main/java/accord/local/RedundantStatus.java
+++ b/accord-core/src/main/java/accord/local/RedundantStatus.java
@@ -109,13 +109,13 @@ public class RedundantStatus
          * We have applied the preceding transactions durably to the store, so 
that we can safely truncate the Write
          * information as we will not need to replay it to the store
          */
-        LOCALLY_DURABLE_TO_DATA_STORE      (false, false,  LE, 
LOCALLY_APPLIED),
+        LOCALLY_DURABLE_TO_DATA_STORE      (false, true,  LE, LOCALLY_APPLIED),
 
         /**
          * We have applied the preceding transactions durably to all summary 
structures, so that on restart we do
          * not need to replay the transaction to restore any internal state.
          */
-        LOCALLY_DURABLE_TO_COMMAND_STORE   (false, false,  LE, 
LOCALLY_APPLIED),
+        LOCALLY_DURABLE_TO_COMMAND_STORE   (false, true,  LE, LOCALLY_APPLIED),
 
         /**
          * We have fully executed until across all a majority of replicas for 
the range in question,
diff --git a/accord-core/src/main/java/accord/local/RejectBefore.java 
b/accord-core/src/main/java/accord/local/RejectBefore.java
index 5be27785..e969a147 100644
--- a/accord-core/src/main/java/accord/local/RejectBefore.java
+++ b/accord-core/src/main/java/accord/local/RejectBefore.java
@@ -31,6 +31,8 @@ import accord.utils.ReducingRangeMap;
 
 public class RejectBefore extends ReducingRangeMap<Timestamp>
 {
+    public static final RejectBefore EMPTY = new RejectBefore();
+
     public static class SerializerSupport
     {
         public static RejectBefore create(RoutingKey[] ends, Timestamp[] 
values)
@@ -76,7 +78,7 @@ public class RejectBefore extends ReducingRangeMap<Timestamp>
             throw new IllegalArgumentException("value is null");
 
         if (ranges.isEmpty())
-            return new RejectBefore();
+            return EMPTY;
 
         return create(ranges, value, Builder::new);
     }
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java 
b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 3785913e..06296c65 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -21,9 +21,13 @@ package accord.local;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
+import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.api.Agent;
 import accord.api.DataStore;
 import accord.api.LocalListeners;
@@ -80,6 +84,8 @@ import static accord.utils.Invariants.illegalState;
  */
 public abstract class SafeCommandStore implements RangesForEpochSupplier, 
RedundantBeforeSupplier, CommandSummaries
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(SafeCommandStore.class);
+
     private static final int MAX_REENTRANCY = 50;
     private int reentrancyCounter;
     public boolean tryRecurse()
@@ -293,20 +299,13 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
         if (!updated.txnId().isSyncPoint() || updated.txnId().domain() != 
Range) return;
         if (updated.route() == null) return;
 
-        List<SyncPointListener> listeners = commandStore().syncPointListeners;
-        if (listeners != null)
-        {
-            for (SyncPointListener listener : listeners)
-                listener.update(this, updated);
-        }
-
         SaveStatus prevSaveStatus = prev == null ? SaveStatus.Uninitialised : 
prev.saveStatus();
         SaveStatus newSaveStatus = updated.saveStatus();
 
         if (newSaveStatus.known.isDefinitionKnown() && (force || 
!prevSaveStatus.known.isDefinitionKnown()))
         {
             Ranges ranges = updated.participants().touches().toRanges();
-            commandStore().markExclusiveSyncPoint(this, updated.txnId(), 
ranges);
+            commandStore().upsertRejectBefore(this, updated.txnId(), ranges);
         }
 
         if (newSaveStatus.compareTo(Committed) >= 0 && 
newSaveStatus.compareTo(TruncatedApply) <= 0 && (force || 
prevSaveStatus.compareTo(Committed) < 0))
@@ -343,6 +342,15 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
             if (addRedundantBefore != RedundantBefore.EMPTY)
                 upsertRedundantBefore(addRedundantBefore);
         }
+
+        // invoke listeners only after updating redundantBefore
+        List<SyncPointListener> listeners = commandStore().syncPointListeners;
+        if (listeners != null)
+        {
+            logger.debug("Notifying SyncPoint listeners");
+            for (SyncPointListener listener : listeners)
+                listener.update(this, updated);
+        }
     }
 
     public void updateMaxConflicts(Command prev, Command updated, boolean 
force)
@@ -377,6 +385,7 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
 //            updateUnmanagedCommandsForKey(this, next, REGISTER_DEPS_ONLY);
     }
 
+    // TODO (expected): should these and related methods live in CommandStore 
for consistency?
     private static void updateManagedCommandsForKey(SafeCommandStore 
safeStore, Command prev, Command next, boolean forceNotify)
     {
         StoreParticipants participants = 
next.participants().supplement(prev.participants());
@@ -523,8 +532,8 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
         AsyncChains.chain(() -> commandStore.markingVisible(syncId, waitingOn))
                    .flatMap(ignore -> AsyncChains.reduce(async, 
Reduce.toNull(), null))
                    .begin((success, fail) -> {
-                       if (fail == null) 
commandStore.execute((PreLoadContext.Empty)() -> "Mark Synced", safeStore0 -> 
commandStore.markVisible(safeStore0, syncId, waitingOn));
-                       else commandStore.execute((PreLoadContext.Empty)() -> 
"Unmark Syncing", safeStore0 -> commandStore.cancelMarkingVisible(syncId, 
waitingOn));
+                       if (fail == null) 
commandStore.execute((PreLoadContext.Empty)() -> "Mark Synced", (Consumer<? 
super SafeCommandStore>)  safeStore0 -> commandStore.markVisible(safeStore0, 
syncId, waitingOn), commandStore.agent());
+                       else commandStore.execute((PreLoadContext.Empty)() -> 
"Unmark Syncing", (Consumer<? super SafeCommandStore>)  safeStore0 -> 
commandStore.cancelMarkingVisible(syncId, waitingOn), commandStore.agent);
                    });
     }
 
@@ -561,7 +570,7 @@ public abstract class SafeCommandStore implements 
RangesForEpochSupplier, Redund
     protected void unsafeUpsertRedundantBefore(RedundantBefore 
addRedundantBefore)
     {
         commandStore().unsafeUpsertRedundantBefore(addRedundantBefore);
-        commandStore().updatedRedundantBefore(this, addRedundantBefore);
+        commandStore().upsertedRedundantBefore(this, addRedundantBefore);
     }
 
     public void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java 
b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index 3edc04b6..557fa76e 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -56,6 +56,7 @@ public class SetShardDurable extends NoWaitRequest<Route<?>, 
SimpleReply>
     {
         Invariants.require(durability.compareTo(Quorum) >= 0);
         TxnId syncIdWithFlags = syncIdWithFlags();
+        // TODO (required): does this need to strictly precede updating RedundantBefore? (Asking because updating the global map is the more expensive step.)
         node.markDurable(syncPoint.route.toRanges(), syncIdWithFlags, 
durability.compareTo(Universal) >= 0 ? syncIdWithFlags : TxnId.NONE)
         .invoke((success, fail) -> {
             if (fail != null) node.reply(replyTo, replyContext, null, fail);
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java 
b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index 5fd98234..3f01fefa 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -908,7 +908,7 @@ public abstract class AbstractRanges implements 
Iterable<Range>, Routables<Range
         Invariants.requireArgument(ranges.length == 0 || ranges[0] != null);
         for (int i = 1 ; i < ranges.length ; ++i)
         {
-            if (ranges[i - 1].end().compareTo(ranges[i].start()) > 0)
+            if (ranges[i] == null || ranges[i - 
1].end().compareTo(ranges[i].start()) > 0)
                 throw illegalArgument(Arrays.toString(ranges) + " is not 
correctly sorted or deoverlapped");
         }
 
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java 
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index c5c6c9ca..841c4f38 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -96,8 +96,6 @@ public class Timestamp implements Comparable<Timestamp>, 
EpochSupplier
 
     /**
      * The set of flags we want to retain as we merge timestamps (e.g. when 
taking mergeMax).
-     * Today this is only the REJECTED_FLAG, but we may include additional 
flags in future (such as Committed, Applied..)
-     * which we may also want to retain when merging in other contexts (such 
as in Deps).
      */
     static final int MERGE_FLAGS = REJECTED.bit | UNSTABLE.bit | HLC_BOUND.bit 
| SHARD_BOUND.bit;
     public static final long IDENTITY_LSB = 0xFFFFFFFF_FFFF00FFL;
diff --git a/accord-core/src/main/java/accord/primitives/TxnId.java 
b/accord-core/src/main/java/accord/primitives/TxnId.java
index 655267ed..994f9491 100644
--- a/accord-core/src/main/java/accord/primitives/TxnId.java
+++ b/accord-core/src/main/java/accord/primitives/TxnId.java
@@ -23,6 +23,7 @@ import java.util.regex.Pattern;
 
 import javax.annotation.Nullable;
 
+import accord.local.Node;
 import accord.local.Node.Id;
 import accord.primitives.Routable.Domain;
 import accord.primitives.Txn.Kind;
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index c70d7ddc..f57f339e 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -422,7 +422,7 @@ public class Topology
             for (int i = shards.firstSetBit() ; i >= 0 ; i = 
shards.nextSetBit(i + 1, -1))
             {
                 supersetIndexes[count] = this.supersetIndexes[i];
-                ranges[count] = this.shards[this.supersetIndexes[i]].range;
+                ranges[count++] = this.shards[this.supersetIndexes[i]].range;
             }
             subsetOfRanges = Ranges.ofSortedAndDeoverlapped(ranges);
         }
diff --git a/accord-core/src/main/java/accord/utils/PersistentField.java 
b/accord-core/src/main/java/accord/utils/PersistentField.java
index cc63d899..79034fd9 100644
--- a/accord-core/src/main/java/accord/utils/PersistentField.java
+++ b/accord-core/src/main/java/accord/utils/PersistentField.java
@@ -19,13 +19,18 @@
 package accord.utils;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.TreeSet;
+import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
@@ -40,41 +45,57 @@ public class PersistentField<Input, Saved>
         Saved load();
     }
 
-    private static class Pending<Saved>
+    private static class PendingInput<Input>
+    {
+        final AsyncResult.Settable<Void> done = AsyncResults.settable();
+        final Input input;
+
+        private PendingInput(Input input)
+        {
+            this.input = input;
+        }
+    }
+
+    private static class PendingSave<Saved>
     {
         final int id;
         final Saved saving;
 
-        private Pending(int id, Saved saving)
+        private PendingSave(int id, Saved saving)
         {
             this.id = id;
             this.saving = saving;
         }
     }
 
-    @Nonnull
     private final Supplier<Saved> currentValue;
-    @Nonnull
-    private final BiFunction<Input, Saved, Saved> merge;
-    @Nonnull
+    private final BiFunction<Input, Input, Input> mergeInputs;
+    private final BiFunction<Input, Saved, Saved> mergeToSave;
     private final Persister<Input, Saved> persister;
-    @Nonnull
     private final Consumer<Saved> set;
+    private final Executor mergeExecutor;
 
-    private Saved latestPending;
+    private Saved latestSave;
     private int nextId;
-    private final ArrayDeque<Pending<Saved>> pending = new ArrayDeque<>();
+    private final List<PendingInput<Input>> inputBuffer = new ArrayList<>();
+    private final ArrayDeque<PendingInput<Input>> inputs = new ArrayDeque<>();
+    private final ArrayDeque<PendingSave<Saved>> saves = new ArrayDeque<>();
     private final TreeSet<Integer> complete = new TreeSet<>();
+    private final Lock mergeLock = new ReentrantLock();
 
-    public PersistentField(@Nonnull Supplier<Saved> currentValue, @Nonnull 
BiFunction<Input, Saved, Saved> merge, @Nonnull Persister<Input, Saved> 
persister, @Nullable Consumer<Saved> set)
+    public PersistentField(@Nonnull Supplier<Saved> currentValue, @Nonnull 
BiFunction<Input, Input, Input> mergeInputs, @Nonnull BiFunction<Input, Saved, 
Saved> mergeToSave, @Nonnull Persister<Input, Saved> persister, Consumer<Saved> 
set, Executor mergeExecutor)
     {
         Invariants.nonNull(currentValue, "currentValue cannot be null");
+        Invariants.nonNull(mergeInputs, "mergeInputs cannot be null");
+        Invariants.nonNull(mergeToSave, "mergeToSave cannot be null");
         Invariants.nonNull(persister, "persist cannot be null");
         Invariants.nonNull(set, "set cannot be null");
         this.currentValue = currentValue;
-        this.merge = merge;
+        this.mergeInputs = mergeInputs;
+        this.mergeToSave = mergeToSave;
         this.persister = persister;
         this.set = set;
+        this.mergeExecutor = mergeExecutor;
     }
 
     public void load()
@@ -82,30 +103,79 @@ public class PersistentField<Input, Saved>
         set.accept(persister.load());
     }
 
-    public synchronized AsyncResult<?> mergeAndUpdate(@Nonnull Input 
inputValue)
+    public AsyncResult<?> save(@Nonnull Input inputValue)
     {
-        Invariants.nonNull(merge, "merge cannot be null");
         Invariants.nonNull(inputValue, "inputValue cannot be null");
-        return mergeAndUpdate(inputValue, merge);
+        PendingInput<Input> submit = new PendingInput<>(inputValue);
+        synchronized (this)
+        {
+            inputs.add(submit);
+        }
+        trySave();
+        return submit.done;
+    }
+
+    private void trySave()
+    {
+        if (mergeLock.tryLock())
+        {
+            try { save(); }
+            finally { mergeLock.unlock(); }
+
+            synchronized (this)
+            {
+                if (!inputs.isEmpty())
+                    mergeExecutor.execute(this::trySave);
+            }
+        }
     }
 
-    private AsyncResult<?> mergeAndUpdate(@Nullable Input inputValue, @Nonnull 
BiFunction<Input, Saved, Saved> merge)
+    @GuardedBy("mergeLock")
+    private void save()
     {
-        Invariants.nonNull(merge, "merge cannot be null");
-        Saved startingValue = latestPending;
-        if (startingValue == null)
+        Saved startingValue;
+        synchronized (this)
         {
-            Invariants.require(pending.isEmpty());
-            startingValue = currentValue.get();
+            if (inputs.isEmpty())
+                return;
+
+            inputBuffer.clear();
+            inputBuffer.addAll(inputs);
+            inputs.clear();
+
+            startingValue = latestSave;
+            if (startingValue == null)
+            {
+                Invariants.require(saves.isEmpty());
+                startingValue = currentValue.get();
+            }
         }
-        Saved newValue = merge.apply(inputValue, startingValue);
+
+        Input inputValue = inputBuffer.get(0).input;
+        for (int i = 1; i < inputBuffer.size() ; ++i)
+            inputValue = mergeInputs.apply(inputValue, 
inputBuffer.get(i).input);
+
+        Saved newValue = mergeToSave.apply(inputValue, startingValue);
         if (newValue == startingValue)
-            return AsyncResults.success(null);
-        this.latestPending = newValue;
-        int id = ++nextId;
-        pending.add(new Pending<>(id, newValue));
+        {
+            inputBuffer.forEach(i -> i.done.setSuccess(null));
+            inputBuffer.clear();
+            return;
+        }
+
+        final List<AsyncResult.Settable<Void>> notifyOnDone = new 
ArrayList<>(inputBuffer.size());
+        for (PendingInput<?> pending : inputBuffer)
+            notifyOnDone.add(pending.done);
+        inputBuffer.clear();
+
+        int id;
+        synchronized (this)
+        {
+            this.latestSave = newValue;
+            id = ++nextId;
+            saves.add(new PendingSave<>(id, newValue));
+        }
 
-        AsyncResult.Settable<Void> result = AsyncResults.settable();
         AsyncResult<?> pendingWrite = persister.persist(inputValue, newValue);
         pendingWrite.invoke((success, fail) -> {
             synchronized (this)
@@ -113,17 +183,15 @@ public class PersistentField<Input, Saved>
                 complete.add(id);
                 boolean upd = false;
                 Saved latest = null;
-                while (!complete.isEmpty() && pending.peek().id == 
complete.first())
+                while (!complete.isEmpty() && saves.peek().id == 
complete.first())
                 {
-                    latest = pending.poll().saving;
+                    latest = saves.poll().saving;
                     complete.pollFirst();
                     upd = true;
                 }
                 if (upd) set.accept(latest);
-                result.setSuccess(null);
+                notifyOnDone.forEach(i -> i.setSuccess(null));
             }
         });
-
-        return result;
     }
 }
diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java 
b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
index ee0f6f79..972df635 100644
--- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
@@ -21,6 +21,7 @@ import accord.api.RoutingKey;
 import accord.primitives.*;
 
 import java.util.Arrays;
+import java.util.Objects;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.IntFunction;
@@ -494,9 +495,62 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V>
 
+    /**
+     * Builds a new map by applying {@code map} to every value of this map.
+     * <p>
+     * Entries are compacted while mapping: leading entries that map to {@code null} are dropped, and an
+     * entry whose mapped value equals the previously retained value is folded into that preceding entry.
+     * If any entry was dropped or folded, a trailing {@code null} value is trimmed as well.
+     *
+     * @param map       function applied to each value; may return {@code null}
+     * @param allocator creates the {@code V2[]} used to hold the mapped values, given a length
+     * @return the retained mapped values (sharing this map's start keys if nothing was dropped), or an empty map
+     */
     public <V2> ReducingRangeMap<V2> map(Function<V, V2> map, IntFunction<V2[]> allocator)
     {
+        RoutingKey[] starts = null;
         V2[] output = allocator.apply(values.length);
+        int count = 0;
         for (int i = 0 ; i < values.length ; ++i)
-            output[i] = map.apply(values[i]);
+        {
+            V2 next = map.apply(values[i]);
+            // compare with the last retained value (count - 1) rather than the source index (i - 1): the two
+            // diverge once an entry has been dropped, after which output[i - 1] has not been written by this loop
+            if (count == 0 ? next == null : (Objects.equals(next, output[count - 1])))
+            {
+                if (starts == null)
+                {
+                    // first dropped entry: switch to a private copy of the boundaries retained so far
+                    starts = new RoutingKey[values.length];
+                    System.arraycopy(this.starts, 0, starts, 0, count);
+                }
+                continue;
+            }
+            if (starts != null)
+                starts[count] = this.starts[i];
+            output[count++] = next;
+        }
+
+        if (count > 0)
+        {
+            if (starts != null)
+            {
+                // close the final retained entry with this map's last boundary
+                starts[count] = this.starts[this.starts.length - 1];
+                if (output[count - 1] == null)
+                    --count;
+
+                starts = Arrays.copyOf(starts, count + 1);
+                output = Arrays.copyOf(output, count);
+            }
+            else
+            {
+                Invariants.require(count == values.length);
+                starts = this.starts;
+            }
+        }
+
+        if (count == 0)
+            return new ReducingRangeMap<>();
+
         return new ReducingRangeMap<>(starts, output);
     }
 }
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java 
b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
index aeea27c7..c7ca2a60 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
@@ -30,6 +30,7 @@ public class AsyncCallbacks
     // a runnable interface that may be directly failed
     public interface RunOrFail extends Runnable
     {
+        // run should not throw any exception
         void run();
         void fail(Throwable fail);
     }
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index b1ad69ed..2f72ef1f 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -790,7 +790,7 @@ public class Cluster
                     listStore.restore();
                     for (CommandStore store : stores.all())
                         ((ListAgent) 
store.agent()).restore((InMemoryCommandStore) store);
-                    journal.replay(stores);
+                    journal.replay(stores, null);
                     Catchup.catchup(node);
 
                     // Re-enable safety checks
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 2a2a7e97..22a29e5b 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -91,11 +91,11 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         {
             Snapshot current = current();
             RangesForEpoch ranges = e.getValue();
-            CommandStore commandStore = null;
+            DelayedCommandStore commandStore = null;
             for (ShardHolder shard : current)
             {
                 if (shard.ranges().equals(ranges))
-                    commandStore = shard.store;
+                    commandStore = (DelayedCommandStore) shard.store;
             }
             Invariants.nonNull(commandStore, "Each set of ranges should have a 
corresponding command store, but %d did not:(%s)",
                                ranges, Arrays.toString(shards))
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index f079237a..c838c891 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -127,6 +127,11 @@ public class InMemoryJournal implements Journal
         this.partialCompactionChance = 1f - (random.nextFloat()/2);
     }
 
+    @Override
+    public void open(Node node)
+    {
+    }
+
     public void start(Node node)
     {
         this.node = node;
@@ -617,7 +622,7 @@ public class InMemoryJournal implements Journal
     }
 
     @Override
-    public boolean replay(CommandStores commandStores)
+    public boolean replay(CommandStores commandStores, Object param)
     {
         for (Map.Entry<Integer, NavigableMap<TxnId, Diffs>> diffEntry : 
diffsPerCommandStore.entrySet())
         {
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 00f675fc..b55c6c2a 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -75,7 +75,12 @@ public class LoggingJournal implements Journal
     }
 
     @Override
+    public void open(Node node)
+    {
+        delegate.open(node);
+    }
 
+    @Override
     public void start(Node node)
     {
         delegate.start(node);
@@ -130,9 +135,9 @@ public class LoggingJournal implements Journal
     }
 
     @Override
-    public boolean replay(CommandStores commandStores)
+    public boolean replay(CommandStores commandStores, Object param)
     {
-        return delegate.replay(commandStores);
+        return delegate.replay(commandStores, null);
     }
 
     @Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 9578254b..7e58289e 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -388,6 +388,7 @@ public class Cluster implements Scheduler
 
     public static class NoOpJournal implements Journal
     {
+        @Override public void open(Node node) { }
         @Override public void start(Node node) { }
         @Override public Command loadCommand(int store, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new 
IllegalStateException("Not impelemented"); }
         @Override public Command.Minimal loadMinimal(int commandStoreId, TxnId 
txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw 
new IllegalStateException("Not impelemented"); }
@@ -396,7 +397,7 @@ public class Cluster implements Scheduler
         @Override public List<TopologyUpdate> replayTopologies() { throw new 
IllegalStateException("Not impelemented"); }
         @Override public void saveTopology(TopologyUpdate topologyUpdate, 
Runnable onFlush)  { throw new IllegalStateException("Not impelemented"); }
         @Override public void purge(CommandStores commandStores, EpochSupplier 
minEpoch)  { throw new IllegalStateException("Not impelemented"); }
-        @Override public boolean replay(CommandStores commandStores)  { throw 
new IllegalStateException("Not impelemented"); }
+        @Override public boolean replay(CommandStores commandStores, Object 
param)  { throw new IllegalStateException("Not impelemented"); }
         @Override public RedundantBefore loadRedundantBefore(int store) { 
throw new IllegalStateException("Not impelemented"); }
         @Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int 
store) { throw new IllegalStateException("Not impelemented"); }
         @Override public NavigableMap<Timestamp, Ranges> loadSafeToRead(int 
store) { throw new IllegalStateException("Not impelemented"); }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to