This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc14a154 Various fixes and improvements Improve:  - InMemoryJournal 
compaction should simulate files to ensure we compact the same cohorts of 
records (so shadowing applied correctly)  - Don't update CFK deps if already 
known  - avoid unnecessary heapification of LogGroupTimers  - begin removal of 
Guava (mostly unused)  - consider medium path on fast path delayed  - add 
Seekables.indexOf to support faster serialization  - unboxed Invariant.requires 
variant  - AsyncChains.addCallba [...]
fc14a154 is described below

commit fc14a154fd514d4ab40b37508fb9497f786835e0
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Sat Mar 15 09:06:01 2025 +0000

    Various fixes and improvements
    Improve:
     - InMemoryJournal compaction should simulate files to ensure we compact 
the same cohorts of records (so shadowing applied correctly)
     - Don't update CFK deps if already known
     - avoid unnecessary heapification of LogGroupTimers
     - begin removal of Guava (mostly unused)
     - consider medium path on fast path delayed
     - add Seekables.indexOf to support faster serialization
     - unboxed Invariant.requires variant
     - AsyncChains.addCallback -> invoke
    Fix:
     - Recovery must wait for earlier transactions on shards that have not yet 
been accepted, even if we recover a fast Stable record on another shard
     - only skip Dep calculation on PreAccept if vote is required by coordinator
     - ReadTracker must slice Minimal the first unavailable collection
     - If PreLoadContext cannot acquire shared caches, still consider existing 
contents
     - CFK: make sure to insert UNSTABLE into missing array
     - Fix failed task stops DelayedCommandStore task queue processing further 
tasks
     - short circuit AbstractRanges.equals()
     - Handle edge case where we can take fast/medium path but one of the Deps 
replies contains a future TxnId
     - update executeAtEpoch when retryInFutureEpoch
     - Fix incorrect validation in validateMissing (should only validate 
newInfo is not in missing when in witnessedBy; all other additions should be 
included)
    
    patch by Benedict; reviewed by David Capwell for CASSANDRA-20522
---
 accord-core/src/main/java/accord/api/Journal.java  |   2 +-
 .../accord/coordinate/CoordinateTransaction.java   |  71 ++++--
 .../java/accord/coordinate/ExecuteSyncPoint.java   |   4 +-
 .../main/java/accord/coordinate/MaybeRecover.java  |   2 +-
 .../src/main/java/accord/coordinate/Recover.java   | 131 ++++++----
 .../coordinate/tracking/FastPathTracker.java       |  11 +-
 .../accord/coordinate/tracking/ReadTracker.java    |   2 +-
 .../accord/impl/AbstractConfigurationService.java  |  12 +-
 .../java/accord/impl/AbstractSafeCommandStore.java |  17 +-
 .../src/main/java/accord/local/CommandStores.java  |   2 +-
 accord-core/src/main/java/accord/local/Node.java   |  10 +-
 .../main/java/accord/local/RedundantBefore.java    |   2 +-
 .../main/java/accord/local/cfk/CommandsForKey.java |  64 +++--
 .../src/main/java/accord/local/cfk/Pruning.java    |   4 +-
 .../src/main/java/accord/local/cfk/Updating.java   |  79 +++---
 .../src/main/java/accord/local/cfk/Utils.java      |  37 ++-
 .../local/durability/ConcurrencyControl.java       |   4 +-
 .../accord/local/durability/DurabilityQueue.java   |   2 +-
 .../accord/local/durability/ShardDurability.java   |   2 +-
 .../src/main/java/accord/messages/Apply.java       |  13 +-
 .../main/java/accord/messages/BeginRecovery.java   |   2 +-
 .../src/main/java/accord/messages/PreAccept.java   |   2 +-
 .../java/accord/messages/SetGloballyDurable.java   |   2 +-
 .../main/java/accord/messages/SetShardDurable.java |   2 +-
 .../java/accord/messages/WaitUntilApplied.java     |   2 +
 .../java/accord/primitives/AbstractRanges.java     |   8 +
 .../main/java/accord/primitives/LatestDeps.java    |  14 +-
 .../src/main/java/accord/primitives/Routables.java |   1 +
 .../src/main/java/accord/primitives/Timestamp.java |   3 +
 .../main/java/accord/topology/TopologyManager.java |   4 +-
 .../src/main/java/accord/utils/Functions.java      |  17 --
 .../src/main/java/accord/utils/Invariants.java     |   6 +
 .../src/main/java/accord/utils/LogGroupTimers.java |   2 +-
 .../main/java/accord/utils/PersistentField.java    |   2 +-
 .../java/accord/utils/async/AsyncCallbacks.java    |   4 +-
 .../main/java/accord/utils/async/AsyncChain.java   |  40 ++-
 .../main/java/accord/utils/async/AsyncChains.java  |  43 +++-
 .../main/java/accord/utils/async/AsyncResult.java  |  10 +-
 .../main/java/accord/utils/async/AsyncResults.java |   8 +-
 .../src/main/java/accord/utils/btree/BTreeSet.java |   6 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   2 +-
 .../accord/impl/basic/DelayedCommandStores.java    |   4 +-
 .../java/accord/impl/basic/InMemoryJournal.java    | 272 +++++++++++++++++----
 .../java/accord/impl/basic/LoggingJournal.java     |   4 +-
 .../accord/impl/list/ListFetchCoordinator.java     |   2 +-
 .../test/java/accord/impl/list/ListRequest.java    |   2 +-
 .../src/test/java/accord/local/CommandsTest.java   |   2 +-
 .../java/accord/utils/async/AsyncChainsTest.java   |  10 +-
 .../src/main/java/accord/maelstrom/Cluster.java    |   2 +-
 .../java/accord/maelstrom/MaelstromRequest.java    |   2 +-
 50 files changed, 619 insertions(+), 332 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index a1246f76..1be59925 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -43,7 +43,7 @@ import org.agrona.collections.Int2ObjectHashMap;
  */
 public interface Journal
 {
-    Journal start(Node node);
+    void start(Node node);
 
     Command loadCommand(int store, TxnId txnId, RedundantBefore 
redundantBefore, DurableBefore durableBefore);
     Command.Minimal loadMinimal(int store, TxnId txnId, Load load, 
RedundantBefore redundantBefore, DurableBefore durableBefore);
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index aceb8ee1..b27eeac9 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -104,38 +104,57 @@ public class CoordinateTransaction extends 
CoordinatePreAccept<Result>
     {
         if (tracker.hasFastPathAccepted())
         {
-            Deps deps = Deps.merge(oks.valuesAsNullableList(), 
oks.domainSize(), List::get, ok -> ok.deps);
-            ExecuteFlags executeFlags = 
Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? v : 
v.and(ok.flags), ExecuteFlags.all());
-            // note: we merge all Deps regardless of witnessedAt. While we 
only need fast path votes,
-            // we must include Deps from fast path votes from earlier epochs 
that may have witnessed later transactions
-            // TODO (desired): we might mask some bugs by merging more 
responses than we strictly need, so optimise this to optionally merge minimal 
deps
-            executeAdapter().execute(node, topologies, route, FAST, 
executeFlags, txnId, txn, txnId, deps, deps, settingCallback());
-            node.agent().eventListener().onFastPathTaken(txnId, deps);
+            Deps deps = mergeFastOrMediumDeps(oks);
+            if (deps != null)
+            {
+                ExecuteFlags executeFlags = 
Functions.foldl(oks.valuesAsNullableList(), (ok, v) -> ok == null ? v : 
v.and(ok.flags), ExecuteFlags.all());
+                // note: we merge all Deps regardless of witnessedAt. While we 
only need fast path votes,
+                // we must include Deps from fast path votes from earlier 
epochs that may have witnessed later transactions
+                // TODO (desired): we might mask some bugs by merging more 
responses than we strictly need, so optimise this to optionally merge minimal 
deps
+                executeAdapter().execute(node, topologies, route, FAST, 
executeFlags, txnId, txn, txnId, deps, deps, settingCallback());
+                node.agent().eventListener().onFastPathTaken(txnId, deps);
+                return;
+            }
         }
         else if (tracker.hasMediumPathAccepted() && txnId.hasMediumPath())
         {
-            Deps deps = Deps.merge(oks.valuesAsNullableList(), 
oks.domainSize(), List::get, ok -> ok.deps);
-            proposeAdapter().propose(node, topologies, route, 
Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, this);
-            node.agent().eventListener().onMediumPathTaken(txnId, deps);
-        }
-        else
-        {
-            // TODO (low priority, efficiency): perhaps don't submit Accept 
immediately if we almost have enough for fast-path,
-            //                                  but by sending accept we rule 
out hybrid fast-path
-            // TODO (low priority, efficiency): if we receive an expired 
response, perhaps defer to permit at least one other
-            //                                  node to respond before 
invalidating
-            if (executeAt.is(REJECTED))
+            Deps deps = mergeFastOrMediumDeps(oks);
+            if (deps != null)
             {
-                proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, 
route.homeKey(), route, executeAt,this);
-                node.agent().eventListener().onRejected(txnId);
-            }
-            else
-            {
-                Deps deps = Deps.merge(oks.valuesAsNullableList(), 
oks.domainSize(), List::get, ok -> ok.deps);
-                proposeAdapter().propose(node, topologies, route, 
Accept.Kind.SLOW, Ballot.ZERO, txnId, txn, executeAt, deps, this);
-                node.agent().eventListener().onSlowPathTaken(txnId, deps);
+                proposeAdapter().propose(node, topologies, route, 
Accept.Kind.MEDIUM, Ballot.ZERO, txnId, txn, txnId, deps, this);
+                node.agent().eventListener().onMediumPathTaken(txnId, deps);
+                return;
             }
         }
+        else if (executeAt.is(REJECTED))
+        {
+            proposeAndCommitInvalidate(node, Ballot.ZERO, txnId, 
route.homeKey(), route, executeAt,this);
+            node.agent().eventListener().onRejected(txnId);
+            return;
+        }
+
+        Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), 
List::get, ok -> ok.deps);
+        proposeAdapter().propose(node, topologies, route, Accept.Kind.SLOW, 
Ballot.ZERO, txnId, txn, executeAt, deps, this);
+        node.agent().eventListener().onSlowPathTaken(txnId, deps);
+    }
+
+    private Deps mergeFastOrMediumDeps(SortedListMap<?, PreAcceptOk> oks)
+    {
+        // we must merge all Deps replies from prior topologies, but from the 
latest topology we can safely merge only those replies that voted for the fast 
path
+        // TODO (desired): actually merge these topologies separately, rather 
than just switching behaviour when multiple topologies
+        if (tracker.topologies().size() == 1)
+            return Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), 
List::get, ok -> ok.witnessedAt.equals(ok.txnId) ? ok.deps : null);
+
+        Deps deps = Deps.merge(oks.valuesAsNullableList(), oks.domainSize(), 
List::get, ok -> ok.deps);
+        // it is possible that one of the earlier epochs that did not need to 
vote for the fast path
+        // was also unable to compute valid dependencies, and returned a 
future TxnId as a proxy.
+        // In this case while it is still in principle safe to propose the 
fast path, it is simpler not to,
+        // as it permits us to maintain safety validation logic that detects 
unsafe behaviour and execution will
+        // need to wait for the future transaction to be agreed anyway (so we 
can use its dependency calculation).
+        if (deps.maxTxnId(txnId).compareTo(txnId) > 0)
+            return null;
+
+        return deps;
     }
 
     protected CoordinationAdapter<Result> proposeAdapter()
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index 0f37728c..79482b51 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -173,7 +173,7 @@ public class ExecuteSyncPoint extends 
SettableResult<DurabilityResult> implement
         {
             node.withEpoch(retryInFutureEpoch, (ignore, failure) -> 
tryFailure(WrappableException.wrap(failure)), () -> {
                 ExecuteSyncPoint continuation = new ExecuteSyncPoint(node, 
syncPoint, node.topology().preciseEpochs(syncPoint.route(), 
tracker.topologies().currentEpoch(), retryInFutureEpoch, SHARE), 
excludeSuccess, executor, attempt, current());
-                continuation.addCallback((success, failure) -> {
+                continuation.invoke((success, failure) -> {
                     if (failure == null) trySuccess(success);
                     else tryFailure(failure);
                 });
@@ -249,6 +249,6 @@ public class ExecuteSyncPoint extends 
SettableResult<DurabilityResult> implement
     {
         SortedArrayList<Node.Id> contact = tracker.filterAndRecordFaulty();
         if (contact == null) tryFailure(new Exhausted(syncPoint.syncId, 
syncPoint.route.homeKey(), null));
-        else node.send(contact, to -> new WaitUntilApplied(to, 
tracker.topologies(), syncPoint.syncId, syncPoint.route, 
syncPoint.syncId.epoch()), executor, this);
+        else node.send(contact, to -> new WaitUntilApplied(to, 
tracker.topologies(), syncPoint.syncId, syncPoint.route, 
tracker.topologies().currentEpoch()), executor, this);
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java 
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 81232d89..1a50486c 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -122,7 +122,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
                     else
                     {
                         Invariants.require(Route.isFullRoute(someRoute), 
"Require a full route but given %s", full.route);
-                        node.recover(txnId, full.invalidIf, 
Route.castToFullRoute(someRoute), reportLowEpoch, 
reportHighEpoch).addCallback(callback);
+                        node.recover(txnId, full.invalidIf, 
Route.castToFullRoute(someRoute), reportLowEpoch, 
reportHighEpoch).invoke(callback);
                     }
                     break;
 
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java 
b/accord-core/src/main/java/accord/coordinate/Recover.java
index d3d095d4..74e74f3a 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -67,6 +67,7 @@ import accord.utils.async.AsyncResults;
 
 import static accord.api.ProgressLog.BlockedUntil.CommittedOrNotFastPathCommit;
 import static accord.api.ProgressLog.BlockedUntil.HasCommittedDeps;
+import static accord.api.ProgressLog.BlockedUntil.HasDecidedExecuteAt;
 import static accord.api.ProtocolModifiers.QuorumEpochIntersections;
 import static accord.coordinate.CoordinationAdapter.Factory.Kind.Recovery;
 import static accord.coordinate.ExecutePath.RECOVER;
@@ -253,18 +254,21 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
 
         if (acceptOrCommitNotTruncated != null)
         {
-            Status status = acceptOrCommitNotTruncated.status;
             Timestamp executeAt = acceptOrCommitNotTruncated.executeAt;
-            if (committedExecuteAt != null)
-            {
-                
Invariants.require(acceptOrCommitNotTruncated.status.compareTo(Status.PreCommitted)
 < 0 || executeAt.equals(committedExecuteAt));
-                // if we know from a prior Accept attempt that this is 
committed we can go straight to the commit phase
-                if (status == AcceptedMedium || status == AcceptedSlow)
-                    status = Status.Committed;
+            Status status; {
+                Status tmp = acceptOrCommitNotTruncated.status;
+                if (committedExecuteAt != null)
+                {
+                    
Invariants.require(acceptOrCommitNotTruncated.status.compareTo(Status.PreCommitted)
 < 0 || executeAt.equals(committedExecuteAt));
+                    // if we know from a prior Accept attempt that this is 
committed we can go straight to the commit phase
+                    if (tmp == AcceptedMedium || tmp == AcceptedSlow)
+                        tmp = Status.Committed;
+                }
+                status = tmp;
             }
+
             switch (status)
             {
-                default: throw new UnhandledEnum(status);
                 case Truncated: throw illegalState("Truncate should be 
filtered");
                 case Invalidated:
                 {
@@ -272,45 +276,6 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
                     return;
                 }
 
-                case Applied:
-                case PreApplied:
-                {
-                    withStableDeps(recoverOkList, executeAt, (i, t) -> 
node.agent().acceptAndWrap(i, t), stableDeps -> {
-                        adapter.persist(node, tracker.topologies(), route, 
txnId, txn, executeAt, stableDeps, acceptOrCommitNotTruncated.writes, 
acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t));
-                    });
-                    accept(acceptOrCommitNotTruncated.result, null);
-                    return;
-                }
-
-                case Stable:
-                {
-                    withStableDeps(recoverOkList, executeAt, this, stableDeps 
-> {
-                        adapter.execute(node, tracker.topologies(), route, 
RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps, 
this);
-                    });
-                    return;
-                }
-
-                case PreCommitted:
-                case Committed:
-                {
-                    withCommittedDeps(recoverOkList, executeAt, this, 
committedDeps -> {
-                        adapter.stabilise(node, tracker.topologies(), route, 
ballot, txnId, txn, executeAt, committedDeps, this);
-                    });
-                    return;
-                }
-
-                case AcceptedSlow:
-                case AcceptedMedium:
-                {
-                    // TODO (desired): if we have a quorum of Accept with 
matching ballot or proposal we can go straight to Commit
-                    // TODO (desired): if we didn't find Accepted in *every* 
shard, consider invalidating for consistency of behaviour
-                    //     however, note that we may have taken the fast path 
and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO
-                    //     (otherwise recovery was attempted and did not 
invalidate, so it must have determined it needed to complete)
-                    Deps proposeDeps = LatestDeps.mergeProposal(recoverOkList, 
ok -> ok == null ? null : ok.deps);
-                    propose(SLOW, acceptOrCommitNotTruncated.executeAt, 
proposeDeps);
-                    return;
-                }
-
                 case AcceptedInvalidate:
                 {
                     invalidate(recoverOks);
@@ -321,6 +286,53 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
                 case PreAccepted:
                     throw illegalState("Should only be possible to have 
Accepted or later commands");
             }
+
+            LatestDeps.Merge merge = mergeDeps(recoverOkList);
+            Participants<?> await = merge.notAccepted(route);
+            awaitPartialEarlier(recoverOkList, await, () -> {
+                switch (status)
+                {
+                    default: throw new UnhandledEnum(status);
+                    case Applied:
+                    case PreApplied:
+                    {
+                        withStableDeps(merge, executeAt, (i, t) -> 
node.agent().acceptAndWrap(i, t), stableDeps -> {
+                            adapter.persist(node, tracker.topologies(), route, 
txnId, txn, executeAt, stableDeps, acceptOrCommitNotTruncated.writes, 
acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t));
+                        });
+                        accept(acceptOrCommitNotTruncated.result, null);
+                        return;
+                    }
+
+                    case Stable:
+                    {
+                        withStableDeps(merge, executeAt, this, stableDeps -> {
+                            adapter.execute(node, tracker.topologies(), route, 
RECOVER, ExecuteFlags.none(), txnId, txn, executeAt, stableDeps, stableDeps, 
this);
+                        });
+                        return;
+                    }
+
+                    case PreCommitted:
+                    case Committed:
+                    {
+                        withCommittedDeps(merge, executeAt, this, 
committedDeps -> {
+                            adapter.stabilise(node, tracker.topologies(), 
route, ballot, txnId, txn, executeAt, committedDeps, this);
+                        });
+                        return;
+                    }
+
+                    case AcceptedSlow:
+                    case AcceptedMedium:
+                    {
+                        // TODO (desired): if we have a quorum of Accept with 
matching ballot or proposal we can go straight to Commit
+                        // TODO (desired): if we didn't find Accepted in 
*every* shard, consider invalidating for consistency of behaviour
+                        //     however, note that we may have taken the fast 
path and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO
+                        //     (otherwise recovery was attempted and did not 
invalidate, so it must have determined it needed to complete)
+                        Deps proposeDeps = merge.mergeProposal();
+                        propose(SLOW, acceptOrCommitNotTruncated.executeAt, 
proposeDeps);
+                    }
+                }
+            });
+            return;
         }
 
         if (acceptOrCommit != null && acceptOrCommit != 
acceptOrCommitNotTruncated)
@@ -417,6 +429,23 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
         }
     }
 
+    private static LatestDeps.Merge mergeDeps(List<RecoverOk> 
nullableRecoverOkList)
+    {
+        return LatestDeps.merge(nullableRecoverOkList, ok -> ok == null ? null 
: ok.deps);
+    }
+
+    private void awaitPartialEarlier(List<RecoverOk> nullableRecoverOkList, 
Participants<?> participants, Runnable whenReady)
+    {
+        Deps earlierWait = Deps.merge(nullableRecoverOkList, 
nullableRecoverOkList.size(), List::get, ok -> ok.earlierWait);
+        Deps earlierNoWait = Deps.merge(nullableRecoverOkList, 
nullableRecoverOkList.size(), List::get, ok -> ok.earlierNoWait);
+        earlierWait = earlierWait.without(earlierNoWait);
+        earlierWait = earlierWait.intersecting(participants);
+        awaitEarlier(node, earlierWait, HasDecidedExecuteAt).begin((success, 
fail) -> {
+            if (fail != null) accept(null, fail);
+            else whenReady.run();
+        });
+    }
+
     private static boolean supersedingRejects(List<RecoverOk> oks)
     {
         for (RecoverOk ok : oks)
@@ -464,15 +493,13 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
         return ok.laterCoordRejects.with(id -> from.equals(id.node));
     }
 
-    private void withCommittedDeps(List<RecoverOk> nullableRecoverOkList, 
Timestamp executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> 
withDeps)
+    private void withCommittedDeps(LatestDeps.Merge merge, Timestamp 
executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps)
     {
-        LatestDeps.Merge merge = LatestDeps.merge(nullableRecoverOkList, ok -> 
ok == null ? null : ok.deps);
         LatestDeps.withCommitted(adapter, node, merge, route, ballot, txnId, 
executeAt, txn, failureCallback, withDeps);
     }
 
-    private void withStableDeps(List<RecoverOk> nullableRecoverOkList, 
Timestamp executeAt, BiConsumer<?, Throwable> failureCallback, Consumer<Deps> 
withDeps)
+    private void withStableDeps(LatestDeps.Merge merge, Timestamp executeAt, 
BiConsumer<?, Throwable> failureCallback, Consumer<Deps> withDeps)
     {
-        LatestDeps.Merge merge = LatestDeps.merge(nullableRecoverOkList, ok -> 
ok == null ? null : ok.deps);
         LatestDeps.withStable(adapter, node, merge, Deps.NONE, route, null, 
null, route, ballot, txnId, executeAt, txn, failureCallback, withDeps);
     }
 
@@ -506,7 +533,7 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
 
     private void propose(Accept.Kind kind, Timestamp executeAt, 
List<RecoverOk> recoverOkList)
     {
-        Deps proposeDeps = LatestDeps.mergeProposal(recoverOkList, ok -> ok == 
null ? null : ok.deps);
+        Deps proposeDeps = mergeDeps(recoverOkList).mergeProposal();
         propose(kind, executeAt, proposeDeps);
     }
 
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
index d2168d0d..f1fa1eae 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
@@ -100,7 +100,7 @@ public class FastPathTracker extends 
PreAcceptTracker<FastPathTracker.FastPathSh
                 ++fastPathFailures;
 
                 if (hasRejectedFastPath() && hasReachedQuorum())
-                    return complete(Success);
+                    return mediumOrSlowSuccess();
             }
 
             return NoChange;
@@ -116,7 +116,7 @@ public class FastPathTracker extends 
PreAcceptTracker<FastPathTracker.FastPathSh
                 ++fastPathDelayed;
 
                 if (isFastPathDelayed() && hasReachedQuorum())
-                    return complete(Success);
+                    return mediumOrSlowSuccess();
             }
 
             return NoChange;
@@ -125,10 +125,15 @@ public class FastPathTracker extends 
PreAcceptTracker<FastPathTracker.FastPathSh
         final ShardOutcome<? super FastPathTracker> 
quorumIfHasRejectedFastPath()
         {
             return hasReachedQuorum() && hasRejectedFastPath()
-                   ? hasMetMediumPathCriteria() ? 
complete(NewMediumPathSuccess) : complete(Success)
+                   ? mediumOrSlowSuccess()
                    : NoChange;
         }
 
+        final ShardOutcome<? super FastPathTracker> mediumOrSlowSuccess()
+        {
+            return hasMetMediumPathCriteria() ? complete(NewMediumPathSuccess) 
: complete(Success);
+        }
+
         final boolean isFastPathDelayed()
         {
             return shard.rejectsFastPath(fastQuorumSize, fastPathDelayed);
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
index ddec94a2..dd9c2c90 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -117,7 +117,7 @@ public class ReadTracker extends 
AbstractTracker<ReadTracker.ReadShardTracker>
                 return NoChange;
 
             // TODO (low priority, efficiency): support slice method accepting 
a single Range
-            if (unavailable == null) unavailable = 
partialSuccess.unavailable.slice(Ranges.of(shard.range));
+            if (unavailable == null) unavailable = 
partialSuccess.unavailable.slice(Ranges.of(shard.range), Minimal);
             else unavailable = unavailable.slice(partialSuccess.unavailable, 
Minimal);
             if (!unavailable.isEmpty())
                 return ensureProgressOrFail();
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index 3cf7df2a..4fc927f0 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -334,9 +334,9 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         if (epochs.wasTruncated(ready.epoch))
             return;
 
-        ready.metadata.addCallback(() -> epochs.acknowledge(ready));
-        ready.coordinate.addCallback(() -> 
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
-        ready.reads.addCallback(() ->  
localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology));
+        ready.metadata.invokeIfSuccess(() -> epochs.acknowledge(ready));
+        ready.coordinate.invokeIfSuccess(() -> 
localSyncComplete(epochs.getOrCreate(ready.epoch).topology, startSync));
+        ready.reads.invokeIfSuccess(() ->  
localBootstrapsComplete(epochs.getOrCreate(ready.epoch).topology));
     }
 
     protected void topologyUpdatePostListenerNotify(Topology topology) {}
@@ -350,7 +350,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
         {
             logger.debug("Epoch {} received; waiting to receive {} before 
reporting", topology.epoch(), lastReceived + 1);
-            epochs.receiveFuture(lastReceived + 1).addCallback(() -> 
reportTopology(topology, isLoad, startSync));
+            epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync));
             fetchTopologyForEpoch(lastReceived + 1);
             return;
         }
@@ -359,14 +359,14 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         if (lastAcked == 0 && lastReceived > 0)
         {
             logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), epochs.minEpoch());
-            epochs.acknowledgeFuture(epochs.minEpoch()).addCallback(() -> 
reportTopology(topology, isLoad, startSync));
+            epochs.acknowledgeFuture(epochs.minEpoch()).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync));
             return;
         }
 
         if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
         {
             logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), lastAcked + 1);
-            epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> 
reportTopology(topology, isLoad, startSync));
+            epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync));
             return;
         }
 
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java 
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index d67b4105..0e9a1ebb 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -86,14 +86,14 @@ extends SafeCommandStore
 
         try (Caches caches = tryGetCaches())
         {
-            if (caches == null)
-                return with.isSubsetOf(this.context) ? with : null;
-
             for (TxnId txnId : with.txnIds())
             {
                 if (null != getInternal(txnId))
                     continue;
 
+                if (caches == null)
+                    return null;
+
                 C safeCommand = caches.acquireIfLoaded(txnId);
                 if (safeCommand == null)
                     return null;
@@ -116,11 +116,14 @@ extends SafeCommandStore
                 if (null != getInternal(key))
                     continue; // already in working set
 
-                CFK safeCfk = caches.acquireIfLoaded(key);
-                if (safeCfk != null)
+                if (caches != null)
                 {
-                    add(safeCfk, caches);
-                    continue;
+                    CFK safeCfk = caches.acquireIfLoaded(key);
+                    if (safeCfk != null)
+                    {
+                        add(safeCfk, caches);
+                        continue;
+                    }
                 }
                 if (unavailable == null)
                     unavailable = new ArrayList<>();
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index abe95748..4ce5e59a 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -781,7 +781,7 @@ public abstract class CommandStores
         for (ShardHolder shard : shards)
             results.add(shard.store.build(context, mapper));
 
-        return AsyncChains.all(results);
+        return AsyncChains.allOf(results);
     }
 
     protected <O> AsyncChain<O> mapReduce(PreLoadContext context, IntStream 
commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce)
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 446633c5..a5c421c2 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -238,7 +238,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
     public AsyncResult<Void> unsafeStart()
     {
         EpochReady ready = 
onTopologyUpdateInternal(configService.currentTopology(), false);
-        ready.coordinate.addCallback(() -> 
this.topology.onEpochSyncComplete(id, topology.epoch()));
+        ready.coordinate.invokeIfSuccess(() -> 
this.topology.onEpochSyncComplete(id, topology.epoch()));
         configService.acknowledgeEpoch(ready, false);
         return ready.metadata;
     }
@@ -357,7 +357,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
         if (topology.epoch() <= this.topology.epoch())
             return AsyncResults.success(null);
         EpochReady ready = onTopologyUpdateInternal(topology, startSync);
-        ready.coordinate.addCallback(() -> 
this.topology.onEpochSyncComplete(id, topology.epoch()));
+        ready.coordinate.invokeIfSuccess(() -> 
this.topology.onEpochSyncComplete(id, topology.epoch()));
         configService.acknowledgeEpoch(ready, startSync);
         return ready.coordinate;
     }
@@ -790,7 +790,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
     {
         AsyncResult<Result> result = withEpoch(Math.max(txnId.epoch(), 
minEpoch), () -> initiateCoordination(txnId, txn)).beginAsResult();
         coordinating.putIfAbsent(txnId, result);
-        result.addCallback((success, fail) -> coordinating.remove(txnId, 
result));
+        result.invoke((success, fail) -> coordinating.remove(txnId, result));
         return result;
     }
 
@@ -855,7 +855,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
             return future;
         }).beginAsResult();
         coordinating.putIfAbsent(txnId, result);
-        result.addCallback((success, fail) -> coordinating.remove(txnId, 
result));
+        result.invoke((success, fail) -> coordinating.remove(txnId, result));
         return result;
     }
 
@@ -865,7 +865,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
         if (waitForEpoch > topology.epoch())
         {
             configService.fetchTopologyForEpoch(waitForEpoch);
-            topology().awaitEpoch(waitForEpoch).addCallback((ignored, failure) 
-> {
+            topology().awaitEpoch(waitForEpoch).invoke((ignored, failure) -> {
                 if (failure != null)
                     
agent().onUncaughtException(WrappableException.wrap(failure));
                 else
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 7efe5509..3a86e5cc 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -519,7 +519,7 @@ public class RedundantBefore extends 
ReducingRangeMap<RedundantBefore.Bounds>
             if (bounds == null)
                 return execute;
 
-            Invariants.require(executeAt == null ? !bounds.outOfBounds(txnId) 
: !bounds.outOfBounds(txnId, executeAt));
+            Invariants.require(txnId.isSyncPoint() || (executeAt == null ? 
!bounds.outOfBounds(txnId) : !bounds.outOfBounds(txnId, executeAt)));
             if (bounds.is(txnId, PRE_BOOTSTRAP_OR_STALE))
                 return without.apply(execute, Ranges.of(bounds.range));
 
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index c30a140f..628b5718 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -660,7 +660,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
             return encoded;
         }
 
-        private static int encode(TxnId txnId, InternalStatus internalStatus, 
boolean mayExecute)
+        static int encode(TxnId txnId, InternalStatus internalStatus, boolean 
mayExecute)
         {
             int encoded = internalStatus.txnInfoEncoded | (mayExecute ? 
MAY_EXECUTE : 0);
             if (txnId.is(Key)) encoded |= MANAGED;
@@ -1262,10 +1262,10 @@ public class CommandsForKey extends CommandsForKeyUpdate
             int insertPos = Arrays.binarySearch(byId, testStartedAtTimestamp);
             if (insertPos < 0)
             {
-                loadingFor = NO_TXNIDS;
+                loadingFor = NOT_LOADING_PRUNED;
                 insertPos = -1 - insertPos;
                 if (computeIsDep != IGNORE && 
testTxnId.compareTo(prunedBefore) < 0)
-                    loadingFor = loadingPrunedFor(loadingPruned, testTxnId, 
NO_TXNIDS);
+                    loadingFor = loadingPrunedFor(loadingPruned, testTxnId, 
NOT_LOADING_PRUNED);
             }
 
             switch (testStartedAt)
@@ -1300,7 +1300,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
                         TxnId[] missing = txn.missing();
                         hasAsDep = missing == NO_TXNIDS || 
Arrays.binarySearch(txn.missing(), testTxnId) < 0;
                     }
-                    else if (loadingFor == NO_TXNIDS)
+                    else if (loadingFor == NOT_LOADING_PRUNED)
                     {
                         hasAsDep = false;
                     }
@@ -1603,35 +1603,33 @@ public class CommandsForKey extends CommandsForKeyUpdate
         {
             // update
             TxnInfo cur = byId[pos];
-            if (cur != null)
+            int c = cur.compareTo(newStatus);
+            if (c > 0)
             {
-                int c = cur.compareTo(newStatus);
-                if (c > 0)
-                {
-                    // newStatus moves us backwards; we only permit this for 
(Pre)?(Not)?Accepted states
-                    if (cur.compareTo(COMMITTED) >= 0 || 
newStatus.compareTo(PREACCEPTED) <= 0)
-                        return this;
+                // newStatus moves us backwards; we only permit this for 
(Pre)?(Not)?Accepted states
+                if (cur.compareTo(COMMITTED) >= 0 || 
newStatus.compareTo(PREACCEPTED) <= 0)
+                    return this;
 
-                    // and only when the new ballot is strictly greater
-                    if (updated.acceptedOrCommitted().compareTo(cur.ballot()) 
<= 0)
-                        return this;
-                }
-                else if (c == 0)
-                {
-                    // we're updating to the same state; we only do this with 
a strictly greater ballot;
-                    // even so, if we have no executeAt or deps there's 
nothing to record
-                    if (updated.acceptedOrCommitted().compareTo(cur.ballot()) 
<= 0 || !newStatus.hasExecuteAtOrDeps())
-                        return this;
-                }
-                else
-                {
-                    // we're advancing to a higher status, but this is only 
permitted either if the new state is stable or the ballot is higher
-                    if (cur.compareTo(STABLE) < 0 && 
updated.acceptedOrCommitted().compareTo(cur.ballot()) < 0)
-                        return this;
-                }
+                // and only when the new ballot is strictly greater
+                if (updated.acceptedOrCommitted().compareTo(cur.ballot()) <= 0)
+                    return this;
+            }
+            else if (c == 0)
+            {
+                // we're updating to the same state; we only do this with a 
strictly greater ballot;
+                // even so, if we have no executeAt or deps there's nothing to 
record
+                if (updated.acceptedOrCommitted().compareTo(cur.ballot()) <= 0 
|| !newStatus.hasExecuteAtOrDeps())
+                    return this;
+            }
+            else
+            {
+                // we're advancing to a higher status, but this is only 
permitted either if the new state is stable or the ballot is higher
+                if (cur.compareTo(STABLE) < 0 && 
updated.acceptedOrCommitted().compareTo(cur.ballot()) < 0)
+                    return this;
             }
 
             if (isOutOfRange) result = insertOrUpdateOutOfRange(pos, txnId, 
cur, newStatus, mayExecute, updated, witnessedBy);
+            else if (cur.compareTo(STABLE) >= 0) result = update(pos, txnId, 
cur, cur.withEncodedStatus(TxnInfo.encode(txnId, newStatus, cur.mayExecute(), 
cur.statusOverrides())), updated, witnessedBy);
             else if (newStatus.hasDeps()) result = update(pos, txnId, cur, 
newStatus, mayExecute, updated, witnessedBy);
             else result = update(pos, txnId, cur, TxnInfo.create(txnId, 
newStatus, mayExecute, updated), updated, witnessedBy);
         }
@@ -2000,10 +1998,10 @@ public class CommandsForKey extends CommandsForKeyUpdate
             return new CommandsForKeyUpdateWithPostProcess(newCfk, 
newPostProcess);
         }
 
-        TxnInfo[] newById = removeRedundantById(byId, bounds, newBounds);
+        Object[] newLoadingPruned = 
Pruning.removeRedundantLoadingPruned(loadingPruned, redundantBefore(newBounds));
+        TxnInfo[] newById = removeRedundantById(byId, newLoadingPruned != 
loadingPruned, bounds, newBounds);
         int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(), 
redundantBefore(newBounds));
         Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 || 
byId[prunedBeforeById].compareTo(newBounds.gcBefore) < 0 : 
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
-        Object[] newLoadingPruned = 
Pruning.removeRedundantLoadingPruned(loadingPruned, redundantBefore(newBounds));
 
         long maxUniqueHlc = this.maxUniqueHlc;
         if (maxUniqueHlc <= newBounds.gcBefore.hlc() && 
newBounds.gcBefore.is(HLC_BOUND))
@@ -2022,10 +2020,10 @@ public class CommandsForKey extends CommandsForKeyUpdate
     {
         QuickBounds newBoundsInfo = 
bounds.withGcBeforeBeforeAtLeast(newRedundantBefore);
 
-        TxnInfo[] newById = removeRedundantById(byId, bounds, newBoundsInfo);
+        Object[] newLoadingPruned = 
Pruning.removeRedundantLoadingPruned(loadingPruned, newRedundantBefore);
+        TxnInfo[] newById = removeRedundantById(byId, newLoadingPruned != 
loadingPruned, bounds, newBoundsInfo);
         int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(), 
newRedundantBefore);
         Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 : 
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
-        Object[] newLoadingPruned = 
Pruning.removeRedundantLoadingPruned(loadingPruned, newRedundantBefore);
 
         return reconstruct(key, newBoundsInfo, true, newById, maxUniqueHlc, 
newLoadingPruned, newPrunedBeforeById, unmanageds);
     }
@@ -2216,7 +2214,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
                     {
                         Invariants.require(txn.witnesses(missingId));
                         TxnInfo missingInfo = get(missingId, byId);
-                        
Invariants.require(missingInfo.status().compareTo(InternalStatus.COMMITTED) < 
0);
+                        Invariants.require(missingInfo == null ? 
missingId.is(UNSTABLE) && find(loadingPruned, missingId) != null : 
missingInfo.status().compareTo(InternalStatus.COMMITTED) < 0);
                         
Invariants.require(txn.depsKnownBefore().compareTo(missingId) >= 0);
                     }
                     if (txn.isCommittedAndExecutes())
diff --git a/accord-core/src/main/java/accord/local/cfk/Pruning.java 
b/accord-core/src/main/java/accord/local/cfk/Pruning.java
index 08788c05..29aec370 100644
--- a/accord-core/src/main/java/accord/local/cfk/Pruning.java
+++ b/accord-core/src/main/java/accord/local/cfk/Pruning.java
@@ -527,7 +527,7 @@ public class Pruning
         return epochPrunedBefores;
     }
 
-    static TxnInfo[] removeRedundantById(TxnInfo[] byId, QuickBounds 
prevBounds, QuickBounds newBounds)
+    static TxnInfo[] removeRedundantById(TxnInfo[] byId, boolean 
hasRedundantLoadingPruned, QuickBounds prevBounds, QuickBounds newBounds)
     {
         TxnId newRedundantBefore = redundantBefore(newBounds);
         TxnId newBootstrappedAt = bootstrappedAt(newBounds);
@@ -538,7 +538,7 @@ public class Pruning
 
         TxnInfo[] newById = byId;
         int pos = insertPos(byId, newRedundantBefore);
-        if (pos != 0)
+        if (pos != 0 || hasRedundantLoadingPruned)
         {
             if (Invariants.isParanoid() && testParanoia(LINEAR, NONE, LOW))
             {
diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java 
b/accord-core/src/main/java/accord/local/cfk/Updating.java
index a33d3b5b..d93f2735 100644
--- a/accord-core/src/main/java/accord/local/cfk/Updating.java
+++ b/accord-core/src/main/java/accord/local/cfk/Updating.java
@@ -47,6 +47,7 @@ import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import accord.utils.ArrayBuffers.ObjectBuffers;
 import accord.utils.Invariants;
 import accord.utils.RelationMultiMap;
 import accord.utils.SortedArrays;
@@ -65,6 +66,7 @@ import static accord.local.cfk.CommandsForKey.NO_INFOS;
 import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.APPLY;
 import static accord.local.cfk.CommandsForKey.Unmanaged.Pending.COMMIT;
 import static accord.local.cfk.CommandsForKey.executesIgnoringBootstrap;
+import static accord.local.cfk.CommandsForKey.manages;
 import static accord.local.cfk.CommandsForKey.reportLinearizabilityViolation;
 import static accord.local.cfk.CommandsForKey.mayExecute;
 import static accord.local.cfk.Pruning.removeLoadingPruned;
@@ -120,7 +122,6 @@ class Updating
 
     static CommandsForKeyUpdate insertOrUpdate(CommandsForKey cfk, int 
insertPos, int updatePos, TxnId plainTxnId, TxnInfo curInfo, InternalStatus 
newStatus, boolean mayExecute, Command command, @Nonnull TxnId[] witnessedBy)
     {
-        // TODO (expected): do not calculate any deps or additions if we're 
transitioning from Stable to Applied; wasted effort and might trigger LoadPruned
         Object newInfoObj = computeInfoAndAdditions(cfk, insertPos, updatePos, 
plainTxnId, newStatus, mayExecute, command);
         if (newInfoObj.getClass() != InfoWithAdditions.class)
             return insertOrUpdate(cfk, insertPos, plainTxnId, curInfo, 
(TxnInfo)newInfoObj, command, witnessedBy);
@@ -157,7 +158,7 @@ class Updating
         TxnInfo[] newById = new TxnInfo[byId.length + additionCount + 
(updatePos < 0 ? 1 : 0)];
         insertOrUpdateWithAdditions(byId, insertPos, updatePos, plainTxnId, 
newInfo, additions, additionCount, newById, newCommittedByExecuteAt, 
witnessedBy, cfk.bounds);
         if (testParanoia(SUPERLINEAR, NONE, LOW))
-            validateMissing(newById, additions, additionCount, curInfo, 
newInfo, NO_TXNIDS);
+            validateMissing(newById, additions, additionCount, curInfo, 
newInfo, witnessedBy);
 
         int newMinUndecidedById = updateMinUndecidedById(newInfo, insertPos, 
updatePos, cfk, byId, newById, additions, additionCount);
         // we don't insert anything before prunedBeforeById (we LoadPruned 
instead), so we can simply bump it by 0 or 1
@@ -299,14 +300,14 @@ class Updating
         MergeCursor<TxnId, DepList> deps = 
command.partialDeps().txnIds(cfk.key());
         deps.find(cfk.redundantBefore());
 
-        return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, txnId, 
newStatus, mayExecute, ballot, executeAt, depsKnownBefore, deps);
+        return computeInfoAndAdditions(cfk.byId, insertPos, updatePos, txnId, 
newStatus, mayExecute, ballot, executeAt, cfk.prunedBefore(), depsKnownBefore, 
deps);
     }
 
     /**
      * We return an Object here to avoid wasting allocations; most of the time 
we expect a new TxnInfo to be returned,
      * but if we have transitive dependencies to insert we return an 
InfoWithAdditions
      */
-    static Object computeInfoAndAdditions(TxnInfo[] byId, int insertPos, int 
updatePos, TxnId plainTxnId, InternalStatus newStatus, boolean mayExecute, 
Ballot ballot, Timestamp executeAt, Timestamp depsKnownBefore, 
MergeCursor<TxnId, DepList> deps)
+    static Object computeInfoAndAdditions(TxnInfo[] byId, int insertPos, int 
updatePos, TxnId plainTxnId, InternalStatus newStatus, boolean mayExecute, 
Ballot ballot, Timestamp executeAt, TxnInfo prunedBefore, Timestamp 
depsKnownBefore, MergeCursor<TxnId, DepList> deps)
     {
         TxnId[] additions = NO_TXNIDS, missing = NO_TXNIDS;
         int additionCount = 0, missingCount = 0;
@@ -335,12 +336,8 @@ class Updating
                 //  we should ensure any existing TRANSITIVE entries are 
upgraded.
                 // OR we should remove TRANSITIVE for simplicity,
                 // OR document/enforce that TRANSITIVE_VISIBLE can only be 
applied to dependencies of unmanaged transactions
-                if (d.is(UNSTABLE) && t.compareTo(COMMITTED) < 0 && 
t.witnesses(d))
-                {
-                    if (missingCount == missing.length)
-                        missing = cachedTxnIds().resize(missing, missingCount, 
Math.max(8, missingCount * 2));
-                    missing[missingCount++] = d;
-                }
+                if (d.is(UNSTABLE)  && txnIdsIndex < depsKnownBeforePos && 
t.compareTo(COMMITTED) < 0 && plainTxnId.witnesses(d))
+                    missing = append(missing, missingCount++, d, 
cachedTxnIds());
 
                 ++txnIdsIndex;
                 deps.advance();
@@ -350,21 +347,20 @@ class Updating
                 // we expect to be missing ourselves
                 // we also permit any transaction we have recorded as 
COMMITTED or later to be missing, as recovery will not need to consult our 
information
                 if (txnIdsIndex != updatePos && txnIdsIndex < 
depsKnownBeforePos && t.compareTo(COMMITTED) < 0 && plainTxnId.witnesses(t))
-                {
-                    if (missingCount == missing.length)
-                        missing = cachedTxnIds().resize(missing, missingCount, 
Math.max(8, missingCount * 2));
-                    missing[missingCount++] = t.plainTxnId();
-                }
+                    missing = append(missing, missingCount++, t.plainTxnId(), 
cachedTxnIds());
                 txnIdsIndex++;
             }
             else
             {
                 if (plainTxnId.witnesses(d))
                 {
-                    if (additionCount >= additions.length)
-                        additions = cachedTxnIds().resize(additions, 
additionCount, Math.max(8, additionCount * 2));
-
-                    additions[additionCount++] = d;
+                    if (d.is(UNSTABLE))
+                    {
+                        if (d.compareTo(depsKnownBefore) < 0 && (manages(d) || 
d.compareTo(prunedBefore) > 0))
+                            missing = append(missing, missingCount++, d, 
cachedTxnIds());
+                        d = d.withoutNonIdentityFlags();
+                    }
+                    additions = append(additions, additionCount++, d, 
cachedTxnIds());
                 }
                 else
                 {
@@ -384,9 +380,13 @@ class Updating
                 TxnId d = deps.cur();
                 if (plainTxnId.witnesses(d))
                 {
-                    if (additionCount >= additions.length)
-                        additions = cachedTxnIds().resize(additions, 
additionCount, Math.max(8, additionCount * 2));
-                    additions[additionCount++] = 
deps.cur().withoutNonIdentityFlags();
+                    if (d.is(UNSTABLE))
+                    {
+                        if (d.compareTo(depsKnownBefore) < 0 && (manages(d) || 
d.compareTo(prunedBefore) > 0))
+                            missing = append(missing, missingCount++, d, 
cachedTxnIds());
+                        d = d.withoutNonIdentityFlags();
+                    }
+                    additions = append(additions, additionCount++, d, 
cachedTxnIds());
                 }
                 deps.advance();
             }
@@ -398,13 +398,9 @@ class Updating
             {
                 if (txnIdsIndex != updatePos && 
byId[txnIdsIndex].compareTo(COMMITTED) < 0)
                 {
-                    TxnId txnId = byId[txnIdsIndex].plainTxnId();
-                    if ((plainTxnId.witnesses(txnId)))
-                    {
-                        if (missingCount == missing.length)
-                            missing = cachedTxnIds().resize(missing, 
missingCount, Math.max(8, missingCount * 2));
-                        missing[missingCount++] = txnId;
-                    }
+                    TxnInfo txn = byId[txnIdsIndex];
+                    if (plainTxnId.witnesses(txn))
+                        missing = append(missing, missingCount++, 
txn.plainTxnId(), cachedTxnIds());
                 }
                 txnIdsIndex++;
             }
@@ -417,6 +413,14 @@ class Updating
         return new InfoWithAdditions(info, additions, additionCount);
     }
 
+    private static <T> T[] append(T[] array, int index, T add, 
ObjectBuffers<T> cached)
+    {
+        if (index == array.length)
+            array = cached.resize(array, index, Math.max(8, index * 2));
+        array[index] = add;
+        return array;
+    }
+
     static CommandsForKeyUpdate insertOrUpdate(CommandsForKey cfk, int pos, 
TxnId plainTxnId, TxnInfo curInfo, TxnInfo newInfo, Command command, @Nullable 
TxnId[] witnessedBy)
     {
         if (curInfo == newInfo)
@@ -475,6 +479,10 @@ class Updating
             // TODO (desired): for consistency, move this to insertOrUpdate 
(without additions), while maintaining the efficiency
             Utils.addToMissingArrays(newById, newCommittedByExecuteAt, 
newInfo, plainTxnId, witnessedBy);
         }
+        else if (witnessedBy != null && newInfo.compareTo(COMMITTED) >= 0)
+        {
+            Utils.removeFromWitnessMissingArrays(newById, 
newCommittedByExecuteAt, plainTxnId, witnessedBy);
+        }
 
         if (testParanoia(SUPERLINEAR, NONE, LOW) && curInfo == null && 
newInfo.compareTo(COMMITTED) < 0)
             validateMissing(newById, NO_TXNIDS, 0, curInfo, newInfo, 
witnessedBy);
@@ -499,7 +507,8 @@ class Updating
 
         // we may need to insert or remove ourselves, depending on the new and 
prior status
         boolean insertSelfMissing = sourceUpdatePos < 0 && 
newInfo.compareTo(COMMITTED) < 0;
-        boolean removeSelfMissing = sourceUpdatePos >= 0 && 
newInfo.compareTo(COMMITTED) >= 0 && byId[sourceUpdatePos].compareTo(COMMITTED) 
< 0;
+        boolean removeSelfMissing = newInfo.compareTo(COMMITTED) >= 0 && 
sourceUpdatePos >= 0 && byId[sourceUpdatePos].compareTo(COMMITTED) < 0;
+        boolean removeWitnessedMissing = newInfo.compareTo(COMMITTED) >= 0 && 
witnessedBy.length > 0;
         // missingSource starts as additions, but if we insertSelfMissing at 
the relevant moment it becomes the merge of additions and plainTxnId
         TxnId[] missingSource = additions;
 
@@ -536,20 +545,24 @@ class Updating
                     }
 
                     int to = missingTo(txn, depsKnownBefore, missingSource, 
missingCount, missingLimit);
-                    if (to > 0 || removeSelfMissing)
+                    if (to > 0 || removeSelfMissing || removeWitnessedMissing)
                     {
+                        int witnessedByIndex = -1;
+                        if (witnessedBy != NOT_LOADING_PRUNED && (to > 0 || 
!removeSelfMissing))
+                            witnessedByIndex = 
Arrays.binarySearch(witnessedBy, txn);
+
                         TxnId[] curMissing = txn.missing();
                         TxnId[] newMissing = curMissing;
                         if (to > 0)
                         {
                             TxnId skipInsertMissing = null;
-                            if (Arrays.binarySearch(witnessedBy, plainTxnId) 
>= 0)
+                            if (insertSelfMissing && witnessedByIndex >= 0)
                                 skipInsertMissing = plainTxnId;
 
                             newMissing = mergeAndFilterMissing(txn, 
curMissing, missingSource, to, skipInsertMissing);
                         }
 
-                        if (removeSelfMissing)
+                        if (removeSelfMissing || (removeWitnessedMissing && 
witnessedByIndex >= 0))
                             newMissing = removeOneMissing(newMissing, 
plainTxnId);
 
                         if (newMissing != curMissing)
diff --git a/accord-core/src/main/java/accord/local/cfk/Utils.java 
b/accord-core/src/main/java/accord/local/cfk/Utils.java
index a8a0ae39..695e8f88 100644
--- a/accord-core/src/main/java/accord/local/cfk/Utils.java
+++ b/accord-core/src/main/java/accord/local/cfk/Utils.java
@@ -42,6 +42,7 @@ class Utils
 {
     static void validateMissing(TxnInfo[] byId, TxnId[] additions, int 
additionCount, TxnInfo curInfo, TxnInfo newInfo, @Nonnull TxnId[] 
shouldNotHaveMissing)
     {
+        int newInfoAdditionIndex = Arrays.binarySearch(additions, 0, 
additionCount, newInfo);
         for (TxnInfo txn : byId)
         {
             if (txn == newInfo) continue;
@@ -54,7 +55,7 @@ class Utils
             {
                 if (!txn.witnesses(additions[i])) continue;
                 j = SortedArrays.exponentialSearch(missing, j, missing.length, 
additions[i]);
-                if (shouldNotHaveMissing != NO_TXNIDS && 
Arrays.binarySearch(shouldNotHaveMissing, txn) >= 0) Invariants.require(j < 0);
+                if (shouldNotHaveMissing != NO_TXNIDS && i == 
newInfoAdditionIndex && Arrays.binarySearch(shouldNotHaveMissing, txn) >= 0) 
Invariants.require(j < 0);
                 else Invariants.require(j >= 0);
             }
             if (curInfo == null && newInfo.compareTo(COMMITTED) < 0 && 
txn.witnesses(newInfo) && txn.depsKnownBefore().compareTo(newInfo) > 0 && 
(shouldNotHaveMissing == NO_TXNIDS || Arrays.binarySearch(shouldNotHaveMissing, 
txn) < 0))
@@ -95,6 +96,40 @@ class Utils
         removeFromMissingArraysById(byId, minSearchIndex, byId.length, 
removeTxnId);
     }
 
+    /**
+     * {@code removeTxnId} no longer needs to be tracked in missing arrays;
+     * remove it from byId and committedByExecuteAt, ensuring both arrays 
still reference the same TxnInfo where updated
+     */
+    static void removeFromWitnessMissingArrays(TxnInfo[] byId, TxnInfo[] 
committedByExecuteAt, TxnId removeTxnId, TxnId[] witnessedBy)
+    {
+        if (witnessedBy.length == 0)
+            return;
+
+        int byIdIndex = Arrays.binarySearch(byId, witnessedBy[0]);
+        if (byIdIndex < 0)
+            byIdIndex = -1 - byIdIndex;
+
+        for (TxnId txnId : witnessedBy)
+        {
+            byIdIndex = SortedArrays.exponentialSearch(byId, byIdIndex, 
byId.length, txnId);
+            if (byIdIndex < 0)
+            {
+                byIdIndex = -1 - byIdIndex;
+                continue;
+            }
+            TxnInfo curTxn = byId[byIdIndex];
+            TxnId[] curMissing = curTxn.missing();
+            if (curMissing == NO_TXNIDS) continue;
+            TxnId[] newMissing = removeOneMissing(curMissing, removeTxnId);
+            if (newMissing == curMissing) continue;
+            TxnInfo newTxn = curTxn.withMissing(newMissing);
+            byId[byIdIndex] = newTxn;
+            if (!curTxn.isCommittedAndExecutes()) continue;
+            int byExecuteAtIndex = Arrays.binarySearch(committedByExecuteAt, 
curTxn, TxnInfo::compareExecuteAt);
+            committedByExecuteAt[byExecuteAtIndex] = newTxn;
+        }
+    }
+
     /**
      * {@code removeTxnId} no longer needs to be tracked in missing arrays;
      * remove it from a range of byId ACCEPTED status entries only, that could 
not be tracked via committedByExecuteAt
diff --git 
a/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java 
b/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java
index ea840f09..202e7eff 100644
--- a/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java
+++ b/accord-core/src/main/java/accord/local/durability/ConcurrencyControl.java
@@ -49,7 +49,7 @@ class ConcurrencyControl implements BiConsumer<Object, 
Throwable>
 
         void start(ConcurrencyControl concurrencyControl)
         {
-            supplier.get().addCallback(result).begin(concurrencyControl);
+            supplier.get().invoke(result).begin(concurrencyControl);
         }
     }
 
@@ -80,7 +80,7 @@ class ConcurrencyControl implements BiConsumer<Object, 
Throwable>
             }
             ++inflight;
         }
-        return supplier.get().addCallback(this);
+        return supplier.get().invoke(this);
     }
 
     synchronized void setMaxConcurrency(int newMaxConcurrency)
diff --git 
a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java 
b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
index 431a396c..9f209e52 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
@@ -248,7 +248,7 @@ public class DurabilityQueue
         if (request != null)
             request.reportAttempt(exclusiveSyncPoint.syncId, 
node.elapsed(MICROSECONDS), coordinate);
 
-        coordinate.addCallback((success, fail) -> {
+        coordinate.invoke((success, fail) -> {
             synchronized (this)
             {
                 unregisterInProgress(exclusiveSyncPoint, coordinate);
diff --git 
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java 
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index 4c17b8c4..f9abe277 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -321,7 +321,7 @@ public class ShardDurability
                                 syncId -> node.withEpoch(syncId.epoch(),
                                     () -> syncPointControl.submit(
                                         () -> 
CoordinateSyncPoint.exclusive(node, syncId, (FullRoute<Range>) 
node.computeRoute(syncId, ranges))
-                                                                 
.addCallback(logSyncPoint(syncId, ranges))
+                                                                 
.invoke(logSyncPoint(syncId, ranges))
                             )))
                             .begin((success, fail) -> {
                                 scheduled = null;
diff --git a/accord-core/src/main/java/accord/messages/Apply.java 
b/accord-core/src/main/java/accord/messages/Apply.java
index 379f0a7b..8b5f3855 100644
--- a/accord-core/src/main/java/accord/messages/Apply.java
+++ b/accord-core/src/main/java/accord/messages/Apply.java
@@ -52,9 +52,9 @@ public class Apply extends TxnRequest<ApplyReply>
     public static final Factory FACTORY = Apply::new;
     public static class SerializationSupport
     {
-        public static Apply create(TxnId txnId, Route<?> scope, long minEpoch, 
long waitForEpoch, Kind kind, Timestamp executeAt, PartialDeps deps, PartialTxn 
txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result)
+        public static Apply create(TxnId txnId, Route<?> scope, long minEpoch, 
long waitForEpoch, long maxEpoch, Kind kind, Timestamp executeAt, PartialDeps 
deps, PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result 
result)
         {
-            return new Apply(kind, txnId, scope, minEpoch, waitForEpoch, 
executeAt, deps, txn, fullRoute, writes, result);
+            return new Apply(kind, txnId, scope, minEpoch, waitForEpoch, 
maxEpoch, executeAt, deps, txn, fullRoute, writes, result);
         }
     }
 
@@ -71,6 +71,7 @@ public class Apply extends TxnRequest<ApplyReply>
     public final @Nullable Writes writes;
     public final Result result;
     public final long minEpoch;
+    public final long maxEpoch;
 
     public enum Kind { Minimal, Maximal }
 
@@ -86,6 +87,7 @@ public class Apply extends TxnRequest<ApplyReply>
         this.writes = writes;
         this.result = result;
         this.minEpoch = participates.oldestEpoch();
+        this.maxEpoch = participates.currentEpoch();
     }
 
     public static Topologies participates(Node node, Unseekables<?> route, 
TxnId txnId, Timestamp executeAt, Topologies executes)
@@ -98,7 +100,7 @@ public class Apply extends TxnRequest<ApplyReply>
         return node.topology().preciseEpochs(route, txnId.epoch(), 
executeAt.epoch(), SHARE);
     }
 
-    protected Apply(Kind kind, TxnId txnId, Route<?> route, long minEpoch, 
long waitForEpoch, Timestamp executeAt, PartialDeps deps, @Nullable PartialTxn 
txn, @Nullable FullRoute<?> fullRoute, Writes writes, Result result)
+    protected Apply(Kind kind, TxnId txnId, Route<?> route, long minEpoch, 
long waitForEpoch, long maxEpoch, Timestamp executeAt, PartialDeps deps, 
@Nullable PartialTxn txn, @Nullable FullRoute<?> fullRoute, Writes writes, 
Result result)
     {
         super(txnId, route, waitForEpoch);
         this.kind = kind;
@@ -109,19 +111,20 @@ public class Apply extends TxnRequest<ApplyReply>
         this.writes = writes;
         this.result = result;
         this.minEpoch = minEpoch;
+        this.maxEpoch = maxEpoch;
     }
 
     @Override
     public Cancellable submit()
     {
-        return node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(), 
this);
+        return node.mapReduceConsumeLocal(this, minEpoch, maxEpoch, this);
     }
 
     @Override
     public ApplyReply apply(SafeCommandStore safeStore)
     {
         Route<?> route = fullRoute != null ? fullRoute : scope;
-        StoreParticipants participants = StoreParticipants.execute(safeStore, 
route, minEpoch, txnId, executeAt.epoch());
+        StoreParticipants participants = StoreParticipants.execute(safeStore, 
route, minEpoch, txnId, maxEpoch);
         return apply(safeStore, participants);
     }
 
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java 
b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index d3c0da31..e858a659 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -479,7 +479,7 @@ public class BeginRecovery extends 
TxnRequest.WithUnsynced<BeginRecovery.Recover
                    ", deps:" + deps +
                    ", earlierWait:" + earlierWait +
                    ", earlierNoWait:" + earlierNoWait +
-                   ", laterNoVote:" + laterCoordRejects +
+                   ", laterCoordRejects:" + laterCoordRejects +
                    ", selfAcceptsFastPath:" + selfAcceptsFastPath +
                    (txnId.hasPrivilegedCoordinator() ? ", 
coordinatorFastPath:" + selfAcceptsFastPath : "") +
                    ", supersedingRejects:" + supersedingRejects +
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java 
b/accord-core/src/main/java/accord/messages/PreAccept.java
index a55201e6..dabff631 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -123,7 +123,7 @@ public class PreAccept extends 
WithUnsynced<PreAccept.PreAcceptReply>
                 if (command.status().compareTo(Status.PreAccepted) > 0)
                     return PreAcceptNack.INSTANCE;
 
-                if (command.executeAt().is(REJECTED))
+                if (command.executeAt().is(REJECTED) && 
!participants.owns().isEmpty()) // if our vote is required we don't need to 
compute deps
                     return new PreAcceptOk(txnId, command.executeAt(), 
Deps.NONE, ExecuteFlags.none());
 
             case Retired:
diff --git a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java 
b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
index ff3a6c99..473ea98d 100644
--- a/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetGloballyDurable.java
@@ -40,7 +40,7 @@ public class SetGloballyDurable implements Request, 
PreLoadContext
     @Override
     public void process(Node node, Node.Id from, ReplyContext replyContext)
     {
-        node.markDurable(durableBefore).addCallback((success, fail) -> {
+        node.markDurable(durableBefore).invoke((success, fail) -> {
             node.reply(from, replyContext, fail == null ? Ok : null, fail);
         });
     }
diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java 
b/accord-core/src/main/java/accord/messages/SetShardDurable.java
index 1ead882f..5f27c3e1 100644
--- a/accord-core/src/main/java/accord/messages/SetShardDurable.java
+++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java
@@ -57,7 +57,7 @@ public class SetShardDurable extends 
AbstractRequest<SimpleReply>
         
Invariants.require(durability.compareTo(Durability.MajorityOrInvalidated) >= 0);
         TxnId syncIdWithFlags = syncIdWithFlags();
         node.markDurable(exclusiveSyncPoint.route.toRanges(), syncIdWithFlags, 
durability.compareTo(Durability.UniversalOrInvalidated) >= 0 ? syncIdWithFlags 
: TxnId.NONE)
-        .addCallback((success, fail) -> {
+        .invoke((success, fail) -> {
             if (fail != null) node.reply(replyTo, replyContext, null, fail);
             else node.mapReduceConsumeLocal(this, exclusiveSyncPoint.route, 
waitForEpoch(), waitForEpoch(), this);
         });
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java 
b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index bc3d8dac..a2fa3c88 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -26,6 +26,7 @@ import accord.primitives.Participants;
 import accord.primitives.Ranges;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
+import accord.utils.Invariants;
 
 import static 
accord.messages.MessageType.StandardMessage.WAIT_UNTIL_APPLIED_REQ;
 import static accord.primitives.SaveStatus.Applied;
@@ -52,6 +53,7 @@ public class WaitUntilApplied extends ReadData
     {
         super(to, topologies, txnId, scope, executeAtEpoch);
         this.minEpoch = topologies.oldestEpoch();
+        Invariants.require(minEpoch <= executeAtEpoch);
     }
 
     protected WaitUntilApplied(TxnId txnId, Participants<?> scope, long 
minEpoch, long executeAtEpoch)
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java 
b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index dc19ff20..d7bcad48 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -222,6 +222,12 @@ public abstract class AbstractRanges implements 
Iterable<Range>, Routables<Range
         return SortedArrays.binarySearch(ranges, 0, size(), find, 
Range::compareIntersecting, search);
     }
 
+    @Override
+    public final int indexOf(Range find)
+    {
+        return Arrays.binarySearch(ranges, 0, size(), find, Range::compare);
+    }
+
     @Override
     public final int findNext(int thisIndex, Range find, SortedArrays.Search 
search)
     {
@@ -648,6 +654,8 @@ public abstract class AbstractRanges implements 
Iterable<Range>, Routables<Range
     @Override
     public boolean equals(Object that)
     {
+        if (this == that)
+            return true;
         if (that == null || this.getClass() != that.getClass())
             return false;
         return Arrays.equals(this.ranges, ((AbstractRanges) that).ranges);
diff --git a/accord-core/src/main/java/accord/primitives/LatestDeps.java 
b/accord-core/src/main/java/accord/primitives/LatestDeps.java
index cb6a656b..294df183 100644
--- a/accord-core/src/main/java/accord/primitives/LatestDeps.java
+++ b/accord-core/src/main/java/accord/primitives/LatestDeps.java
@@ -46,9 +46,11 @@ import accord.utils.UnhandledEnum;
 import static accord.messages.Accept.Kind.SLOW;
 import static accord.primitives.Known.KnownDeps.DepsCommitted;
 import static accord.primitives.Known.KnownDeps.DepsErased;
+import static accord.primitives.Known.KnownDeps.DepsFromCoordinator;
 import static accord.primitives.Known.KnownDeps.DepsKnown;
 import static accord.primitives.Known.KnownDeps.DepsProposed;
 import static accord.primitives.Known.KnownDeps.DepsProposedFixed;
+import static accord.primitives.Known.KnownDeps.DepsUnknown;
 import static accord.topology.Topologies.SelectNodeOwnership.SHARE;
 
 public class LatestDeps extends ReducingRangeMap<LatestDeps.LatestEntry>
@@ -419,7 +421,17 @@ public class LatestDeps extends 
ReducingRangeMap<LatestDeps.LatestEntry>
             return result;
         }
 
-        Deps mergeProposal()
+        public Participants<?> notAccepted(Participants<?> participants)
+        {
+            for (int i = 0 ; i < values.length ; ++i)
+            {
+                if (values[i] != null && values[i].known != DepsUnknown && 
values[i].known != DepsFromCoordinator)
+                    participants = 
participants.without(Ranges.of(starts[i].rangeFactory().newRange(starts[i], 
starts[i + 1])));
+            }
+            return participants;
+        }
+
+        public Deps mergeProposal()
         {
             return mergeProposal(null);
         }
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java 
b/accord-core/src/main/java/accord/primitives/Routables.java
index 16db6efb..cf08d5fb 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -51,6 +51,7 @@ public interface Routables<K extends Routable> extends 
Iterable<K>
     }
 
     K get(int i);
+    int indexOf(K key);
     int size();
 
     boolean isEmpty();
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java 
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 41a33693..c1e0ceb4 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -360,6 +360,9 @@ public class Timestamp implements Comparable<Timestamp>, 
EpochSupplier
 
     public boolean equals(Timestamp that)
     {
+        if (that == this)
+            return true;
+
         return that != null && this.msb == that.msb
                && ((this.lsb ^ that.lsb) & IDENTITY_LSB) == 0
                && this.node.equals(that.node);
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 39943296..72f12f53 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -1220,7 +1220,9 @@ public class TopologyManager
         Epochs snapshot = epochs;
         EpochState maxState = snapshot.get(maxEpoch);
 
-        Invariants.require(maxState != null, "Unable to find epoch %d; known 
epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch);
+        if (maxState == null)
+            throw new TopologyRetiredException(maxEpoch, snapshot.minEpoch());
+
         if (minEpoch == maxEpoch)
             return new Single(sorter, 
selectFunction.apply(snapshot.get(minEpoch).global, select, 
selectNodeOwnership));
 
diff --git a/accord-core/src/main/java/accord/utils/Functions.java 
b/accord-core/src/main/java/accord/utils/Functions.java
index 58551c41..8e25d80e 100644
--- a/accord-core/src/main/java/accord/utils/Functions.java
+++ b/accord-core/src/main/java/accord/utils/Functions.java
@@ -18,29 +18,12 @@
 
 package accord.utils;
 
-import javax.annotation.Nonnull;
 import java.util.List;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
 public class Functions
 {
-
-    public static <T> T reduceNonNull(BiFunction<T, T, T> merge, T a, T b)
-    {
-        return a == null ? b : b == null ? a : merge.apply(a, b);
-    }
-
-    public static <T1, T2> T1 reduceNonNull(BiFunction<T1, T2, T1> merge, 
@Nonnull T1 a, T2 ... bs)
-    {
-        for (T2 b : bs)
-        {
-            if (b != null)
-                a = merge.apply(a, b);
-        }
-        return a;
-    }
-
     public static <I, O> O mapReduceNonNull(Function<I, O> map, BiFunction<O, 
O, O> reduce, List<I> input)
     {
         O result = null;
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java 
b/accord-core/src/main/java/accord/utils/Invariants.java
index fe6cae97..1e2b7bab 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -285,6 +285,12 @@ public class Invariants
             throw illegalState(format(fmt, p1, p2, p3));
     }
 
+    public static void require(boolean condition, String fmt, int p1, 
@Nullable Object p2, @Nullable Object p3)
+    {
+        if (!condition)
+            throw illegalState(format(fmt, p1, p2, p3));
+    }
+
     public static <P> void require(boolean condition, String fmt, @Nullable 
Object p1, @Nullable Object p2, @Nullable P p3, Function<? super P, Object> 
transformP3)
     {
         if (!condition)
diff --git a/accord-core/src/main/java/accord/utils/LogGroupTimers.java 
b/accord-core/src/main/java/accord/utils/LogGroupTimers.java
index 4d45479d..338ef27b 100644
--- a/accord-core/src/main/java/accord/utils/LogGroupTimers.java
+++ b/accord-core/src/main/java/accord/utils/LogGroupTimers.java
@@ -339,7 +339,7 @@ public class LogGroupTimers<T extends LogGroupTimers.Timer>
         {
             wakeAt = deadline;
         }
-        else if (prevDeadline == wakeAt && bucketsStart != bucketsEnd)
+        else if (prevDeadline == wakeAt && prevDeadline != Long.MAX_VALUE && 
bucketsStart != bucketsEnd)
         {
             Bucket<T> head = buckets[bucketsStart];
             if (!head.isHeapified())
diff --git a/accord-core/src/main/java/accord/utils/PersistentField.java 
b/accord-core/src/main/java/accord/utils/PersistentField.java
index 6face04d..117f8157 100644
--- a/accord-core/src/main/java/accord/utils/PersistentField.java
+++ b/accord-core/src/main/java/accord/utils/PersistentField.java
@@ -105,7 +105,7 @@ public class PersistentField<Input, Saved>
         pending.add(new Pending<>(id, newValue));
 
         AsyncResult<?> pendingWrite = persister.persist(inputValue, newValue);
-        pendingWrite.addCallback((success, fail) -> {
+        pendingWrite.invoke((success, fail) -> {
             synchronized (this)
             {
                 complete.add(id);
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java 
b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
index d8950f94..d05fc1ec 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java
@@ -49,8 +49,6 @@ public class AsyncCallbacks
     }
 
     public static <T> BiConsumer<T, Throwable> toCallback(Runnable runnable) {
-        return (unused, failure) -> {
-            if (failure == null) runnable.run();
-        };
+        return (s, f) -> runnable.run();
     }
 }
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChain.java 
b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
index 0899b268..b2f6070f 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChain.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChain.java
@@ -27,13 +27,8 @@ import java.util.function.Function;
 
 import javax.annotation.Nullable;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 public interface AsyncChain<V>
 {
-    /**
-     * Support {@link 
com.google.common.util.concurrent.Futures#transform(ListenableFuture, 
com.google.common.base.Function, Executor)} natively
-     */
     <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper);
 
     default <T> AsyncChain<T> map(Function<? super V, ? extends T> mapper, 
Executor executor)
@@ -41,9 +36,6 @@ public interface AsyncChain<V>
         return AsyncChains.map(this, mapper, executor);
     }
 
-    /**
-     * Support {@link 
com.google.common.util.concurrent.Futures#transform(ListenableFuture, 
com.google.common.base.Function, Executor)} natively
-     */
     <T> AsyncChain<T> flatMap(Function<? super V, ? extends AsyncChain<T>> 
mapper);
 
     default <T> AsyncChain<T> flatMap(Function<? super V, ? extends 
AsyncChain<T>> mapper, Executor executor)
@@ -78,19 +70,19 @@ public interface AsyncChain<V>
      */
     AsyncChain<V> recover(Function<? super Throwable, ? extends AsyncChain<V>> 
mapper);
 
-    default AsyncChain<Void> accept(Consumer<? super V> action)
+    default AsyncChain<V> invokeIfSuccess(Consumer<? super V> action)
     {
         return map(r -> {
             action.accept(r);
-            return null;
+            return r;
         });
     }
 
-    default AsyncChain<Void> accept(Consumer<? super V> action, Executor 
executor)
+    default AsyncChain<V> invokeIfSuccess(Consumer<? super V> action, Executor 
executor)
     {
         return map(r -> {
             action.accept(r);
-            return null;
+            return r;
         }, executor);
     }
 
@@ -101,27 +93,29 @@ public interface AsyncChain<V>
         return map(a -> a, e);
     }
 
-    /**
-     * Support {@link com.google.common.util.concurrent.Futures#addCallback} 
natively
-     */
-    AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback);
+    AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback);
 
     /**
-     * Adds a callback that only listens to the successful case, a failed 
chain will not trigger the callback
+     * Adds a callback that fires on success only
      */
-    default AsyncChain<V> addCallback(Runnable runnable)
+    default AsyncChain<V> invokeIfSuccess(Runnable runnable)
     {
-        return addCallback(AsyncCallbacks.toCallback(runnable));
+        return invoke((success, fail) -> {
+            if (fail == null) runnable.run();
+        });
     }
 
-    default AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback, ExecutorService es)
+    /**
+     * Adds a callback that fires on either success or failure
+     */
+    default AsyncChain<V> invoke(Runnable runnable)
     {
-        return addCallback(AsyncCallbacks.inExecutorService(callback, es));
+        return invoke((success, fail) -> runnable.run());
     }
 
-    default AsyncChain<V> addCallback(Runnable runnable, ExecutorService es)
+    default AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback, 
ExecutorService es)
     {
-        return addCallback(AsyncCallbacks.inExecutorService(runnable, es));
+        return invoke(AsyncCallbacks.inExecutorService(callback, es));
     }
 
     /**
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java 
b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index bc5256b6..060e4f7c 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -19,6 +19,7 @@
 package accord.utils.async;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.Callable;
@@ -39,7 +40,6 @@ import java.util.function.Function;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,7 +129,7 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
         }
 
         @Override
-        public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        public AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback)
         {
             if (value == null || value.getClass() != FailureHolder.class)
                 callback.accept((V) value, null);
@@ -141,7 +141,7 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
         @Override
         public Cancellable begin(BiConsumer<? super V, Throwable> callback)
         {
-            addCallback(callback);
+            invoke(callback);
             return null;
         }
     }
@@ -454,7 +454,7 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
     }
 
     @Override
-    public AsyncChain<V> addCallback(BiConsumer<? super V, Throwable> callback)
+    public AsyncChain<V> invoke(BiConsumer<? super V, Throwable> callback)
     {
         return add(EncapsulatedCallback::new, callback);
     }
@@ -655,28 +655,45 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
         });
     }
 
-    public static <V> AsyncChain<V[]> allOf(List<? extends AsyncChain<? 
extends V>> chains)
+    public static <V> AsyncChain<List<V>> allOf(List<? extends AsyncChain<? 
extends V>> chains)
     {
-        return new AsyncChainCombiner<>(chains);
+        return allOfInternal(chains).map(Arrays::asList);
     }
 
-    public static <V> AsyncChain<List<V>> all(List<? extends AsyncChain<? 
extends V>> chains)
+    // cannot expose this as we're actually providing an Object[] to the next 
in the chain
+    // which is not safe if receiver statically expecting the strongly typed 
array
+    private static <V> AsyncChain<V[]> allOfInternal(List<? extends 
AsyncChain<? extends V>> chains)
     {
-        return new AsyncChainCombiner<>(chains).map(Lists::newArrayList);
+        return new AsyncChainCombiner<>(chains);
     }
 
     public static <V> AsyncChain<V> reduce(List<? extends AsyncChain<? extends 
V>> chains, BiFunction<? super V, ? super V, ? extends V> reducer)
     {
         if (chains.size() == 1)
             return (AsyncChain<V>) chains.get(0);
-        return allOf(chains).map(r -> reduceArray(r, reducer));
+        return allOfInternal(chains).map(r -> reduceArray(r, reducer));
+    }
+
+    public static <I, O> AsyncChain<O> reduce(List<? extends AsyncChain<? 
extends I>> chains, BiFunction<? super I, ? super O, ? extends O> reducer, O 
identity)
+    {
+        if (chains.size() == 1)
+            return chains.get(0).map(i -> reducer.apply(i, identity));
+        return allOfInternal(chains).map(r -> reduceArray(r, reducer, 
identity));
     }
 
-    private static <V> V reduceArray(V[] results, BiFunction<? super V, ? 
super V, ? extends V> reducer)
+    private static <V> V reduceArray(Object[] results, BiFunction<? super V, ? 
super V, ? extends V> reducer)
     {
-        V result = results[0];
+        V result = (V) results[0];
         for (int i=1; i< results.length; i++)
-            result = reducer.apply(result, results[i]);
+            result = reducer.apply(result, (V)results[i]);
+        return result;
+    }
+
+    private static <I, O> O reduceArray(Object[] results, BiFunction<? super 
I, ? super O, ? extends O> reducer, O identity)
+    {
+        O result = identity;
+        for (int i=0; i< results.length; i++)
+            result = reducer.apply((I)results[i], identity);
         return result;
     }
 
@@ -710,7 +727,7 @@ public abstract class AsyncChains<V> implements 
AsyncChain<V>
             case 0: return AsyncChains.success(identity);
             case 1: return chains.get(0).map(a -> reducer.apply(identity, a));
         }
-        return allOf(chains).map(results -> {
+        return allOfInternal(chains).map(results -> {
             B result = identity;
             for (A r : results)
                 result = reducer.apply(result, r);
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java 
b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
index 30fd524f..fae3bc9c 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
@@ -31,13 +31,7 @@ import static accord.utils.Invariants.illegalState;
 public interface AsyncResult<V> extends AsyncChain<V>
 {
     @Override
-    AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> callback);
-
-    @Override
-    default AsyncResult<V> addCallback(Runnable runnable)
-    {
-        return addCallback(AsyncCallbacks.toCallback(runnable));
-    }
+    AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback);
 
     boolean isDone();
     boolean isSuccess();
@@ -46,7 +40,7 @@ public interface AsyncResult<V> extends AsyncChain<V>
     default @Nullable Cancellable begin(BiConsumer<? super V, Throwable> 
callback)
     {
         //TODO chain shouldn't allow double calling, but should result allow?
-        addCallback(callback);
+        invoke(callback);
         return null;
     }
 
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResults.java 
b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
index 01632db4..6c954376 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResults.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResults.java
@@ -136,7 +136,7 @@ public class AsyncResults
                 @Override
                 protected Cancellable start(BiConsumer<? super V, Throwable> 
callback)
                 {
-                    AbstractResult.this.addCallback(callback);
+                    AbstractResult.this.invoke(callback);
                     return null;
                 }
             };
@@ -172,7 +172,7 @@ public class AsyncResults
         }
 
         @Override
-        public AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        public AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback)
         {
             Listener<V> listener = null;
             while (true)
@@ -290,7 +290,7 @@ public class AsyncResults
                 @Override
                 protected Cancellable start(BiConsumer<? super V, Throwable> 
callback)
                 {
-                    AsyncResults.Immediate.this.addCallback(callback);
+                    AsyncResults.Immediate.this.invoke(callback);
                     return null;
                 }
             };
@@ -315,7 +315,7 @@ public class AsyncResults
         }
 
         @Override
-        public AsyncResult<V> addCallback(BiConsumer<? super V, Throwable> 
callback)
+        public AsyncResult<V> invoke(BiConsumer<? super V, Throwable> callback)
         {
             callback.accept(value, failure);
             return this;
diff --git a/accord-core/src/main/java/accord/utils/btree/BTreeSet.java 
b/accord-core/src/main/java/accord/utils/btree/BTreeSet.java
index 1122da86..1d7dd15b 100644
--- a/accord-core/src/main/java/accord/utils/btree/BTreeSet.java
+++ b/accord-core/src/main/java/accord/utils/btree/BTreeSet.java
@@ -31,8 +31,6 @@ import java.util.SortedSet;
 import java.util.Spliterator;
 import java.util.Spliterators;
 
-import com.google.common.collect.Ordering;
-
 import static accord.utils.btree.BTree.Dir;
 import static accord.utils.btree.BTree.findIndex;
 
@@ -76,7 +74,7 @@ public class BTreeSet<V> extends AbstractSet<V> implements 
NavigableSet<V>, List
      */
     public V get(int index)
     {
-        return BTree.<V>findByIndex(tree, index);
+        return BTree.findByIndex(tree, index);
     }
 
     public int lastIndexOf(Object o)
@@ -658,7 +656,7 @@ public class BTreeSet<V> extends AbstractSet<V> implements 
NavigableSet<V>, List
 
     public static <V extends Comparable<V>> BTreeSet<V> of(V value)
     {
-        return new BTreeSet<>(BTree.singleton(value), Ordering.<V>natural());
+        return new BTreeSet<>(BTree.singleton(value), 
Comparator.naturalOrder());
     }
 
     public static <V> BTreeSet<V> empty(Comparator<? super V> comparator)
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 6a6a8f9e..54a8314a 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -915,7 +915,7 @@ public class Cluster
                 Command afterCommand = e.getValue().value();
                 if (beforeCommand == null)
                 {
-                    
Invariants.requireArgument(afterCommand.is(Status.NotDefined) || 
afterCommand.saveStatus() == SaveStatus.Vestigial);
+                    
Invariants.requireArgument(afterCommand.is(Status.NotDefined) || 
afterCommand.saveStatus().compareTo(SaveStatus.Vestigial) >= 0);
                     continue;
                 }
                 if (afterCommand.hasBeen(Status.Truncated))
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 975cc727..b02f0744 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -360,8 +360,8 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             if (next == null)
                 return;
 
-            next.addCallback(agent()); // used to track unexpected exceptions 
and notify simulations
-            next.addCallback(this::afterExecution);
+            next.invoke(agent()); // used to track unexpected exceptions and 
notify simulations
+            next.invoke(this::afterExecution);
             executor.execute(next);
         }
 
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 32303e91..2b4edb8c 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -20,6 +20,8 @@ package accord.impl.basic;
 
 import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.Iterator;
 import java.util.List;
@@ -33,7 +35,6 @@ import com.google.common.collect.ImmutableSortedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.api.Agent;
 import accord.api.Journal;
 import accord.api.Result;
 import accord.impl.CommandChange;
@@ -110,35 +111,34 @@ import static accord.utils.Invariants.illegalState;
 public class InMemoryJournal implements Journal
 {
     private static final Logger log = 
LoggerFactory.getLogger(InMemoryJournal.class);
-    private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> 
diffsPerCommandStore = new Int2ObjectHashMap<>();
+    private final Int2ObjectHashMap<NavigableMap<TxnId, Diffs>> 
diffsPerCommandStore = new Int2ObjectHashMap<>();
     private final List<TopologyUpdate> topologyUpdates = new ArrayList<>();
     private final Int2ObjectHashMap<FieldUpdates> fieldStates = new 
Int2ObjectHashMap<>();
 
     private Node node;
-    private Agent agent;
     private final RandomSource random;
+    private final float partialCompactionChance;
 
     public InMemoryJournal(Node.Id id, RandomSource random)
     {
         this.random = random;
+        this.partialCompactionChance = 1f - (random.nextFloat()/2);
     }
 
-    public Journal start(Node node)
+    public void start(Node node)
     {
         this.node = node;
-        this.agent = node.agent();
-        return this;
     }
 
     @Override
     public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        NavigableMap<TxnId, List<Diff>> commandStore = 
this.diffsPerCommandStore.get(commandStoreId);
+        NavigableMap<TxnId, Diffs> commandStore = 
this.diffsPerCommandStore.get(commandStoreId);
 
         if (commandStore == null)
             return null;
 
-        List<Diff> saved = 
this.diffsPerCommandStore.get(commandStoreId).get(txnId);
+        Diffs saved = this.diffsPerCommandStore.get(commandStoreId).get(txnId);
         if (saved == null)
             return null;
 
@@ -174,7 +174,7 @@ public class InMemoryJournal implements Journal
 
     private Builder reconstruct(int commandStoreId, TxnId txnId, Load load)
     {
-        NavigableMap<TxnId, List<Diff>> commandStore = 
this.diffsPerCommandStore.get(commandStoreId);
+        NavigableMap<TxnId, Diffs> commandStore = 
this.diffsPerCommandStore.get(commandStoreId);
 
         if (commandStore == null)
             return null;
@@ -182,12 +182,13 @@ public class InMemoryJournal implements Journal
         return 
reconstruct(this.diffsPerCommandStore.get(commandStoreId).get(txnId), load);
     }
 
-    private Builder reconstruct(List<Diff> saved, Load load)
+    private Builder reconstruct(Diffs files, Load load)
     {
-        if (saved == null)
+        if (files == null)
             return null;
 
         Builder builder = null;
+        List<Diff> saved = files.sorted(false);
         for (int i = saved.size() - 1; i >= 0; i--)
         {
             Diff diff = saved.get(i);
@@ -212,8 +213,8 @@ public class InMemoryJournal implements Journal
         }
 
         diffsPerCommandStore.computeIfAbsent(commandStoreId, (k) -> new 
TreeMap<>())
-                            .computeIfAbsent(update.txnId, (k_) -> new 
ArrayList<>())
-                            .add(diff);
+                            .computeIfAbsent(update.txnId, (k_) -> new Diffs())
+                            .addFlushed(diff);
 
         if (onFlush!= null)
             onFlush.run();
@@ -323,50 +324,189 @@ public class InMemoryJournal implements Journal
             onFlush.run();
     }
 
+    static class DiffFile extends ArrayList<Diff>
+    {
+        DiffFile(){}
+        DiffFile(List<Diff> diffs)
+        {
+            for (Diff diff : diffs)
+            {
+                if (diff != null)
+                    add(diff);
+            }
+        }
+    }
+
+    static class Diffs
+    {
+        final boolean subset;
+        final List<DiffFile> files;
+        final List<Diff> flushed;
+        int nextId;
+
+        int size;
+        List<Diff> sorted;
+
+        Diffs()
+        {
+            this.subset = false;
+            this.files = new ArrayList<>();
+            this.flushed = new ArrayList<>();
+        }
+
+        Diffs(PurgedList purged)
+        {
+            this.subset = false;
+            this.files = Collections.emptyList();
+            this.flushed = purged;
+        }
+
+        Diffs(TruncatedList truncated)
+        {
+            this.subset = false;
+            this.files = new ArrayList<>();
+            this.flushed = truncated;
+            this.size = 1;
+        }
+
+        Diffs(ErasedList erased)
+        {
+            this.subset = false;
+            this.files = Collections.emptyList();
+            this.flushed = erased;
+            this.size = 1;
+        }
+
+        Diffs(boolean subset, List<DiffFile> files, List<Diff> flushed)
+        {
+            this.subset = subset;
+            this.files = files;
+            this.flushed = flushed;
+            this.size = flushed.size();
+            for (DiffFile file : files)
+                size += file.size();
+        }
+
+        void addFlushed(Diff diff)
+        {
+            diff.rowId = ++nextId;
+            flushed.add(diff);
+            if (sorted != null && sorted != flushed)
+                sorted.add(diff);
+            ++size;
+        }
+
+        List<Diff> sorted(boolean copy)
+        {
+            if (sorted != null)
+            {
+                Invariants.require(sorted.size() == size);
+                return copy ? new ArrayList<>(sorted) : sorted;
+            }
+
+            if (!subset)
+            {
+                if (files.isEmpty())
+                    return copy ? new ArrayList<>(flushed) : flushed;
+
+                if (flushed.isEmpty() && files.size() == 1)
+                    return copy ? new ArrayList<>(files.get(0)) : files.get(0);
+            }
+
+            List<Diff> sorted = new ArrayList<>(size);
+            for (Diff diff : flushed)
+            {
+                if (diff != null)
+                    sorted.add(diff);
+            }
+            for (DiffFile file : this.files)
+            {
+                if (file != null)
+                    sorted.addAll(file);
+            }
+            Invariants.require(sorted.size() == size);
+            sorted.sort(Comparator.comparingInt(d -> d.rowId));
+            if (!copy)
+                this.sorted = sorted;
+            return sorted;
+        }
+
+        void removeAll(Diffs diffs)
+        {
+            files.removeAll(diffs.files);
+            flushed.removeAll(diffs.flushed);
+            size -= diffs.size;
+            sorted = null;
+        }
+
+        boolean isEmpty()
+        {
+            return size == 0;
+        }
+    }
+
     static int counter = 0;
     @Override
     public void purge(CommandStores commandStores, EpochSupplier minEpoch)
     {
         truncateTopologiesForTesting(minEpoch.epoch());
-        boolean isPartialCompaction = random.nextBoolean();
-        for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e : 
diffsPerCommandStore.entrySet())
+        boolean isPartialCompaction = random.decide(0.9f);
+        for (Map.Entry<Integer, NavigableMap<TxnId, Diffs>> e : 
diffsPerCommandStore.entrySet())
         {
             int commandStoreId = e.getKey();
-            Map<TxnId, List<Diff>> localJournal = e.getValue();
+            Map<TxnId, Diffs> localJournal = e.getValue();
             CommandStore store = commandStores.forId(commandStoreId);
             if (store == null)
                 continue;
 
-            for (Map.Entry<TxnId, List<Diff>> e2 : localJournal.entrySet())
+            for (Map.Entry<TxnId, Diffs> e2 : localJournal.entrySet())
             {
-                List<Diff> diffs = e2.getValue();
-
+                Diffs diffs = e2.getValue();
                 if (diffs.isEmpty()) continue;
-                List<Diff> subset = diffs;
-                if (diffs.size() > 1 && isPartialCompaction)
+
+                Diffs subset = diffs;
                 {
-                    int removeCount = 1 + random.nextInt(diffs.size() - 1);
-                    int count = diffs.size();
-                    subset = new ArrayList<>(diffs);
-                    while (removeCount-- > 0)
+                    int filesAndFlushed = subset.flushed.size() + 
subset.files.size();
+                    if (filesAndFlushed > 1 && isPartialCompaction)
                     {
-                        int removeIndex = random.nextInt(diffs.size());
-                        if (subset.get(removeIndex) == null)
+                        int removeCount = 1 + random.nextInt(filesAndFlushed - 
1);
+                        int count = filesAndFlushed;
+                        subset = new Diffs(true, new ArrayList<>(diffs.files), 
new ArrayList<>(diffs.flushed));
+                        List<DiffFile> files = subset.files;
+                        List<Diff> flushed = subset.flushed;
+                        while (removeCount-- > 0)
+                        {
+                            int removeIndex = random.nextInt(filesAndFlushed);
+                            if (removeIndex < flushed.size())
+                            {
+                                if (flushed.get(removeIndex) == null)
+                                    continue;
+                                --subset.size;
+                                flushed.set(removeIndex, null);
+                            }
+                            else
+                            {
+                                removeIndex -= flushed.size();
+                                if (files.get(removeIndex) == null)
+                                    continue;
+                                subset.size -= files.get(removeIndex).size();
+                                files.set(removeIndex, null);
+                            }
+                            --count;
+                        }
+
+                        if (count == 0)
                             continue;
-                        subset.set(removeIndex, null);
-                        --count;
                     }
-
-                    if (count == 0)
-                        continue;
                 }
 
-                Builder[] builders = new Builder[diffs.size()];
-                for (int i = 0 ; i < subset.size() ; ++i)
+                Builder[] builders = new Builder[subset.size];
+                List<Diff> sorted = subset.sorted(true);
+                for (int i = 0 ; i < sorted.size() ; ++i)
                 {
-                    if (subset.get(i) == null) continue;
+                    if (sorted.get(i) == null) continue;
                     Builder builder = new Builder(e2.getKey(), ALL);
-                    builder.apply(subset.get(i));
+                    builder.apply(sorted.get(i));
                     builders[i] = builder;
                 }
 
@@ -388,7 +528,8 @@ public class InMemoryJournal implements Journal
                 {
                     if (cleanup == EXPUNGE)
                     {
-                        if (input == FULL || subset == diffs) e2.setValue(new 
PurgedList());
+                        if (input == FULL) e2.setValue(new Diffs(new 
PurgedList()));
+                        else if (subset == diffs) e2.setValue(new Diffs());
                         else diffs.removeAll(subset);
                         continue;
                     }
@@ -411,20 +552,39 @@ public class InMemoryJournal implements Journal
                         else
                         {
                             Diff diff = builder.toDiff();
-                            e2.setValue(cleanup == ERASE ? new 
ErasedList(diff) : new TruncatedList(diff));
+                            e2.setValue(cleanup == ERASE ? new Diffs(new 
ErasedList(diff)) : new Diffs(new TruncatedList(diff)));
                             continue;
                         }
                     }
                 }
 
+                if (diffs.flushed instanceof FinalList)
+                    continue;
+
+                int removeCount = 0;
                 for (int i = 0 ; i < builders.length ; ++i)
                 {
                     if (builders[i] != null)
                     {
                         Diff diff = builders[i].toDiff();
-                        diffs.set(i, diff.flags == 0 ? null : diff);
+                        if (diff.flags == 0)
+                        {
+                            ++removeCount;
+                            sorted.set(i, null);
+                        }
+                        else
+                        {
+                            diff.rowId = sorted.get(i).rowId;
+                            sorted.set(i, diff);
+                        }
                     }
                 }
+
+                diffs.size -= removeCount;
+                diffs.flushed.removeAll(subset.flushed);
+                diffs.files.removeAll(subset.files);
+                diffs.files.add(new DiffFile(sorted));
+                diffs.sorted = null;
             }
         }
     }
@@ -432,18 +592,18 @@ public class InMemoryJournal implements Journal
     @Override
     public void replay(CommandStores commandStores)
     {
-        for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry : 
diffsPerCommandStore.entrySet())
+        for (Map.Entry<Integer, NavigableMap<TxnId, Diffs>> diffEntry : 
diffsPerCommandStore.entrySet())
         {
             int commandStoreId = diffEntry.getKey();
 
             // copy to avoid concurrent modification when appending to journal
-            Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue());
+            Map<TxnId, List<Diff>> diffs = new TreeMap<>();
 
             InMemoryCommandStore commandStore = (InMemoryCommandStore) 
commandStores.forId(commandStoreId);
             Loader loader = commandStore.loader();
 
-            for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
-                e.setValue(new ArrayList<>(e.getValue()));
+            for (Map.Entry<TxnId, Diffs> e : diffEntry.getValue().entrySet())
+                diffs.put(e.getKey(), e.getValue().sorted(true));
 
             for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
             {
@@ -455,7 +615,20 @@ public class InMemoryJournal implements Journal
         }
     }
 
-    private static class ErasedList extends AbstractList<Diff>
+    static class TruncatedList extends ArrayList<Diff>
+    {
+        TruncatedList(Diff truncated)
+        {
+            add(truncated);
+        }
+    }
+
+    private static abstract class FinalList extends AbstractList<Diff>
+    {
+
+    }
+
+    private static class ErasedList extends FinalList
     {
         private Diff erased;
 
@@ -500,15 +673,7 @@ public class InMemoryJournal implements Journal
         }
     }
 
-    static class TruncatedList extends ArrayList<Diff>
-    {
-        TruncatedList(Diff truncated)
-        {
-            add(truncated);
-        }
-    }
-
-    private static class PurgedList extends AbstractList<Diff>
+    private static class PurgedList extends FinalList
     {
         @Override
         public Diff get(int index)
@@ -558,6 +723,7 @@ public class InMemoryJournal implements Journal
         public final TxnId txnId;
         public final Map<Field, Object> changes;
         public final int flags;
+        private int rowId;
 
         private Diff(TxnId txnId, int flags, Map<Field, Object> changes)
         {
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 3df406ec..32a94e5c 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -76,9 +76,9 @@ public class LoggingJournal implements Journal
 
     @Override
 
-    public Journal start(Node node)
+    public void start(Node node)
     {
-        return this;
+        delegate.start(node);
     }
 
     @Override
diff --git 
a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java 
b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
index 53b78354..0f2452bd 100644
--- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
@@ -66,7 +66,7 @@ public class ListFetchCoordinator extends 
AbstractFetchCoordinator
         ListData listData = (ListData) data;
         persisting.add(commandStore.build(PreLoadContext.empty(), safeStore -> 
{
             listData.forEach((key, value) -> listStore.writeUnsafe(key, 
value));
-        }).flatMap(ignore -> listStore.snapshot(true, received, 
syncPoint.syncId)).addCallback((success, fail) -> {
+        }).flatMap(ignore -> listStore.snapshot(true, received, 
syncPoint.syncId)).invoke((success, fail) -> {
             if (fail == null) success(from, received);
             else fail(from, received, fail);
         }).beginAsResult());
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java 
b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index ddc06379..dc95693d 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -300,7 +300,7 @@ public class ListRequest implements Request
         txn = gen.apply(node);
         id = txnIdGen.apply(node, txn);
         listener.onClientAction(MessageListener.ClientAction.SUBMIT, 
node.id(), id, txn);
-        node.coordinate(id, txn).addCallback(new ResultCallback(node, client, 
replyContext, listener, id, txn));
+        node.coordinate(id, txn).invoke(new ResultCallback(node, client, 
replyContext, listener, id, txn));
     }
 
     @Override
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java 
b/accord-core/src/test/java/accord/local/CommandsTest.java
index f10b9e7f..eab240a7 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -101,7 +101,7 @@ class CommandsTest
                     for (Node n : nodeMap.values())
                         ((TestableConfigurationService) 
n.configService()).reportTopology(updatedTopology);
 
-                    node.coordinate(txnId, txn).addCallback((success, failure) 
-> {
+                    node.coordinate(txnId, txn).invoke((success, failure) -> {
                         if (failure == null)
                         {
                             node.agent().onUncaughtException(new 
AssertionError("Expected TopologyMismatch exception, but txn was success"));
diff --git a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java 
b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java
index 8f429a3e..f7893e5a 100644
--- a/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java
+++ b/accord-core/src/test/java/accord/utils/async/AsyncChainsTest.java
@@ -90,7 +90,7 @@ public class AsyncChainsTest
         ResultCallback<Integer> intermediateCallback = new ResultCallback<>();
 
         AsyncChain<Integer> chain = 
AsyncChains.ofCallable(MoreExecutors.directExecutor(), () -> 5);
-        chain = chain.addCallback(intermediateCallback);
+        chain = chain.invoke(intermediateCallback);
 
         chain = chain.map(i -> i + 2);
         chain.begin(finalCallback);
@@ -129,10 +129,10 @@ public class AsyncChainsTest
         AsyncChain<Integer> chain1 = AsyncChains.success(1);
         AsyncChain<Integer> chain2 = AsyncChains.success(2);
         AsyncChain<Integer> chain3 = AsyncChains.success(3);
-        AsyncChain<List<Integer>> reduced = 
AsyncChains.all(Lists.newArrayList(chain1, chain2, chain3));
+        AsyncChain<List<Integer>> reduced = 
AsyncChains.allOf(Lists.newArrayList(chain1, chain2, chain3));
         ResultCallback<List<Integer>> callback = new ResultCallback<>();
         reduced.begin(callback);
-        Assertions.assertEquals(Lists.newArrayList(1, 2, 3), callback.value());
+        Assertions.assertEquals(Arrays.asList(1, 2, 3), callback.value());
     }
 
     @Test
@@ -164,7 +164,7 @@ public class AsyncChainsTest
                 }, () -> {})
                 .map(ignore -> 1)
                 .beginAsResult()
-                .addCallback((success, failure) -> {
+                .invoke((success, failure) -> {
                     if (failure == null)
                         throw illegalState("Expected to fail");
                 });
@@ -243,7 +243,7 @@ public class AsyncChainsTest
         AtomicBoolean sawCallback = new AtomicBoolean(false);
         AsyncChains.failure(new NullPointerException("just kidding"))
                 .beginAsResult()
-                .addCallback(() -> sawCallback.set(true))
+                .invokeIfSuccess(() -> sawCallback.set(true))
                 .begin((success, failure) -> {
                     if (failure != null) sawFailure.set(true);
                     else sawFailure.set(false);
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index ffc0d3ef..8ab5ed99 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -388,7 +388,7 @@ public class Cluster implements Scheduler
 
     public static class NoOpJournal implements Journal
     {
-        @Override public Journal start(Node node) { return null; }
+        @Override public void start(Node node) { }
         @Override public Command loadCommand(int store, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new 
IllegalStateException("Not impelemented"); }
         @Override public Command.Minimal loadMinimal(int store, TxnId txnId, 
Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) { 
throw new IllegalStateException("Not impelemented"); }
         @Override public void saveCommand(int store, CommandUpdate value, 
Runnable onFlush)  { throw new IllegalStateException("Not impelemented"); }
diff --git 
a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
index f7987140..01621f72 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRequest.java
@@ -51,7 +51,7 @@ public class MaelstromRequest extends Body implements Request
     @Override
     public void process(Node node, Id client, ReplyContext replyContext)
     {
-        node.coordinate(txn).addCallback((success, fail) -> {
+        node.coordinate(txn).invoke((success, fail) -> {
             Reply reply = success != null ? new 
MaelstromReply(MaelstromReplyContext.messageIdFor(replyContext), 
(MaelstromResult) success) : null;
             node.reply(client, replyContext, reply, fail);
         });


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to