This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 5e115ae37d390622c1633e059e69b404899ea955
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Jun 10 17:25:31 2025 +0100

    Fix:
     - Do not query local topology when deciding what keys to fetch to avoid TopologyRetiredException
     - Ensure we propagate information back to the requesting CommandStore by using the store id to ensure it is included
     - BurnTest topology fetching was broken by earlier patch
     - Topology callbacks were not being invoked as we were not calling .begin()
     - Topology mismatch failure during notAccept phase was not being reported due to CoordinatePreAccept already having isDone==true
    
    patch by Benedict; reviewed by David Capwell for CASSANDRA-20711
---
 .../main/java/accord/coordinate/CheckShards.java   |  16 +-
 .../accord/coordinate/CoordinatePreAccept.java     |   2 +-
 .../src/main/java/accord/coordinate/FetchData.java | 224 ++-------------
 .../java/accord/coordinate/FetchSomeRoute.java     | 151 ++++++++++
 .../src/main/java/accord/coordinate/FindRoute.java |  81 ------
 .../main/java/accord/coordinate/FindSomeRoute.java |  85 ------
 .../src/main/java/accord/coordinate/Infer.java     |  29 +-
 .../main/java/accord/coordinate/Invalidate.java    |  23 +-
 .../main/java/accord/coordinate/MaybeRecover.java  |  23 +-
 .../src/main/java/accord/coordinate/Recover.java   |  39 ++-
 .../java/accord/coordinate/RecoverWithRoute.java   |  69 +++--
 .../accord/impl/AbstractConfigurationService.java  |  22 +-
 .../java/accord/impl/progresslog/HomeState.java    |  10 +-
 .../java/accord/impl/progresslog/WaitingState.java |  46 +--
 .../src/main/java/accord/local/CommandStores.java  | 314 +++++++++++++++------
 accord-core/src/main/java/accord/local/Node.java   |  15 +-
 .../main/java/accord/local/StoreParticipants.java  |   4 +-
 .../accord/local/durability/DurabilityQueue.java   |   2 +-
 .../src/main/java/accord/messages/CheckStatus.java |   8 +-
 .../src/main/java/accord/messages/Propagate.java   |  42 ++-
 .../main/java/accord/utils/async/AsyncResult.java  |   2 +
 .../accord/burn/BurnTestConfigurationService.java  |  10 +-
 .../impl/AbstractConfigurationServiceTest.java     |  11 +-
 .../accord/impl/basic/DelayedCommandStores.java    |  10 +-
 24 files changed, 607 insertions(+), 631 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java 
b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index 888e3305..6dea99dd 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -39,7 +39,7 @@ import static accord.utils.Invariants.illegalState;
  */
 public abstract class CheckShards<U extends Participants<?>> extends 
ReadCoordinator<CheckStatusReply>
 {
-    final U route;
+    final U query;
 
     /**
      * The epoch we want to fetch data from remotely
@@ -54,17 +54,17 @@ public abstract class CheckShards<U extends 
Participants<?>> extends ReadCoordin
     protected boolean truncated;
 
     // srcEpoch is either txnId.epoch() or executeAt.epoch()
-    protected CheckShards(Node node, TxnId txnId, U route, IncludeInfo 
includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf 
previouslyKnownToBeInvalidIf)
+    protected CheckShards(Node node, TxnId txnId, U query, IncludeInfo 
includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf 
previouslyKnownToBeInvalidIf)
     {
-        this(node, txnId, route, txnId.epoch(), includeInfo, bumpBallot, 
previouslyKnownToBeInvalidIf);
+        this(node, txnId, query, txnId.epoch(), includeInfo, bumpBallot, 
previouslyKnownToBeInvalidIf);
         Invariants.require(txnId.isVisible());
     }
 
-    protected CheckShards(Node node, TxnId txnId, U route, long srcEpoch, 
IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf 
previouslyKnownToBeInvalidIf)
+    protected CheckShards(Node node, TxnId txnId, U query, long srcEpoch, 
IncludeInfo includeInfo, @Nullable Ballot bumpBallot, Infer.InvalidIf 
previouslyKnownToBeInvalidIf)
     {
-        super(node, topologyFor(node, txnId, route, srcEpoch), txnId);
+        super(node, topologyFor(node, txnId, query, srcEpoch), txnId);
         this.sourceEpoch = srcEpoch;
-        this.route = route;
+        this.query = query;
         this.includeInfo = includeInfo;
         this.bumpBallot = bumpBallot;
         this.previouslyKnownToBeInvalidIf = previouslyKnownToBeInvalidIf;
@@ -79,7 +79,7 @@ public abstract class CheckShards<U extends Participants<?>> 
extends ReadCoordin
     @Override
     protected void contact(Id id)
     {
-        Participants<?> unseekables = 
route.slice(topologies().computeRangesForNode(id));
+        Participants<?> unseekables = 
query.slice(topologies().computeRangesForNode(id));
         node.send(id, new CheckStatus(txnId, unseekables, sourceEpoch, 
includeInfo, bumpBallot), this);
     }
 
@@ -119,7 +119,7 @@ public abstract class CheckShards<U extends 
Participants<?>> extends ReadCoordin
     @Override
     protected void finishOnExhaustion()
     {
-        if (merged != null && merged.map.hasFullyTruncated(route)) 
finishOnFailure(new Truncated(txnId, null), false);
+        if (merged != null && merged.map.hasFullyTruncated(query)) 
finishOnFailure(new Truncated(txnId, null), false);
         else super.finishOnExhaustion();
     }
 }
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java 
b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
index 5649a402..40c17630 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinatePreAccept.java
@@ -142,7 +142,7 @@ abstract class CoordinatePreAccept<T> extends 
AbstractCoordinatePreAccept<T, Pre
         proposeInvalidate(node, node.uniqueTimestamp(Ballot::fromValues), 
txnId, route.homeKey(), (outcome, failure) -> {
             if (failure != null)
                 mismatch.addSuppressed(failure);
-            setFailure(mismatch);
+            callback.accept(null, mismatch);
         });
     }
 
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java 
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index b7a7c0ef..38b5c878 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -21,30 +21,24 @@ import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
 
 import accord.coordinate.Infer.InvalidIf;
+import accord.local.CommandStores.LatentStoreSelector;
+import accord.local.CommandStores.StoreSelector;
 import accord.local.Node;
-import accord.primitives.Status;
 import accord.primitives.Known;
 import accord.messages.CheckStatus;
 import accord.messages.CheckStatus.CheckStatusOkFull;
 import accord.messages.Propagate;
-import accord.primitives.FullRoute;
 import accord.primitives.Participants;
 import accord.primitives.Ranges;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
-import accord.topology.TopologyManager;
-import accord.topology.TopologyManager.TopologyRetiredException;
 import accord.utils.Invariants;
 
 import javax.annotation.Nonnull;
 
 import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
-import static 
accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
-import static accord.primitives.Route.castToRoute;
-import static accord.primitives.Route.isFullRoute;
-import static accord.primitives.Route.isRoute;
 
 /**
  * Find data and persist locally
@@ -75,16 +69,13 @@ public class FetchData extends CheckShards<Route<?>>
         final InvalidIf invalidIf;
         final @Nullable Timestamp executeAt;
         final long srcEpoch;
-        final Participants<?> fetchKeys;
-        final long lowEpoch, highEpoch;
+        // known participants, a subset of which we may fetch from
+        final Participants<?> contactable;
+        final Participants<?> requestedFor;
+        final StoreSelector reportTo;
         final BiConsumer<? super FetchResult, Throwable> callback;
 
-        public FetchRequest(Known fetch, TxnId txnId, InvalidIf invalidIf, 
@Nullable Timestamp executeAt, Participants<?> fetchKeys, BiConsumer<? super 
FetchResult, Throwable> callback)
-        {
-            this(fetch, txnId, invalidIf, executeAt, fetchKeys, 
Long.MIN_VALUE, Long.MIN_VALUE, callback);
-        }
-
-        public FetchRequest(Known fetch, TxnId txnId, InvalidIf invalidIf, 
@Nullable Timestamp executeAt, Participants<?> fetchKeys, long lowEpoch, long 
highEpoch, BiConsumer<? super FetchResult, Throwable> callback)
+        public FetchRequest(Known fetch, TxnId txnId, InvalidIf invalidIf, 
@Nullable Timestamp executeAt, Participants<?> contactable, Participants<?> 
requestedFor, StoreSelector reportTo, BiConsumer<? super FetchResult, 
Throwable> callback)
         {
             this.fetch = fetch;
             this.invalidIf = invalidIf;
@@ -92,88 +83,32 @@ public class FetchData extends CheckShards<Route<?>>
             this.executeAt = executeAt;
             this.callback = callback;
             this.srcEpoch = fetch.fetchEpoch(txnId, executeAt);
-            this.fetchKeys = fetchKeys;
-            this.lowEpoch = lowEpoch == Long.MIN_VALUE ? txnId.epoch() : 
lowEpoch;
-            this.highEpoch = highEpoch == Long.MIN_VALUE ? srcEpoch : 
highEpoch;
-        }
-
-        Ranges localRanges(Node node)
-        {
-            try
-            {
-                return node.topology().localRangesForEpochs(lowEpoch, 
highEpoch);
-            }
-            catch (TopologyRetiredException t)
-            {
-                throw new TopologyRetiredException("Failed to read local 
ranges for " + txnId + " between " + lowEpoch + " and " + highEpoch, t);
-            }
-        }
-    }
-
-    public static void fetch(Known fetch, Node node, TxnId txnId, @Nullable 
Timestamp executeAt, Participants<?> someKeys, long localLowEpoch, long 
localHighEpoch, BiConsumer<? super FetchResult, Throwable> callback)
-    {
-        fetch(fetch, node, txnId, NotKnownToBeInvalid, executeAt, someKeys, 
localLowEpoch, localHighEpoch, callback);
-    }
-
-    public static void fetch(Known fetch, Node node, TxnId txnId, InvalidIf 
invalidIf, @Nullable Timestamp executeAt, Participants<?> fetchKeys, long 
localLowEpoch, long localHighEpoch, BiConsumer<? super FetchResult, Throwable> 
callback)
-    {
-        fetch(node, new FetchRequest(fetch, txnId, invalidIf, executeAt, 
fetchKeys, localLowEpoch, localHighEpoch, callback));
-    }
-
-    public static void fetch(Known fetch, Node node, TxnId txnId, InvalidIf 
invalidIf, @Nullable Timestamp executeAt, Participants<?> fetchKeys, 
BiConsumer<? super FetchResult, Throwable> callback)
-    {
-        fetch(node, new FetchRequest(fetch, txnId, invalidIf, executeAt, 
fetchKeys, callback));
-    }
-
-    public static void fetch(Node node, FetchRequest request)
-    {
-        Participants<?> fetchKeys = request.fetchKeys;
-        if (fetchKeys.kind().isRoute()) fetch(node, castToRoute(fetchKeys), 
request);
-        else fetchViaSomeRoute(node, fetchKeys, request);
-    }
-
-    public static void fetch(Node node, Route<?> route, FetchRequest request)
-    {
-        long srcEpoch = request.srcEpoch;
-        if (!node.topology().hasEpoch(srcEpoch))
-        {
-            node.withEpochAtLeast(srcEpoch, request.callback, () -> 
fetch(node, route, request));
-            return;
-        }
-
-        Invariants.requireArgument(node.topology().hasEpoch(srcEpoch), 
"Unknown epoch %d, latest known is %d", srcEpoch, node.epoch());
-        Ranges ranges = request.localRanges(node);
-        if (!Route.isFullRoute(route))
-        {
-            fetchWithIncompleteRoute(node, route, request);
-        }
-        else
-        {
-            Route<?> slicedRoute = route.slice(ranges);
-            fetchInternal(node, ranges, slicedRoute, request);
+            this.contactable = contactable;
+            this.requestedFor = requestedFor;
+            this.reportTo = reportTo;
         }
     }
 
     /**
      * Do not make an attempt to discern what keys need to be contacted; fetch 
from only the specific remote keys that were requested.
      */
-    public static void fetchSpecific(Known fetch, Node node, TxnId txnId, 
@Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, 
Participants<?> localKeys, long lowEpoch, long highEpoch, BiConsumer<? super 
FetchResult, Throwable> callback)
+    public static void fetchSpecific(Known fetch, Node node, TxnId txnId, 
@Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, 
Participants<?> requestedFor, StoreSelector reportTo, BiConsumer<? super 
FetchResult, Throwable> callback)
     {
-        fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, executeAt, 
query, maxRoute, localKeys, lowEpoch, highEpoch, callback);
+        fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, executeAt, 
query, maxRoute, requestedFor, reportTo, callback);
     }
 
     /**
      * Do not make an attempt to discern what keys need to be contacted; fetch 
from only the specific remote keys that were requested.
      */
-    public static void fetchSpecific(Known fetch, Node node, TxnId txnId, 
InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> 
maxRoute, Participants<?> localKeys, long lowEpoch, long highEpoch, 
BiConsumer<? super FetchResult, Throwable> callback)
+    public static void fetchSpecific(Known fetch, Node node, TxnId txnId, 
InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> 
maxRoute, Participants<?> requestedFor, StoreSelector reportTo, BiConsumer<? 
super FetchResult, Throwable> callback)
     {
-        fetchSpecific(node, query, maxRoute, new FetchRequest(fetch, txnId, 
invalidIf, executeAt, localKeys, lowEpoch, highEpoch, callback));
+        fetchSpecific(node, query, maxRoute, new FetchRequest(fetch, txnId, 
invalidIf, executeAt, maxRoute, requestedFor, reportTo, callback));
     }
 
     public static void fetchSpecific(Node node, Route<?> query, Route<?> 
maxRoute, FetchRequest request)
     {
         long srcEpoch = request.srcEpoch;
-        if (!node.topology().hasEpoch(srcEpoch))
+        if (!node.topology().hasAtLeastEpoch(srcEpoch))
         {
             node.withEpochAtLeast(srcEpoch, request.callback, () -> 
fetchSpecific(node, query, maxRoute, request));
             return;
@@ -182,166 +117,67 @@ public class FetchData extends CheckShards<Route<?>>
         fetchData(node, query, maxRoute, request);
     }
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    private static void fetchViaSomeRoute(Node node, Participants<?> 
someUnseekables, FetchRequest request)
-    {
-        FindSomeRoute.findSomeRoute(node, request.txnId, request.invalidIf, 
someUnseekables, (foundRoute, fail) -> {
-            if (fail != null) request.callback.accept(null, fail);
-            else if (foundRoute.route == null)
-            {
-                reportRouteNotFound(node, foundRoute.known, request);
-            }
-            else if (isFullRoute(foundRoute.route))
-            {
-                fetch(node, Route.castToFullRoute(foundRoute.route), request);
-            }
-            else if (isRoute(someUnseekables) && 
someUnseekables.containsAll(foundRoute.route))
-            {
-                // this is essentially a reentrancy check; we can only reach 
this point if we have already tried once to fetchSomeRoute
-                // (as a user-provided Route is used to fetchRoute, not 
fetchSomeRoute)
-                reportRouteNotFound(node, foundRoute.known, request);
-            }
-            else
-            {
-                Route<?> route = foundRoute.route;
-                if (isRoute(someUnseekables))
-                    route = Route.merge(route, (Route)someUnseekables);
-                fetch(node, route, request);
-            }
-        });
-    }
-
-    private static void reportRouteNotFound(Node node, Known found, 
FetchRequest req)
-    {
-        Invariants.require(req.executeAt == null);
-        TxnId txnId = req.txnId;
-        switch (found.outcome())
-        {
-            default: throw new AssertionError("Unknown outcome: " + 
found.outcome());
-            case Abort:
-                locallyInvalidateAndCallback(node, txnId, req.lowEpoch, 
req.highEpoch, req.fetchKeys, new FetchResult(found, req.fetchKeys, null), 
req.callback);
-                break;
-
-            case Unknown:
-                if (found.canProposeInvalidation())
-                {
-                    Invalidate.invalidate(node, txnId, req.fetchKeys, false, 
req.lowEpoch, req.highEpoch, (outcome, throwable) -> {
-                        FetchResult result = null;
-                        if (throwable == null)
-                        {
-                            Known achieved = Known.Invalidated;
-                            Unseekables<?> achievedTarget = req.fetchKeys, 
didNotAchieveTarget = null;
-                            if (outcome.asProgressToken().status != 
Status.Invalidated)
-                            {
-                                achievedTarget = req.fetchKeys.slice(0, 0);
-                                didNotAchieveTarget = req.fetchKeys;
-                                achieved = Known.Nothing;
-                            }
-                            result = new FetchResult(achieved, achievedTarget, 
didNotAchieveTarget);
-                        }
-                        req.callback.accept(result, throwable);
-                    });
-                    break;
-                }
-            case Erased:
-            case WasApply:
-            case Apply:
-                // TODO (required): we may be stale
-                req.callback.accept(new FetchResult(found, 
req.fetchKeys.slice(0, 0), req.fetchKeys), null);
-        }
-    }
-
-    private static void fetchWithIncompleteRoute(Node node, Route<?> 
someRoute, FetchRequest request)
-    {
-        long srcEpoch = request.srcEpoch;
-        Invariants.requireArgument(node.topology().hasEpoch(srcEpoch), 
"Unknown epoch %d, latest known is %d", srcEpoch, node.epoch());
-        FindRoute.findRoute(node, request.txnId, request.invalidIf, 
someRoute.withHomeKey(), (foundRoute, fail) -> {
-            if (fail != null) request.callback.accept(null, fail);
-            else if (foundRoute == null) fetchViaSomeRoute(node, someRoute, 
request);
-            else fetch(node, foundRoute.route, request);
-        });
-    }
-
-    public static void fetch(Node node, FullRoute<?> route, FetchRequest 
request)
-    {
-        node.withEpochAtLeast(request.srcEpoch, request.callback, () -> {
-            Ranges ranges = request.localRanges(node);
-            fetchInternal(node, ranges, route, request);
-        });
-    }
-
-    private static Object fetchInternal(Node node, Ranges ranges, Route<?> 
route, FetchRequest request)
-    {
-        long srcEpoch = request.srcEpoch;
-        Invariants.requireArgument(node.topology().hasEpoch(srcEpoch), 
"Unknown epoch %d, latest known is %d", srcEpoch, node.epoch());
-        Route<?> slicedRoute = route.slice(ranges);
-        return fetchData(node, slicedRoute, route, request);
-    }
-
     final BiConsumer<? super FetchResult, Throwable> callback;
     /**
      * The epoch until which we want to persist any response for locally
      */
     final Known target;
     final Route<?> maxRoute;
+    final Participants<?> requestedFor;
 
     // to support cases where a later epoch that ultimately does not 
participate in execution has a vestigial entry
     // (i.e. if preaccept/accept contact a later epoch than execution is 
decided for)
-    final long lowEpoch, highEpoch;
-
-    final Unseekables<?> propagateTo;
+    final LatentStoreSelector reportTo;
 
-    private FetchData(Node node, Known target, TxnId txnId, InvalidIf 
invalidIf, Route<?> route, Route<?> maxRoute, Unseekables<?> propagateTo, long 
sourceEpoch, long lowEpoch, long highEpoch, BiConsumer<? super FetchResult, 
Throwable> callback)
+    private FetchData(Node node, Known target, TxnId txnId, InvalidIf 
invalidIf, Route<?> route, Route<?> maxRoute, Participants<?> requestedFor, 
long sourceEpoch, StoreSelector reportTo, BiConsumer<? super FetchResult, 
Throwable> callback)
     {
-        this(node, target, txnId, invalidIf, route, route.withHomeKey(), 
maxRoute, propagateTo, sourceEpoch, lowEpoch, highEpoch, callback);
+        this(node, target, txnId, invalidIf, route, route.withHomeKey(), 
maxRoute, requestedFor, sourceEpoch, reportTo, callback);
     }
 
-    private FetchData(Node node, Known target, TxnId txnId, InvalidIf 
invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, 
Unseekables<?> propagateTo, long sourceEpoch, long lowEpoch, long highEpoch, 
BiConsumer<? super FetchResult, Throwable> callback)
+    private FetchData(Node node, Known target, TxnId txnId, InvalidIf 
invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, 
Participants<?> requestedFor, long sourceEpoch, StoreSelector reportTo, 
BiConsumer<? super FetchResult, Throwable> callback)
     {
         // TODO (desired, efficiency): restore behaviour of only collecting 
info if e.g. Committed or Executed
         super(node, txnId, routeWithHomeKey, sourceEpoch, 
CheckStatus.IncludeInfo.All, null, invalidIf);
-        this.propagateTo = propagateTo;
-        this.lowEpoch = lowEpoch;
+        this.reportTo = reportTo;
         this.maxRoute = maxRoute;
+        this.requestedFor = requestedFor;
         Invariants.requireArgument(routeWithHomeKey.contains(route.homeKey()), 
"route %s does not contain %s", routeWithHomeKey, route.homeKey());
         this.target = target;
-        this.highEpoch = highEpoch;
         this.callback = callback;
     }
 
     private static FetchData fetchData(Node node, Route<?> route, Route<?> 
maxRoute, FetchRequest req)
     {
-        Invariants.require(!req.fetchKeys.isEmpty());
-        FetchData fetch = new FetchData(node, req.fetch, req.txnId, 
req.invalidIf, route, maxRoute, req.fetchKeys, req.srcEpoch, req.lowEpoch, 
req.highEpoch, req.callback);
+        Invariants.require(!req.contactable.isEmpty());
+        FetchData fetch = new FetchData(node, req.fetch, req.txnId, 
req.invalidIf, route, maxRoute, req.requestedFor, req.srcEpoch, req.reportTo, 
req.callback);
         fetch.start();
         return fetch;
     }
 
-    private static FetchData fetchData(Node node, Known fetch, TxnId txnId, 
InvalidIf invalidIf, Route<?> route, Route<?> maxRoute, Unseekables<?> 
localKeys, long sourceEpoch, long lowEpoch, long highEpoch, BiConsumer<? super 
FetchResult, Throwable> callback)
+    private static FetchData fetchData(Node node, Known fetch, TxnId txnId, 
InvalidIf invalidIf, Route<?> route, Route<?> maxRoute, Participants<?> 
requestedFor, long sourceEpoch, StoreSelector reportTo, BiConsumer<? super 
FetchResult, Throwable> callback)
     {
-        Invariants.require(!localKeys.isEmpty());
-        FetchData fetchData = new FetchData(node, fetch, txnId, invalidIf, 
route, maxRoute, localKeys, sourceEpoch, lowEpoch, highEpoch, callback);
+        FetchData fetchData = new FetchData(node, fetch, txnId, invalidIf, 
route, maxRoute, requestedFor, sourceEpoch, reportTo, callback);
         fetchData.start();
         return fetchData;
     }
 
     protected Route<?> query()
     {
-        return route;
+        return query;
     }
 
     @Override
     protected boolean isSufficient(Node.Id from, CheckStatus.CheckStatusOk ok)
     {
         Ranges rangesForNode = topologies().computeRangesForNode(from);
-        Route<?> scope = this.route.slice(rangesForNode);
+        Route<?> scope = this.query.slice(rangesForNode);
         return isSufficient(scope, ok);
     }
 
     @Override
     protected boolean isSufficient(CheckStatus.CheckStatusOk ok)
     {
-        return isSufficient(route, ok);
+        return isSufficient(query, ok);
     }
 
     protected boolean isSufficient(Route<?> scope, CheckStatus.CheckStatusOk 
ok)
@@ -363,7 +199,7 @@ public class FetchData extends CheckShards<Route<?>>
                 Invariants.require(isSufficient(merged), "Status %s is not 
sufficient", merged);
 
             // TODO (expected): should we automatically trigger a new fetch if 
we find executeAt but did not request enough information? would be more robust
-            Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, lowEpoch, highEpoch, success.withQuorum, query(), propagateTo, 
target, (CheckStatusOkFull) merged, callback);
+            Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query(), requestedFor, reportTo, target, 
(CheckStatusOkFull) merged, callback);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/FetchSomeRoute.java 
b/accord-core/src/main/java/accord/coordinate/FetchSomeRoute.java
new file mode 100644
index 00000000..a1691f92
--- /dev/null
+++ b/accord-core/src/main/java/accord/coordinate/FetchSomeRoute.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.coordinate;
+
+import java.util.function.BiConsumer;
+
+import accord.local.Commands;
+import accord.local.Node;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.Known;
+import accord.messages.CheckStatus.CheckStatusOk;
+import accord.messages.CheckStatus.IncludeInfo;
+import accord.primitives.WithQuorum;
+import accord.primitives.Participants;
+import accord.primitives.Route;
+import accord.primitives.TxnId;
+import accord.utils.MapReduceConsume;
+
+import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
+import static 
accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
+import static accord.local.CommandStores.*;
+import static accord.primitives.Known.Nothing;
+import static accord.primitives.WithQuorum.HasQuorum;
+
+/**
+ * Find some Route for a txnId using some known participants
+ */
+public class FetchSomeRoute extends CheckShards<Participants<?>>
+{
+    final LatentStoreSelector reportTo;
+    final BiConsumer<Route<?>, Throwable> callback;
+    FetchSomeRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, 
Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, 
Throwable> callback)
+    {
+        super(node, txnId, contactable, IncludeInfo.Route, null, invalidIf);
+        this.reportTo = reportTo;
+        this.callback = callback;
+    }
+
+    public static void fetchSomeRoute(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo, 
BiConsumer<Route<?>, Throwable> callback)
+    {
+        if (!node.topology().hasEpoch(txnId.epoch()))
+        {
+            node.withEpochAtLeast(txnId.epoch(), callback, () -> 
fetchSomeRoute(node, txnId, invalidIf, unseekables, reportTo, callback));
+            return;
+        }
+
+        FetchSomeRoute fetchSomeRoute = new FetchSomeRoute(node, txnId, 
invalidIf, unseekables, reportTo, callback);
+        fetchSomeRoute.start();
+    }
+
+
+    public static void fetchSomeRoute(Node node, TxnId txnId, Participants<?> 
contactable, BiConsumer<Route<?>, Throwable> callback)
+    {
+        fetchSomeRoute(node, txnId, contactable, 
LatentStoreSelector.standard(), callback);
+    }
+
+    public static void fetchSomeRoute(Node node, TxnId txnId, Participants<?> 
contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> 
callback)
+    {
+        fetchSomeRoute(node, txnId, NotKnownToBeInvalid, contactable, 
reportTo, callback);
+    }
+
+    @Override
+    protected boolean isSufficient(CheckStatusOk ok)
+    {
+        return ok.route != null;
+    }
+
+    @Override
+    protected void onDone(Success success, Throwable failure)
+    {
+        if (failure != null) callback.accept(null, failure);
+        else
+        {
+            final Route<?> route = merged == null ? null : merged.route;
+            if (route == null)
+            {
+                Known known = Nothing;
+                if (merged != null)
+                    known = merged.finish(query, query, query, 
success.withQuorum, previouslyKnownToBeInvalidIf).knownFor(txnId, query, query);
+                reportRouteNotFound(node, success.withQuorum, known, txnId, 
query, reportTo, callback);
+            }
+            else
+            {
+                StoreSelector selector = reportTo.refine(txnId, null, query);
+                node.mapReduceConsumeLocal(txnId, selector, new 
MapReduceConsume<>()
+                {
+                    @Override
+                    public void accept(Object result, Throwable failure)
+                    {
+                        callback.accept(route, null);
+                    }
+
+                    @Override
+                    public Object apply(SafeCommandStore safeStore)
+                    {
+                        SafeCommand safeCommand = 
safeStore.ifInitialised(txnId);
+                        if (safeCommand != null)
+                            Commands.updateRoute(safeStore, safeCommand, 
route);
+                        return null;
+                    }
+
+                    @Override
+                    public Object reduce(Object o1, Object o2)
+                    {
+                        return null;
+                    }
+                });
+            }
+        }
+    }
+
+    private static void reportRouteNotFound(Node node, WithQuorum withQuorum, 
Known found, TxnId txnId, Participants<?> participants, LatentStoreSelector 
reportTo, BiConsumer<Route<?>, Throwable> callback)
+    {
+        switch (found.outcome())
+        {
+            default: throw new AssertionError("Unknown outcome: " + 
found.outcome());
+            case Abort:
+                locallyInvalidateAndCallback(node, txnId, 
reportTo.refine(txnId, null, participants), participants, null, callback);
+                break;
+
+            case Unknown:
+                if (withQuorum == HasQuorum && found.canProposeInvalidation())
+                {
+                    Invalidate.invalidate(node, txnId, participants, false, 
reportTo, (outcome, throwable) -> callback.accept(null, throwable));
+                    break;
+                }
+            case Erased:
+            case WasApply:
+            case Apply:
+                callback.accept(null, null);
+        }
+    }
+
+}
diff --git a/accord-core/src/main/java/accord/coordinate/FindRoute.java 
b/accord-core/src/main/java/accord/coordinate/FindRoute.java
deleted file mode 100644
index a5a1046e..00000000
--- a/accord-core/src/main/java/accord/coordinate/FindRoute.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.coordinate;
-
-import java.util.function.BiConsumer;
-
-import accord.coordinate.Infer.InvalidIf;
-import accord.local.Node;
-import accord.messages.CheckStatus.CheckStatusOk;
-import accord.messages.CheckStatus.IncludeInfo;
-import accord.primitives.*;
-
-import static accord.primitives.Route.isFullRoute;
-
-/**
- * Find the Route of a known (txnId, homeKey) pair
- */
-public class FindRoute extends CheckShards<Route<?>>
-{
-    public static class Result
-    {
-        public final FullRoute<?> route;
-        public final Timestamp executeAt;
-
-        public Result(FullRoute<?> route, Timestamp executeAt)
-        {
-            this.route = route;
-            this.executeAt = executeAt;
-        }
-
-        public Result(CheckStatusOk ok)
-        {
-            this.route = Route.castToFullRoute(ok.route);
-            this.executeAt = ok.maxKnown().isExecuteAtKnown() ? ok.executeAt : 
null;
-        }
-    }
-
-    final BiConsumer<Result, Throwable> callback;
-    FindRoute(Node node, TxnId txnId, InvalidIf invalidIf, Route<?> someRoute, 
BiConsumer<Result, Throwable> callback)
-    {
-        super(node, txnId, someRoute, IncludeInfo.Route, null, invalidIf);
-        this.callback = callback;
-    }
-
-    public static FindRoute findRoute(Node node, TxnId txnId, InvalidIf 
invalidIf, Route<?> someRoute, BiConsumer<Result, Throwable> callback)
-    {
-        FindRoute findRoute = new FindRoute(node, txnId, invalidIf, someRoute, 
callback);
-        findRoute.start();
-        return findRoute;
-    }
-
-    @Override
-    protected boolean isSufficient(CheckStatusOk ok)
-    {
-        return isFullRoute(ok.route);
-    }
-
-    @Override
-    protected void onDone(Success success, Throwable failure)
-    {
-        if (failure != null) callback.accept(null, failure);
-        else if (success == Success.Success) callback.accept(new 
Result(merged), null);
-        else callback.accept(null, null);
-    }
-}
diff --git a/accord-core/src/main/java/accord/coordinate/FindSomeRoute.java 
b/accord-core/src/main/java/accord/coordinate/FindSomeRoute.java
deleted file mode 100644
index f087f4be..00000000
--- a/accord-core/src/main/java/accord/coordinate/FindSomeRoute.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.coordinate;
-
-import java.util.function.BiConsumer;
-
-import accord.local.Node;
-import accord.primitives.Known;
-import accord.messages.CheckStatus.CheckStatusOk;
-import accord.messages.CheckStatus.IncludeInfo;
-import accord.primitives.WithQuorum;
-import accord.primitives.Participants;
-import accord.primitives.Route;
-import accord.primitives.TxnId;
-
-import static accord.primitives.Known.Nothing;
-
-/**
- * Find the homeKey of a txnId with some known keys
- */
-public class FindSomeRoute extends CheckShards<Participants<?>>
-{
-    static class Result
-    {
-        public final Route<?> route;
-        public final Known known;
-        public final WithQuorum withQuorum;
-
-        Result(Route<?> route, Known known, WithQuorum withQuorum)
-        {
-            this.route = route;
-            this.known = known;
-            this.withQuorum = withQuorum;
-        }
-    }
-
-    final BiConsumer<Result, Throwable> callback;
-    FindSomeRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, 
Participants<?> unseekables, BiConsumer<Result, Throwable> callback)
-    {
-        super(node, txnId, unseekables, IncludeInfo.Route, null, invalidIf);
-        this.callback = callback;
-    }
-
-    public static void findSomeRoute(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Participants<?> unseekables, BiConsumer<Result, Throwable> callback)
-    {
-        if (!node.topology().hasEpoch(txnId.epoch()))
-        {
-            node.withEpochAtLeast(txnId.epoch(), callback, () -> 
findSomeRoute(node, txnId, invalidIf, unseekables, callback));
-            return;
-        }
-
-        FindSomeRoute findSomeRoute = new FindSomeRoute(node, txnId, 
invalidIf, unseekables, callback);
-        findSomeRoute.start();
-    }
-
-    @Override
-    protected boolean isSufficient(CheckStatusOk ok)
-    {
-        return ok.homeKey != null;
-    }
-
-    @Override
-    protected void onDone(Success success, Throwable failure)
-    {
-        if (failure != null) callback.accept(null, failure);
-        else if (merged == null) callback.accept(new Result(null, Nothing, 
success.withQuorum), null);
-        else callback.accept(new Result(merged.route, 
merged.finish(this.route, this.route, this.route, success.withQuorum, 
previouslyKnownToBeInvalidIf).knownFor(txnId, this.route, this.route), 
success.withQuorum), null);
-    }
-}
diff --git a/accord-core/src/main/java/accord/coordinate/Infer.java 
b/accord-core/src/main/java/accord/coordinate/Infer.java
index cc50ffbb..e5e4c6b3 100644
--- a/accord-core/src/main/java/accord/coordinate/Infer.java
+++ b/accord-core/src/main/java/accord/coordinate/Infer.java
@@ -21,6 +21,8 @@ package accord.coordinate;
 import java.util.function.BiConsumer;
 
 import accord.local.Command;
+import accord.local.CommandStores.StoreFinder;
+import accord.local.CommandStores.StoreSelector;
 import accord.local.Commands;
 import accord.local.Node;
 import accord.local.SafeCommand;
@@ -30,13 +32,10 @@ import accord.primitives.Known;
 import accord.local.StoreParticipants;
 import accord.primitives.Participants;
 import accord.primitives.TxnId;
-import accord.primitives.Unseekables;
 import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
 
 import static accord.primitives.Status.PreCommitted;
-import static accord.primitives.Route.castToRoute;
-import static accord.primitives.Route.isRoute;
 
 // TODO (testing): dedicated randomised testing of all inferences
 public class Infer
@@ -108,17 +107,16 @@ public class Infer
         final TxnId txnId;
         // TODO (expected): more consistent handling of transactions that only 
MAY intersect a commandStore
         //  (e.g. dependencies from an earlier epoch that have not yet 
committed, or commands that are proposed to execute in a later epoch than 
eventually agreed)
-        final long lowEpoch, highEpoch;
+        final StoreSelector reportTo;
         final Participants<?> participants;
         final T param;
         final BiConsumer<T, Throwable> callback;
 
-        private CleanupAndCallback(Node node, TxnId txnId, long lowEpoch, long 
highEpoch, Participants<?> participants, T param, BiConsumer<T, Throwable> 
callback)
+        private CleanupAndCallback(Node node, TxnId txnId, StoreSelector 
reportTo, Participants<?> participants, T param, BiConsumer<T, Throwable> 
callback)
         {
             this.node = node;
             this.txnId = txnId;
-            this.lowEpoch = lowEpoch;
-            this.highEpoch = highEpoch;
+            this.reportTo = reportTo;
             this.participants = participants;
             this.param = param;
             this.callback = callback;
@@ -126,8 +124,7 @@ public class Infer
 
         void start()
         {
-            Unseekables<?> propagateTo = isRoute(participants) ? 
castToRoute(participants).withHomeKey() : participants;
-            node.mapReduceConsumeLocal(txnId, propagateTo, lowEpoch, 
highEpoch, this);
+            node.mapReduceConsumeLocal(txnId, reportTo.refine(txnId, null, 
participants), this);
         }
 
         @Override
@@ -153,17 +150,21 @@ public class Infer
         }
     }
 
-    // TODO (required, consider): low and high bounds are correct?
     static class InvalidateAndCallback<T> extends CleanupAndCallback<T>
     {
-        private InvalidateAndCallback(Node node, TxnId txnId, long lowEpoch, 
long highEpoch, Participants<?> someUnseekables, T param, BiConsumer<T, 
Throwable> callback)
+        private InvalidateAndCallback(Node node, TxnId txnId, StoreSelector 
selector, Participants<?> participants, T param, BiConsumer<T, Throwable> 
callback)
         {
-            super(node, txnId, lowEpoch, highEpoch, someUnseekables, param, 
callback);
+            super(node, txnId, selector, participants, param, callback);
         }
 
-        public static <T> void locallyInvalidateAndCallback(Node node, TxnId 
txnId, long lowEpoch, long highEpoch, Participants<?> someUnseekables, T param, 
BiConsumer<T, Throwable> callback)
+        public static <T> void locallyInvalidateAndCallback(Node node, TxnId 
txnId, long lowEpoch, long highEpoch, Participants<?> participants, T param, 
BiConsumer<T, Throwable> callback)
         {
-            new InvalidateAndCallback<>(node, txnId, lowEpoch, highEpoch, 
someUnseekables, param, callback).start();
+            new InvalidateAndCallback<>(node, txnId, 
StoreFinder.selector(participants, lowEpoch, highEpoch), participants, param, 
callback).start();
+        }
+
+        public static <T> void locallyInvalidateAndCallback(Node node, TxnId 
txnId, StoreSelector selector, Participants<?> participants, T param, 
BiConsumer<T, Throwable> callback)
+        {
+            new InvalidateAndCallback<>(node, txnId, selector, participants, 
param, callback).start();
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java 
b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index 3912058d..813aca1b 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -23,6 +23,8 @@ import java.util.function.BiConsumer;
 import accord.coordinate.tracking.InvalidationTracker;
 import accord.coordinate.tracking.InvalidationTracker.InvalidationShardTracker;
 import accord.coordinate.tracking.RequestStatus;
+import accord.local.CommandStores.LatentStoreSelector;
+import accord.local.CommandStores.StoreSelector;
 import accord.local.Node.Id;
 import accord.messages.Commit;
 import accord.local.*;
@@ -63,9 +65,9 @@ public class Invalidate implements Callback<InvalidateReply>
     private final SortedListMap<Id, InvalidateReply> replies;
     private final InvalidationTracker tracker;
     private Throwable failure;
-    private final long reportLowEpoch, reportHighEpoch;
+    private final LatentStoreSelector reportTo;
 
-    private Invalidate(Node node, Ballot ballot, TxnId txnId, Participants<?> 
invalidateWith, boolean transitivelyInvokedByPriorInvalidation, long 
reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback)
+    private Invalidate(Node node, Ballot ballot, TxnId txnId, Participants<?> 
invalidateWith, boolean transitivelyInvokedByPriorInvalidation, 
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
     {
         this.callback = callback;
         this.node = node;
@@ -73,8 +75,7 @@ public class Invalidate implements Callback<InvalidateReply>
         this.txnId = txnId;
         this.invalidateWith = invalidateWith;
         this.transitivelyInvokedByPriorInvalidation = 
transitivelyInvokedByPriorInvalidation;
-        this.reportLowEpoch = reportLowEpoch;
-        this.reportHighEpoch = reportHighEpoch;
+        this.reportTo = reportTo;
         Topologies topologies = node.topology().forEpoch(invalidateWith, 
txnId.epoch(), SHARE);
         Invariants.require(topologies.size() == 1);
         this.tracker = new InvalidationTracker(topologies, txnId);
@@ -88,13 +89,13 @@ public class Invalidate implements Callback<InvalidateReply>
 
     public static Invalidate invalidate(Node node, TxnId txnId, 
Participants<?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, 
BiConsumer<Outcome, Throwable> callback)
     {
-        return invalidate(node, txnId, invalidateWith, 
transitivelyInvokedByPriorInvalidation, txnId.epoch(), txnId.epoch(), callback);
+        return invalidate(node, txnId, invalidateWith, 
transitivelyInvokedByPriorInvalidation, LatentStoreSelector.standard(), 
callback);
     }
 
-    public static Invalidate invalidate(Node node, TxnId txnId, 
Participants<?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, 
long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> 
callback)
+    public static Invalidate invalidate(Node node, TxnId txnId, 
Participants<?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, 
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
     {
         Ballot ballot = node.uniqueTimestamp(Ballot::fromValues);
-        Invalidate invalidate = new Invalidate(node, ballot, txnId, 
invalidateWith, transitivelyInvokedByPriorInvalidation, reportLowEpoch, 
reportHighEpoch, callback);
+        Invalidate invalidate = new Invalidate(node, ballot, txnId, 
invalidateWith, transitivelyInvokedByPriorInvalidation, reportTo, callback);
         invalidate.start();
         return invalidate;
     }
@@ -216,7 +217,7 @@ public class Invalidate implements Callback<InvalidateReply>
                         if (!invalidateWith.containsAll(fullRoute))
                             witnessedByInvalidation = null;
                     }
-                    RecoverWithRoute.recover(node, txnId, NotKnownToBeInvalid, 
fullRoute, witnessedByInvalidation, reportLowEpoch, reportHighEpoch, callback);
+                    RecoverWithRoute.recover(node, txnId, NotKnownToBeInvalid, 
fullRoute, witnessedByInvalidation, reportTo, callback);
                     return;
 
                 case Invalidated:
@@ -289,14 +290,14 @@ public class Invalidate implements 
Callback<InvalidateReply>
         //  so we do not need to explicitly do so here before notifying the 
waiter
         Participants<?> commitTo = Participants.merge(route, (Participants) 
invalidateWith);
         Commit.Invalidate.commitInvalidate(node, txnId, commitTo, txnId);
-        commitInvalidateLocal(commitTo, reportLowEpoch, reportHighEpoch);
+        commitInvalidateLocal(commitTo, reportTo.refine(txnId, null, 
commitTo));
     }
 
-    private void commitInvalidateLocal(Participants<?> commitTo, long 
lowEpoch, long highEpoch)
+    private void commitInvalidateLocal(Participants<?> commitTo, StoreSelector 
reportTo)
     {
         // TODO (desired): merge with FetchData.InvalidateOnDone
         // TODO (desired): when sending to network, register a callback for 
when local application of commitInvalidate message ahs been performed, so no 
need to special-case
-        node.forEachLocal(txnId, commitTo, lowEpoch, highEpoch, safeStore -> {
+        node.forEachLocal(txnId, reportTo.refine(txnId, null, commitTo), 
safeStore -> {
             // TODO (expected): consid
             StoreParticipants participants = 
StoreParticipants.notAccept(safeStore, commitTo, txnId);
             Commands.commitInvalidate(safeStore, safeStore.get(txnId, 
participants), commitTo);
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java 
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index dd26cb47..37831946 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -20,6 +20,7 @@ package accord.coordinate;
 
 import java.util.function.BiConsumer;
 
+import accord.local.CommandStores.StoreSelector;
 import accord.messages.InformDurable;
 import accord.primitives.*;
 import accord.utils.Invariants;
@@ -31,6 +32,7 @@ import accord.utils.UnhandledEnum;
 
 import static 
accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
 import static accord.messages.Commit.Invalidate.commitInvalidate;
+import static accord.primitives.WithQuorum.HasQuorum;
 
 /**
  * A result of null indicates the transaction is globally persistent
@@ -40,21 +42,20 @@ public class MaybeRecover extends CheckShards<Route<?>>
 {
     final ProgressToken prevProgress;
     final BiConsumer<Outcome, Throwable> callback;
-    final long reportLowEpoch, reportHighEpoch;
+    final StoreSelector reportTo;
 
-    MaybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> 
someRoute, ProgressToken prevProgress, long reportLowEpoch, long 
reportHighEpoch, BiConsumer<Outcome, Throwable> callback)
+    MaybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> 
someRoute, ProgressToken prevProgress, StoreSelector reportTo, 
BiConsumer<Outcome, Throwable> callback)
     {
         // we only want to enquire with the home shard, but we prefer maximal 
route information for running Invalidation against, if necessary
         super(node, txnId, someRoute.withHomeKey(), IncludeInfo.Route, null, 
invalidIf);
         this.prevProgress = prevProgress;
         this.callback = callback;
-        this.reportLowEpoch = reportLowEpoch;
-        this.reportHighEpoch = reportHighEpoch;
+        this.reportTo = reportTo;
     }
 
-    public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Route<?> someRoute, ProgressToken prevProgress, long reportLowEpoch, 
long reportHighEpoch, BiConsumer<Outcome, Throwable> callback)
+    public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Route<?> someRoute, ProgressToken prevProgress, StoreSelector 
reportTo, BiConsumer<Outcome, Throwable> callback)
     {
-        MaybeRecover maybeRecover = new MaybeRecover(node, txnId, invalidIf, 
someRoute, prevProgress, reportLowEpoch, reportHighEpoch, callback);
+        MaybeRecover maybeRecover = new MaybeRecover(node, txnId, invalidIf, 
someRoute, prevProgress, reportTo, callback);
         maybeRecover.start();
         return maybeRecover;
     }
@@ -85,7 +86,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
         else
         {
             Invariants.require(merged != null);
-            CheckStatusOk full = merged.finish(this.route, this.route, 
this.route, success.withQuorum, previouslyKnownToBeInvalidIf);
+            CheckStatusOk full = merged.finish(this.query, this.query, 
this.query, success.withQuorum, previouslyKnownToBeInvalidIf);
             Known known = full.maxKnown();
             Route<?> someRoute = full.route;
 
@@ -102,7 +103,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
                     // may be disseminated globally. However, if all shards 
are erased then the outcome must be
                     // decided locally by the application of GC points.
                     // TODO (expected): replicas may be stale in this case, 
and should detect this and stop attempting to coordinate/invalidate.
-                    if (known.canProposeInvalidation() && 
!Route.isFullRoute(full.route))
+                    if (success.withQuorum == HasQuorum && 
known.canProposeInvalidation() && !Route.isFullRoute(full.route))
                     {
                         // for correctness reasons, we have not necessarily 
preempted the initial pre-accept round and
                         // may have raced with it, so we must attempt to 
recover anything we see pre-accepted.
@@ -116,12 +117,12 @@ public class MaybeRecover extends CheckShards<Route<?>>
                     if (hasMadeProgress(full) || !Route.isFullRoute(someRoute))
                     {
                         if (full.durability.isDurable())
-                            InformDurable.informDefault(node, topologies, 
txnId, route, full.executeAtIfKnown(), full.durability);
+                            InformDurable.informDefault(node, topologies, 
txnId, query, full.executeAtIfKnown(), full.durability);
                         callback.accept(full.toProgressToken(), null);
                     }
                     else
                     {
-                        node.recover(txnId, full.invalidIf, 
Route.castToFullRoute(someRoute), reportLowEpoch, 
reportHighEpoch).invoke(callback);
+                        node.recover(txnId, full.invalidIf, 
Route.castToFullRoute(someRoute), reportTo).invoke(callback);
                     }
                     break;
 
@@ -132,7 +133,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
                     break;
 
                 case Abort:
-                    commitInvalidate(node, txnId, Route.merge(full.route, 
(Route)route), txnId.epoch());
+                    commitInvalidate(node, txnId, Route.merge(full.route, 
(Route) query), txnId.epoch());
                     locallyInvalidateAndCallback(node, txnId, txnId.epoch(), 
txnId.epoch(), someRoute, full.toProgressToken(), callback);
                     break;
             }
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java 
b/accord-core/src/main/java/accord/coordinate/Recover.java
index ee7a58c8..62c6c2d2 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -32,6 +32,7 @@ import accord.api.Result;
 import accord.api.RoutingKey;
 import accord.coordinate.ExecuteFlag.ExecuteFlags;
 import accord.coordinate.tracking.RecoveryTracker;
+import accord.local.CommandStores.LatentStoreSelector;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.messages.Accept;
@@ -109,7 +110,7 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
     private final Txn txn;
     private final FullRoute<?> route;
     private final @Nullable Timestamp committedExecuteAt;
-    private final long reportLowEpoch, reportHighEpoch;
+    private final LatentStoreSelector reportTo;
     private final BiConsumer<Outcome, Throwable> callback;
     private boolean isDone;
 
@@ -118,12 +119,9 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
     private boolean isBallotPromised;
 
     private Recover(Node node, Topologies topologies, Ballot ballot, TxnId 
txnId, Txn txn, FullRoute<?> route,
-                    @Nullable Timestamp committedExecuteAt, long 
reportLowEpoch, long reportHighEpoch,
+                    @Nullable Timestamp committedExecuteAt, 
LatentStoreSelector reportTo,
                     BiConsumer<Outcome, Throwable> callback)
     {
-        this.committedExecuteAt = committedExecuteAt;
-        this.reportLowEpoch = reportLowEpoch;
-        this.reportHighEpoch = reportHighEpoch;
         Invariants.require(txnId.isVisible());
         this.adapter = node.coordinationAdapter(txnId, Recovery);
         this.node = node;
@@ -131,6 +129,8 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
         this.txnId = txnId;
         this.txn = txn;
         this.route = route;
+        this.committedExecuteAt = committedExecuteAt;
+        this.reportTo = reportTo;
         this.callback = callback;
         this.tracker = new RecoveryTracker(topologies);
         this.recoverOks = new SortedListMap<>(topologies.nodes(), 
RecoverOk[]::new);
@@ -166,39 +166,39 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
 
     public static Recover recover(Node node, TxnId txnId, Txn txn, 
FullRoute<?> route, BiConsumer<Outcome, Throwable> callback)
     {
-        return recover(node, txnId, txn, route, txnId.epoch(), txnId.epoch(), 
callback);
+        return recover(node, txnId, txn, route, 
LatentStoreSelector.standard(), callback);
     }
 
-    public static Recover recover(Node node, TxnId txnId, Txn txn, 
FullRoute<?> route, long reportLowEpoch, long reportHighEpoch, 
BiConsumer<Outcome, Throwable> callback)
+    public static Recover recover(Node node, TxnId txnId, Txn txn, 
FullRoute<?> route, LatentStoreSelector reportTo, BiConsumer<Outcome, 
Throwable> callback)
     {
         Ballot ballot = node.uniqueTimestamp(Ballot::fromValues);
-        return recover(node, ballot, txnId, txn, route, reportLowEpoch, 
reportHighEpoch, callback);
+        return recover(node, ballot, txnId, txn, route, reportTo, callback);
     }
 
     private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback)
     {
-        return recover(node, ballot, txnId, txn, route, null, txnId.epoch(), 
txnId.epoch(), callback);
+        return recover(node, ballot, txnId, txn, route, null, null, callback);
     }
 
-    private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, long reportLowEpoch, long reportHighEpoch, 
BiConsumer<Outcome, Throwable> callback)
+    private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, LatentStoreSelector reportTo, BiConsumer<Outcome, 
Throwable> callback)
     {
-        return recover(node, ballot, txnId, txn, route, null, reportLowEpoch, 
reportHighEpoch, callback);
+        return recover(node, ballot, txnId, txn, route, null, reportTo, 
callback);
     }
 
     public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, @Nullable Timestamp executeAt, BiConsumer<Outcome, 
Throwable> callback)
     {
-        return recover(node, ballot, txnId, txn, route, executeAt, 
txnId.epoch(), (executeAt == null ? txnId : executeAt).epoch(), callback);
+        return recover(node, ballot, txnId, txn, route, executeAt, null, 
callback);
     }
 
-    public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, @Nullable Timestamp executeAt, long reportLowEpoch, 
long reportHighEpoch, BiConsumer<Outcome, Throwable> callback)
+    public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn 
txn, FullRoute<?> route, @Nullable Timestamp executeAt, LatentStoreSelector 
reportTo, BiConsumer<Outcome, Throwable> callback)
     {
         Topologies topologies = node.topology().select(route, txnId, executeAt 
== null ? txnId : executeAt, SHARE, QuorumEpochIntersections.recover);
-        return recover(node, topologies, ballot, txnId, txn, route, executeAt, 
reportLowEpoch, reportHighEpoch, callback);
+        return recover(node, topologies, ballot, txnId, txn, route, executeAt, 
reportTo, callback);
     }
 
-    private static Recover recover(Node node, Topologies topologies, Ballot 
ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, long 
reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> callback)
+    private static Recover recover(Node node, Topologies topologies, Ballot 
ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt, 
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
     {
-        Recover recover = new Recover(node, topologies, ballot, txnId, txn, 
route, executeAt, reportLowEpoch, reportHighEpoch, callback);
+        Recover recover = new Recover(node, topologies, ballot, txnId, txn, 
route, executeAt, reportTo, callback);
         recover.start(topologies.nodes());
         return recover;
     }
@@ -523,12 +523,11 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
 
     private void commitInvalidate(Timestamp invalidateUntil)
     {
-        long highEpoch = Math.max(invalidateUntil.epoch(), reportHighEpoch);
-        node.withEpochAtLeast(highEpoch, node.agent(), () -> {
+        node.withEpochAtLeast(invalidateUntil.epoch(), node.agent(), () -> {
             Commit.Invalidate.commitInvalidate(node, txnId, route, 
invalidateUntil);
         });
         isDone = true;
-        locallyInvalidateAndCallback(node, txnId, reportLowEpoch, highEpoch, 
route, ProgressToken.INVALIDATED, callback);
+        locallyInvalidateAndCallback(node, txnId, reportTo.refine(txnId, null, 
route), route, ProgressToken.INVALIDATED, callback);
     }
 
     private void propose(Accept.Kind kind, Timestamp executeAt, 
List<RecoverOk> recoverOkList)
@@ -549,7 +548,7 @@ public class Recover implements Callback<RecoverReply>, 
BiConsumer<Result, Throw
         Topologies topologies = tracker.topologies();
         if (executeAt != null && executeAt.epoch() != (this.committedExecuteAt 
== null ? txnId : this.committedExecuteAt).epoch())
             topologies = node.topology().select(route, txnId, executeAt, 
SHARE, QuorumEpochIntersections.recover);
-        Recover.recover(node, topologies, 
node.uniqueTimestamp(Ballot::fromValues), txnId, txn, route, executeAt, 
reportLowEpoch, reportHighEpoch, callback);
+        Recover.recover(node, topologies, 
node.uniqueTimestamp(Ballot::fromValues), txnId, txn, route, executeAt, 
reportTo, callback);
     }
 
     AsyncResult<InferredFastPath> awaitEarlier(Node node, Deps waitOn, 
BlockedUntil blockedUntil)
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java 
b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 9935dacf..e2970b8a 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -21,6 +21,7 @@ package accord.coordinate;
 import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
 
+import accord.local.CommandStores.LatentStoreSelector;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.messages.CheckStatus;
@@ -62,14 +63,12 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
 {
     final BiConsumer<Outcome, Throwable> callback;
     final Status witnessedByInvalidation;
-    // TODO (required): we don't use this on all paths, and should at least 
make sure we don't trigger any Invariant failures e.g. "Fetch was successful 
for all keys, but the WaitingState has not been cleared"
-    final long reportLowEpoch, reportHighEpoch;
+    final LatentStoreSelector reportTo;
 
-    private RecoverWithRoute(Node node, Topologies topologies, TxnId txnId, 
Infer.InvalidIf invalidIf, FullRoute<?> route, Status witnessedByInvalidation, 
long reportLowEpoch, long reportHighEpoch, BiConsumer<Outcome, Throwable> 
callback)
+    private RecoverWithRoute(Node node, Topologies topologies, TxnId txnId, 
Infer.InvalidIf invalidIf, FullRoute<?> route, Status witnessedByInvalidation, 
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
     {
         super(node, txnId, route, IncludeInfo.All, 
node.uniqueTimestamp(Ballot::fromValues), invalidIf);
-        this.reportLowEpoch = reportLowEpoch;
-        this.reportHighEpoch = reportHighEpoch;
+        this.reportTo = reportTo;
         // if witnessedByInvalidation == AcceptedInvalidate then we cannot 
assume its definition was known, and our comparison with the status is invalid
         Invariants.require(witnessedByInvalidation != 
Status.AcceptedInvalidate);
         // if witnessedByInvalidation == Invalidated we should anyway not be 
recovering
@@ -79,14 +78,14 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
         assert topologies.oldestEpoch() == topologies.currentEpoch() && 
topologies.currentEpoch() == txnId.epoch();
     }
 
-    public static RecoverWithRoute recover(Node node, TxnId txnId, 
Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable Status 
witnessedByInvalidation, long reportLowEpoch, long reportHighEpoch, 
BiConsumer<Outcome, Throwable> callback)
+    public static RecoverWithRoute recover(Node node, TxnId txnId, 
Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable Status 
witnessedByInvalidation, LatentStoreSelector reportTo, BiConsumer<Outcome, 
Throwable> callback)
     {
-        return recover(node, node.topology().forEpoch(route, txnId.epoch(), 
SHARE), txnId, invalidIf, route, witnessedByInvalidation, reportLowEpoch, 
reportHighEpoch, callback);
+        return recover(node, node.topology().forEpoch(route, txnId.epoch(), 
SHARE), txnId, invalidIf, route, witnessedByInvalidation, reportTo, callback);
     }
 
-    private static RecoverWithRoute recover(Node node, Topologies topologies, 
TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable Status 
witnessedByInvalidation, long reportLowEpoch, long reportHighEpoch, 
BiConsumer<Outcome, Throwable> callback)
+    private static RecoverWithRoute recover(Node node, Topologies topologies, 
TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable Status 
witnessedByInvalidation, LatentStoreSelector reportTo, BiConsumer<Outcome, 
Throwable> callback)
     {
-        RecoverWithRoute recover = new RecoverWithRoute(node, topologies, 
txnId, invalidIf, route, witnessedByInvalidation, reportLowEpoch, 
reportHighEpoch, callback);
+        RecoverWithRoute recover = new RecoverWithRoute(node, topologies, 
txnId, invalidIf, route, witnessedByInvalidation, reportTo, callback);
         recover.start();
         return recover;
     }
@@ -94,21 +93,21 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
     @Override
     public void contact(Id to)
     {
-        node.send(to, new CheckStatus(to, topologies(), txnId, route, 
sourceEpoch, IncludeInfo.All, bumpBallot), this);
+        node.send(to, new CheckStatus(to, topologies(), txnId, query, 
sourceEpoch, IncludeInfo.All, bumpBallot), this);
     }
 
     @Override
     protected boolean isSufficient(Id from, CheckStatusOk ok)
     {
         Ranges rangesForNode = 
topologies().getEpoch(txnId.epoch()).rangesForNode(from);
-        Route<?> route = this.route.slice(rangesForNode);
+        Route<?> route = this.query.slice(rangesForNode);
         return isSufficient(route, ok);
     }
 
     @Override
     protected boolean isSufficient(CheckStatusOk ok)
     {
-        return isSufficient(route, merged);
+        return isSufficient(query, merged);
     }
 
     protected boolean isSufficient(Route<?> route, CheckStatusOk ok)
@@ -134,8 +133,8 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
             return;
         }
 
-        CheckStatusOkFull full = ((CheckStatusOkFull) 
this.merged).finish(route, route, route, success.withQuorum, 
previouslyKnownToBeInvalidIf);
-        Known known = full.knownFor(txnId, route, route);
+        CheckStatusOkFull full = ((CheckStatusOkFull) 
this.merged).finish(query, query, query, success.withQuorum, 
previouslyKnownToBeInvalidIf);
+        Known known = full.knownFor(txnId, query, query);
 
         // TODO (required): audit this logic, and centralise with e.g. 
FetchData inferences
         // TODO (expected): skip straight to ExecuteTxn if we have a Stable 
reply from each shard
@@ -145,14 +144,14 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
             case Unknown:
                 if (known.definition().isKnown())
                 {
-                    Txn txn = full.partialTxn.reconstitute(route);
-                    Recover.recover(node, txnId, txn, route, reportLowEpoch, 
reportHighEpoch, callback);
+                    Txn txn = full.partialTxn.reconstitute(query);
+                    Recover.recover(node, txnId, txn, query, reportTo, 
callback);
                 }
                 else if (!known.definition().isOrWasKnown())
                 {
                     if (witnessedByInvalidation != null && 
witnessedByInvalidation.compareTo(Status.PreAccepted) > 0)
                         throw illegalState("We previously invalidated, finding 
a status that should be recoverable");
-                    Invalidate.invalidate(node, txnId, route, 
witnessedByInvalidation != null, reportLowEpoch, reportHighEpoch, callback);
+                    Invalidate.invalidate(node, txnId, query, 
witnessedByInvalidation != null, reportTo, callback);
                 }
                 else
                 {
@@ -173,11 +172,11 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
 
                     // TODO (expected): if we determine new durability, 
propagate it
                     CheckStatusOkFull propagate;
-                    if (!full.map.hasFullyTruncated(route))
+                    if (!full.map.hasFullyTruncated(query))
                     {
                         // we might have only part of the full transaction, 
and a shard may have truncated;
                         // in this case we want to skip straight to apply, but 
only for the shards that haven't truncated
-                        Route<?> trySendTo = 
route.without(full.map.matchingRanges(minMax -> minMax.min.isTruncated()));
+                        Route<?> trySendTo = 
query.without(full.map.matchingRanges(minMax -> minMax.min.isTruncated()));
                         if (!trySendTo.isEmpty())
                         {
                             if (known.isInvalidated())
@@ -193,20 +192,20 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                                     if (!known.is(DepsKnown))
                                     {
                                         Invariants.require(txnId.isSystemTxn() 
|| full.partialTxn.covers(trySendTo));
-                                        Participants<?> haveStable = 
full.map.knownFor(Known.DepsOnly, route);
-                                        Route<?> haveUnstable = 
route.without(haveStable);
+                                        Participants<?> haveStable = 
full.map.knownFor(Known.DepsOnly, query);
+                                        Route<?> haveUnstable = 
query.without(haveStable);
                                         Deps stable = 
full.stableDeps.reconstitutePartial(haveStable).asFullUnsafe();
 
-                                        
LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, 
full.executeAt, full.partialTxn, stable, haveUnstable, trySendTo, SLICE, route, 
callback, deps -> {
+                                        
LatestDeps.withStable(node.coordinationAdapter(txnId, Recovery), node, txnId, 
full.executeAt, full.partialTxn, stable, haveUnstable, trySendTo, SLICE, query, 
callback, deps -> {
                                             Deps stableDeps = 
deps.intersecting(trySendTo);
-                                            node.coordinationAdapter(txnId, 
Recovery).persist(node, null, trySendTo, trySendTo, SLICE, route, bumpBallot, 
txnId, full.partialTxn, full.executeAt, stableDeps, full.writes, full.result, 
informDurableOnDone, null);
+                                            node.coordinationAdapter(txnId, 
Recovery).persist(node, null, trySendTo, trySendTo, SLICE, query, bumpBallot, 
txnId, full.partialTxn, full.executeAt, stableDeps, full.writes, full.result, 
informDurableOnDone, null);
                                         });
                                     }
                                     else
                                     {
                                         
Invariants.require(full.stableDeps.covers(trySendTo));
                                         Invariants.require(txnId.isSystemTxn() 
|| full.partialTxn.covers(trySendTo));
-                                        node.coordinationAdapter(txnId, 
Recovery).persist(node, null, trySendTo, trySendTo, SLICE, route, bumpBallot, 
txnId, full.partialTxn, full.executeAt, full.stableDeps, full.writes, 
full.result, informDurableOnDone, null);
+                                        node.coordinationAdapter(txnId, 
Recovery).persist(node, null, trySendTo, trySendTo, SLICE, query, bumpBallot, 
txnId, full.partialTxn, full.executeAt, full.stableDeps, full.writes, 
full.result, informDurableOnDone, null);
                                     }
                                 }
                             }
@@ -222,24 +221,24 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                         propagate = full;
                     }
 
-                    Propagate.propagate(node, txnId, 
previouslyKnownToBeInvalidIf, sourceEpoch, reportLowEpoch, reportHighEpoch, 
success.withQuorum, route, route, null, propagate, (s, f) -> callback.accept(f 
== null ? propagate.toProgressToken() : null, f));
+                    Propagate.propagate(node, txnId, 
previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query, 
reportTo, null, propagate, (s, f) -> callback.accept(f == null ? 
propagate.toProgressToken() : null, f));
                     break;
                 }
 
-                Txn txn = full.partialTxn.reconstitute(route);
+                Txn txn = full.partialTxn.reconstitute(query);
                 if (known.is(ApplyAtKnown) && known.outcome() == Apply)
                 {
                     Deps deps;
                     Route<?> missingDeps;
                     if (known.is(DepsKnown))
                     {
-                        deps = full.stableDeps.reconstitute(route);
-                        missingDeps = route.slice(0, 0);
+                        deps = full.stableDeps.reconstitute(query);
+                        missingDeps = query.slice(0, 0);
                     }
                     else
                     {
-                        Participants<?> hasDeps = 
full.map.knownFor(Known.DepsOnly, route);
-                        missingDeps = route.without(hasDeps);
+                        Participants<?> hasDeps = 
full.map.knownFor(Known.DepsOnly, query);
+                        missingDeps = query.without(hasDeps);
                         if (full.stableDeps == null)
                         {
                             Invariants.require(hasDeps.isEmpty());
@@ -252,9 +251,9 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                             deps = new 
Deps(full.stableDeps.reconstitutePartial(hasDeps));
                         }
                     }
-                    LatestDeps.withStable(node.coordinationAdapter(txnId, 
Recovery), node, txnId, full.executeAt, full.partialTxn, deps, missingDeps, 
route, SHARE, route, callback, mergedDeps -> {
+                    LatestDeps.withStable(node.coordinationAdapter(txnId, 
Recovery), node, txnId, full.executeAt, full.partialTxn, deps, missingDeps, 
query, SHARE, query, callback, mergedDeps -> {
                         node.withEpochExact(full.executeAt.epoch(), 
node.agent(), t -> WrappableException.wrap(t), () -> {
-                            node.coordinationAdapter(txnId, 
Recovery).persist(node, topologies, route, bumpBallot, txnId, txn, 
full.executeAt, mergedDeps, full.writes, full.result, (s, f) -> {
+                            node.coordinationAdapter(txnId, 
Recovery).persist(node, topologies, query, bumpBallot, txnId, txn, 
full.executeAt, mergedDeps, full.writes, full.result, (s, f) -> {
                                 callback.accept(f == null ? APPLIED : null, f);
                             });
                         });
@@ -262,7 +261,7 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                 }
                 else
                 {
-                    Recover.recover(node, txnId, txn, route, callback);
+                    Recover.recover(node, txnId, txn, query, callback);
                 }
                 break;
 
@@ -270,12 +269,12 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                 if (witnessedByInvalidation != null && 
witnessedByInvalidation.hasBeen(Status.PreCommitted))
                     throw illegalState("We previously invalidated, finding a 
status that should be recoverable");
 
-                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, 
null, full, (s, f) -> callback.accept(f == null ? INVALIDATED : null, f));
+                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) -> 
callback.accept(f == null ? INVALIDATED : null, f));
                 break;
 
             case Erased:
                 // we should only be able to hit the Erased case if every 
participating shard has advanced past this TxnId, so we don't need to recover it
-                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, reportLowEpoch, reportHighEpoch, success.withQuorum, route, route, 
null, full, (s, f) -> callback.accept(f == null ? 
TRUNCATED_DURABLE_OR_INVALIDATED : null, f));
+                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) -> 
callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f));
                 break;
         }
     }
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index f426a6d5..56e763d7 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -29,6 +29,7 @@ import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Agent;
 import accord.api.ConfigurationService;
 import accord.local.Node;
 import accord.primitives.Ranges;
@@ -45,6 +46,7 @@ public abstract class AbstractConfigurationService<EpochState 
extends AbstractCo
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractConfigurationService.class);
 
     protected final Node.Id localId;
+    protected final Agent agent;
 
     protected final EpochHistory epochs = createEpochHistory();
 
@@ -294,9 +296,10 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         }
     }
 
-    public AbstractConfigurationService(Node.Id localId)
+    public AbstractConfigurationService(Node.Id localId, Agent agent)
     {
         this.localId = localId;
+        this.agent = agent;
     }
 
     protected abstract EpochHistory createEpochHistory();
@@ -363,8 +366,9 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
         {
             logger.debug("Epoch {} received; waiting to receive {} before 
reporting", topology.epoch(), lastReceived + 1);
-            epochs.receiveFuture(lastReceived + 1).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync), executor());
-
+            epochs.receiveFuture(lastReceived + 1)
+                  .invokeIfSuccess(() -> reportTopology(topology, isLoad, 
startSync), executor())
+                  .begin(agent);
             fetchTopologyForEpoch(lastReceived + 1);
             return;
         }
@@ -373,14 +377,18 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
         if (lastAcked == 0 && lastReceived > 0)
         {
             logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), epochs.minEpoch(), executor());
-            epochs.acknowledgeFuture(epochs.minEpoch()).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync));
+            epochs.acknowledgeFuture(epochs.minEpoch())
+                  .invokeIfSuccess(() -> reportTopology(topology, isLoad, 
startSync))
+                  .begin(agent);
             return;
         }
 
         if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
         {
             logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), lastAcked + 1);
-            epochs.acknowledgeFuture(lastAcked + 1).invokeIfSuccess(() -> 
reportTopology(topology, isLoad, startSync), executor());
+            epochs.acknowledgeFuture(lastAcked + 1)
+                  .invokeIfSuccess(() -> reportTopology(topology, isLoad, 
startSync), executor())
+                  .begin(agent);
             return;
         }
 
@@ -453,9 +461,9 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             }
         }
 
-        public Minimal(Node.Id node)
+        public Minimal(Node.Id node, Agent agent)
         {
-            super(node);
+            super(node, agent);
         }
 
         @Override
diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java 
b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
index 8c1885d8..129d03e1 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
@@ -24,6 +24,8 @@ import javax.annotation.Nullable;
 import accord.coordinate.MaybeRecover;
 import accord.coordinate.Outcome;
 import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.CommandStores.IncludingSpecificStoreSelector;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 import accord.primitives.Status;
@@ -153,9 +155,8 @@ abstract class HomeState extends WaitingState
 
         CallbackInvoker<ProgressToken, Outcome> invoker = 
invokeHomeCallback(instance, txnId, maxProgressToken, 
HomeState::recoverCallback);
 
-        long lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, txnId.epoch(), 
command.participants().hasTouched());
-        long highEpoch = 
safeStore.ranges().earliestLaterEpochThatFullyCovers(command.executeAtIfKnownElseTxnId().epoch(),
 command.participants().hasTouched());
-        instance.debugActive(MaybeRecover.maybeRecover(instance.node(), txnId, 
invalidIf(), command.route(), maxProgressToken, lowEpoch, highEpoch, invoker), 
invoker);
+        CommandStores.StoreSelector reportTo = new 
IncludingSpecificStoreSelector(safeStore.commandStore().id());
+        instance.debugActive(MaybeRecover.maybeRecover(instance.node(), txnId, 
invalidIf(), command.route(), maxProgressToken, reportTo, invoker), invoker);
         set(safeStore, instance, ReadyToExecute, Querying);
     }
 
@@ -178,6 +179,9 @@ abstract class HomeState extends WaitingState
             if (fail != null)
             {
                 safeStore.agent().onCaughtException(fail, "Failed recovering " 
+ state);
+                // re-save prior progress token
+                if (prevProgressToken != null && 
prevProgressToken.compareTo(command) > 0)
+                    instance.saveProgressToken(command.txnId(), 
prevProgressToken);
                 state.incrementHomeRetryCounter();
                 state.set(safeStore, instance, status, Queued);
             }
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java 
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index ef973281..e5045636 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -25,7 +25,9 @@ import javax.annotation.Nullable;
 import accord.api.ProgressLog.BlockedUntil;
 import accord.coordinate.AsynchronousAwait;
 import accord.coordinate.FetchData;
+import accord.coordinate.FetchSomeRoute;
 import accord.local.Command;
+import accord.local.CommandStores.IncludingSpecificStoreSelector;
 import accord.local.CommandStores.RangesForEpoch;
 import accord.local.Node;
 import accord.local.SafeCommand;
@@ -141,7 +143,7 @@ abstract class WaitingState extends BaseTxnState
         return waitingProgress(encodedState);
     }
 
-    final @Nonnull long waitingKeyTrackerBits()
+    final long waitingKeyTrackerBits()
     {
         return (encodedState >>> AWAIT_SHIFT) & (-1L >>> (64 - AWAIT_BITS));
     }
@@ -240,7 +242,7 @@ abstract class WaitingState extends BaseTxnState
         if (offset >= 3)
         {
             offset = 3;
-            lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, lowEpoch, 
command.maxContactable());
+            lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(lowEpoch, 
command.maxContactable());
         }
         encodedState = encodedState & ~(0x3L << AWAIT_EPOCH_SHIFT);
         encodedState |= ((long)offset) << AWAIT_EPOCH_SHIFT;
@@ -263,7 +265,7 @@ abstract class WaitingState extends BaseTxnState
         long epoch = ranges.epochAtIndex(Math.max(0, i)) - 1;
         if (offset < 3)
             return epoch;
-        return safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, 
epoch, route);
+        return safeStore.ranges().latestEarlierEpochThatFullyCovers(epoch, 
route);
     }
 
     boolean hasNewLowEpoch(SafeCommandStore safeStore, TxnId txnId, long 
prevLowEpoch, long newLowEpoch)
@@ -298,10 +300,10 @@ abstract class WaitingState extends BaseTxnState
 
     long readHighEpoch(SafeCommandStore safeStore, TxnId txnId, Route<?> route)
     {
+        RangesForEpoch ranges = safeStore.ranges();
         int offset = (int) ((encodedState >>> (AWAIT_EPOCH_SHIFT + 2)) & 0x3);
         if (offset == 0)
-            return txnId.epoch();
-        RangesForEpoch ranges = safeStore.ranges();
+            return Math.max(txnId.epoch(), ranges.epochAtIndex(0));
         long epoch = ranges.epochAtIndex(Math.max(0, 
ranges.floorIndex(txnId.epoch())) + offset);
         if (offset < 3)
             return epoch;
@@ -389,17 +391,15 @@ abstract class WaitingState extends BaseTxnState
         TxnId txnId = safeCommand.txnId();
         // first make sure we have enough information to obtain the command 
locally
         Timestamp executeAt = command.executeAtIfKnown();
-        Participants<?> fetchKeys = 
Invariants.nonNull(command.maxContactable());
+        Participants<?> maxContactable = 
Invariants.nonNull(command.maxContactable());
 
-        if (!Route.isRoute(fetchKeys))
+        if (!Route.isRoute(maxContactable))
         {
-            long lowEpoch = updateLowEpoch(safeStore, txnId, command);
-            long highEpoch = updateHighEpoch(safeStore, txnId, blockedUntil, 
command, executeAt);
-            fetchRoute(owner, blockedUntil, txnId, executeAt, lowEpoch, 
highEpoch, fetchKeys);
+            fetchRoute(owner, blockedUntil, txnId, maxContactable);
             return;
         }
 
-        Route<?> route = Route.castToRoute(fetchKeys);
+        Route<?> route = Route.castToRoute(maxContactable);
         if (homeSatisfies().compareTo(blockedUntil) < 0)
         {
             // first wait until the homeKey has progressed to a point where it 
can answer our query; we don't expect our shards to know until then anyway
@@ -418,7 +418,7 @@ abstract class WaitingState extends BaseTxnState
         {
             // we know it has been decided one way or the other by the home 
shard at least, so we attempt a fetch
             // including the home shard to get us to at least PreCommitted 
where we can safely wait on individual shards
-            fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, 
slicedRoute, slicedRoute.withHomeKey(), route);
+            fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, 
slicedRoute.withHomeKey(), route);
             return;
         }
 
@@ -430,7 +430,7 @@ abstract class WaitingState extends BaseTxnState
         if (awaitRoute.isHomeKeyOnlyRoute())
         {
             // at this point we can switch to polling as we know someone has 
the relevant state
-            fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, 
slicedRoute, fetchRoute, route);
+            fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, 
fetchRoute, route);
             return;
         }
 
@@ -457,7 +457,7 @@ abstract class WaitingState extends BaseTxnState
         {
             // all of the shards we are awaiting have been processed and found 
at least one replica that has the state needed to answer our query
             // at this point we can switch to polling as we know someone has 
the relevant state
-            fetch(owner, blockedUntil, txnId, executeAt, lowEpoch, highEpoch, 
slicedRoute, fetchRoute, route);
+            fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, 
fetchRoute, route);
             return;
         }
 
@@ -485,6 +485,7 @@ abstract class WaitingState extends BaseTxnState
             if (route == null)
             {
                 Invariants.require(kind == FetchRoute);
+                Invariants.require(ready == null);
                 state.retry(safeStore, safeCommand, owner, blockedUntil);
                 return;
             }
@@ -599,9 +600,12 @@ abstract class WaitingState extends BaseTxnState
         }
     }
 
-    static void fetchRouteCallback(SafeCommandStore safeStore, SafeCommand 
safeCommand, DefaultProgressLog owner, TxnId txnId, BlockedUntil blockedUntil, 
FetchData.FetchResult fetchResult, Throwable fail)
+    static void fetchRouteCallback(SafeCommandStore safeStore, SafeCommand 
safeCommand, DefaultProgressLog owner, TxnId txnId, BlockedUntil blockedUntil, 
Route<?> found, Throwable fail)
     {
-        fetchCallback(FetchRoute, safeStore, safeCommand, owner, txnId, 
blockedUntil, fetchResult, fail);
+        if (found == null)
+            found = safeCommand.current().route();
+        Participants<?> ready = found != null ? found.slice(0, 0) : null;
+        awaitOrFetchCallback(FetchRoute, safeStore, safeCommand, owner, txnId, 
blockedUntil, ready, null, null, fail);
     }
 
     static void fetchCallback(SafeCommandStore safeStore, SafeCommand 
safeCommand, DefaultProgressLog owner, TxnId txnId, BlockedUntil blockedUntil, 
FetchData.FetchResult fetchResult, Throwable fail)
@@ -703,20 +707,20 @@ abstract class WaitingState extends BaseTxnState
         }
     }
 
-    static void fetchRoute(DefaultProgressLog owner, BlockedUntil 
blockedUntil, TxnId txnId, Timestamp executeAt, long lowEpoch, long highEpoch, 
Participants<?> fetchKeys)
+    static void fetchRoute(DefaultProgressLog owner, BlockedUntil 
blockedUntil, TxnId txnId, Participants<?> contactable)
     {
         // TODO (desired): fetch only the route
         // we MUSt allocate before calling withEpoch to register cancellation, 
as async
-        BiConsumer<FetchData.FetchResult, Throwable> invoker = 
invokeWaitingCallback(owner, txnId, blockedUntil, 
WaitingState::fetchRouteCallback);
-        FetchData.fetch(blockedUntil.unblockedFrom.known, owner.node(), txnId, 
executeAt, fetchKeys, lowEpoch, highEpoch, invoker);
+        BiConsumer<Route<?>, Throwable> invoker = invokeWaitingCallback(owner, 
txnId, blockedUntil, WaitingState::fetchRouteCallback);
+        FetchSomeRoute.fetchSomeRoute(owner.node(), txnId, contactable, new 
IncludingSpecificStoreSelector(owner.commandStore.id()), invoker);
     }
 
-    static void fetch(DefaultProgressLog owner, BlockedUntil blockedUntil, 
TxnId txnId, Timestamp executeAt, long lowEpoch, long highEpoch, Route<?> 
slicedRoute, Route<?> fetchRoute, Route<?> maxRoute)
+    static void fetch(DefaultProgressLog owner, BlockedUntil blockedUntil, 
TxnId txnId, Timestamp executeAt, Route<?> slicedRoute, Route<?> fetchRoute, 
Route<?> maxRoute)
     {
         Invariants.require(!slicedRoute.isEmpty());
         // we MUSt allocate before calling withEpoch to register cancellation, 
as async
         BiConsumer<FetchData.FetchResult, Throwable> invoker = 
invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchCallback);
-        FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, 
owner.node(), txnId, executeAt, fetchRoute, maxRoute, slicedRoute, lowEpoch, 
highEpoch, invoker);
+        FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, 
owner.node(), txnId, executeAt, fetchRoute, maxRoute, slicedRoute, new 
IncludingSpecificStoreSelector(owner.commandStore.id()), invoker);
     }
 
     void awaitHomeKey(DefaultProgressLog owner, BlockedUntil blockedUntil, 
TxnId txnId, Timestamp executeAt, Route<?> route)
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 0bbc4bbf..e7e457d6 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -20,6 +20,8 @@ package accord.local;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -27,7 +29,6 @@ import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -52,7 +53,6 @@ import accord.primitives.EpochSupplier;
 import accord.primitives.Participants;
 import accord.primitives.Range;
 import accord.primitives.Ranges;
-import accord.primitives.Route;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -73,6 +73,7 @@ import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResults;
 import accord.utils.async.Cancellable;
 import org.agrona.collections.Hashing;
+import org.agrona.collections.Int2IntHashMap;
 import org.agrona.collections.Int2ObjectHashMap;
 
 import static accord.api.ConfigurationService.EpochReady.done;
@@ -89,6 +90,158 @@ public abstract class CommandStores
     @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(CommandStores.class);
 
+    public interface LatentStoreSelector
+    {
+        StoreSelector refine(TxnId txnId, @Nullable Timestamp executeAt, 
Participants<?> participants);
+
+        class StandardLatentStoreSelector implements LatentStoreSelector
+        {
+            private static final StandardLatentStoreSelector INSTANCE = new 
StandardLatentStoreSelector();
+
+            @Override
+            public StoreSelector refine(TxnId txnId, @Nullable Timestamp 
executeAt, Participants<?> participants)
+            {
+                return snapshot -> StoreFinder.find(snapshot, participants)
+                                              .filter(snapshot, participants, 
txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch())
+                                              .iterator(snapshot);
+            }
+        }
+
+        static LatentStoreSelector standard()
+        {
+            return StandardLatentStoreSelector.INSTANCE;
+        }
+    }
+
+    public interface StoreSelector extends LatentStoreSelector
+    {
+        default StoreSelector refine(TxnId txnId, @Nullable Timestamp 
executeAt, Participants<?> participants) { return this; }
+        Iterator<CommandStore> select(Snapshot snapshot);
+    }
+
+    public static class IncludingSpecificStoreSelector implements StoreSelector
+    {
+        final int storeId;
+
+        public IncludingSpecificStoreSelector(int storeId)
+        {
+            this.storeId = storeId;
+        }
+
+        @Override
+        public StoreSelector refine(TxnId txnId, @Nullable Timestamp 
executeAt, Participants<?> participants)
+        {
+            return snapshot -> {
+                StoreFinder finder = StoreFinder.find(snapshot, participants)
+                                                .filter(snapshot, 
participants, txnId.epoch(), (executeAt != null ? executeAt : txnId).epoch());
+                finder.set(snapshot.byId.get(storeId));
+                return finder.iterator(snapshot);
+            };
+        }
+
+        @Override
+        public Iterator<CommandStore> select(Snapshot snapshot)
+        {
+            return 
Collections.singletonList(snapshot.byId(storeId)).iterator();
+        }
+    }
+
+
+    public static class StoreFinder extends SimpleBitSet implements 
IndexedQuadConsumer<Object, Object, Object, Object>, 
IndexedRangeQuadConsumer<Object, Object, Object, Object>
+    {
+        final int[] indexMap;
+
+        private StoreFinder(int size, int[] indexMap)
+        {
+            super(size);
+            this.indexMap = indexMap;
+        }
+
+        public StoreFinder(Snapshot snapshot)
+        {
+            this(snapshot.shards.length, snapshot.indexForRange);
+        }
+
+        public static StoreSelector selector(Unseekables<?> unseekables, long 
minEpoch, long maxEpoch)
+        {
+            return snapshot -> {
+                StoreFinder finder = StoreFinder.find(snapshot, unseekables);
+                finder.filter(snapshot, unseekables, minEpoch, maxEpoch);
+                return finder.iterator(snapshot);
+            };
+        }
+
+        public static StoreFinder find(Snapshot snapshot, Unseekables<?> 
unseekables)
+        {
+            StoreFinder finder = new StoreFinder(snapshot);
+            switch (unseekables.domain())
+            {
+                default: throw new UnhandledEnum(unseekables.domain());
+                case Range:
+                {
+                    int minIndex = 0;
+                    for (Range range : (AbstractRanges)unseekables)
+                        minIndex = snapshot.lookupByRange.forEachRange(range, 
finder, finder, null, null, null, null, minIndex);
+                    break;
+                }
+                case Key:
+                {
+                    int minIndex = 0;
+                    for (RoutingKey key : (AbstractUnseekableKeys)unseekables)
+                        minIndex = snapshot.lookupByRange.forEachKey(key, 
finder, finder, null, null, null, null, minIndex);
+                    break;
+                }
+            }
+            return finder;
+        }
+
+        public StoreFinder filter(Snapshot snapshot, Unseekables<?> 
unseekables, long minEpoch, long maxEpoch)
+        {
+            for (int i = firstSetBit(); i >= 0 ; i = nextSetBit(i + 1, -1))
+            {
+                ShardHolder shard = snapshot.shards[i];
+                Ranges shardRanges = shard.ranges().allBetween(minEpoch, 
maxEpoch);
+                if (shardRanges != shard.ranges.all() && 
!shardRanges.intersects(unseekables))
+                    unset(i);
+            }
+            return this;
+        }
+
+        public Iterator<CommandStore> iterator(Snapshot snapshot)
+        {
+            return new Iterator<>()
+            {
+                int i = firstSetBit();
+                @Override
+                public boolean hasNext()
+                {
+                    return i >= 0;
+                }
+
+                @Override
+                public CommandStore next()
+                {
+                    CommandStore next = snapshot.shards[i].store;
+                    i = nextSetBit(i + 1, -1);
+                    return next;
+                }
+            };
+        }
+
+        @Override
+        public void accept(Object p1, Object p2, Object p3, Object p4, int 
index)
+        {
+            set(indexMap[index]);
+        }
+
+        @Override
+        public void accept(Object p1, Object p2, Object p3, Object p4, int 
fromIndex, int toIndex)
+        {
+            for (int i = fromIndex ; i < toIndex ; ++i)
+                set(indexMap[i]);
+        }
+    }
+
     public interface Factory
     {
         CommandStores create(NodeCommandStoreService time,
@@ -130,7 +283,7 @@ public abstract class CommandStores
         }
     }
 
-    protected static class ShardHolder
+    public static class ShardHolder
     {
         public final CommandStore store;
         RangesForEpoch ranges;
@@ -360,22 +513,27 @@ public abstract class CommandStores
             return Math.max(sinceEpoch, epochs[0]);
         }
 
-        public long latestEarlierEpochThatFullyCovers(SafeCommandStore 
safeStore, long beforeEpoch, Unseekables<?> keysOrRanges)
+        public long latestEarlierEpochThatFullyCovers(long beforeEpoch, 
Unseekables<?> keysOrRanges)
         {
             int i = ceilIndex(beforeEpoch);
-            if (i == 0 || i <= epochs.length)
+            if (i == 0)
                 return beforeEpoch;
+
             long latest = beforeEpoch;
-            Ranges existing = i >= ranges.length ? Ranges.EMPTY : ranges[i];
-            long minEpoch = safeStore.node().topology().minEpoch();
-            while (--i >= 0 && minEpoch < epochs[i])
+            Ranges existing = Ranges.EMPTY;
+            long next = beforeEpoch;
+            if (i < epochs.length)
+            {
+                existing = ranges[i];
+                next = Math.min(next, epochs[i]);
+            }
+            while (--i >= 0)
             {
                 if (ranges[i].without(existing).intersects(keysOrRanges))
-                    latest = epochs[i + 1] - 1;
+                    latest = next - 1;
                 existing = existing.with(ranges[i]);
+                next = epochs[i];
             }
-            if (latest < beforeEpoch)
-                latest = Math.max(latest, 
safeStore.node().topology().minEpoch());
             return latest;
         }
 
@@ -398,22 +556,23 @@ public abstract class CommandStores
         current = toLoad;
     }
 
-    protected static class Snapshot extends Journal.TopologyUpdate
+    public static class Snapshot extends Journal.TopologyUpdate implements 
Iterable<ShardHolder>
     {
-        public final ShardHolder[] shards;
-        public final Int2ObjectHashMap<CommandStore> byId;
+        final ShardHolder[] shards;
+        final Int2IntHashMap byId;
         private final int[] indexForRange;
-        public final SearchableRangeList lookupByRange;
+        final SearchableRangeList lookupByRange;
 
         public Snapshot(ShardHolder[] shards, Topology local, Topology global)
         {
             super(asMap(shards), local, global);
             this.shards = shards;
-            this.byId = new Int2ObjectHashMap<>(shards.length, 
Hashing.DEFAULT_LOAD_FACTOR, true);
+            this.byId = new Int2IntHashMap(shards.length, 
Hashing.DEFAULT_LOAD_FACTOR, -1);
             int count = 0;
-            for (ShardHolder shard : shards)
+            for (int i = 0 ; i < shards.length ; ++i)
             {
-                byId.put(shard.store.id(), shard.store);
+                ShardHolder shard = shards[i];
+                byId.put(shard.store.id(), i);
                 count += shard.ranges.all().size();
             }
             class RangeAndIndex
@@ -461,6 +620,17 @@ public abstract class CommandStores
                 commandStores.put(shard.store.id, shard.ranges);
             return commandStores;
         }
+
+        public CommandStore byId(int id)
+        {
+            return shards[byId.get(id)].store;
+        }
+
+        @Override
+        public Iterator<ShardHolder> iterator()
+        {
+            return Arrays.asList(shards).iterator();
+        }
     }
 
     final StoreSupplier supplier;
@@ -666,7 +836,7 @@ public abstract class CommandStores
 
     private AsyncChain<Void> forEach(PreLoadContext context, Unseekables<?> 
keys, long minEpoch, long maxEpoch, Consumer<SafeCommandStore> forEach, boolean 
matchesMultiple)
     {
-        return this.mapReduce(context, keys, minEpoch, maxEpoch, new 
MapReduce<SafeCommandStore, Void>()
+        return this.mapReduce(context, keys, minEpoch, maxEpoch, new 
MapReduce<>()
         {
             @Override
             public Void apply(SafeCommandStore in)
@@ -692,6 +862,22 @@ public abstract class CommandStores
         });
     }
 
+    public AsyncChain<Void> forEach(PreLoadContext context, StoreSelector 
selector, Consumer<SafeCommandStore> forEach)
+    {
+        return this.mapReduce(context, selector, new MapReduce<>()
+        {
+            @Override
+            public Void apply(SafeCommandStore in)
+            {
+                forEach.accept(in);
+                return null;
+            }
+            @Override
+            public Void reduce(Void o1, Void o2) { return null; }
+            @Override public String toString() { return 
forEach.getClass().getName(); }
+        });
+    }
+
     /**
      * See {@link #mapReduceConsume(PreLoadContext, Unseekables, long, long, 
MapReduceConsume)}
      */
@@ -715,79 +901,33 @@ public abstract class CommandStores
         return reduced.begin(mapReduceConsume);
     }
 
+    protected <O> Cancellable mapReduceConsume(PreLoadContext context, 
StoreSelector selector, MapReduceConsume<? super SafeCommandStore, O> 
mapReduceConsume)
+    {
+        AsyncChain<O> reduced = mapReduce(context, selector, mapReduceConsume);
+        return reduced.begin(mapReduceConsume);
+    }
+
     public  <O> Cancellable mapReduceConsume(PreLoadContext context, IntStream 
commandStoreIds, MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
     {
         AsyncChain<O> reduced = mapReduce(context, commandStoreIds, 
mapReduceConsume);
         return reduced.begin(mapReduceConsume);
     }
 
-    private static class StoreSelector extends SimpleBitSet implements 
IndexedQuadConsumer<Object, Object, Object, Object>, 
IndexedRangeQuadConsumer<Object, Object, Object, Object>
+    // TODO (required): as we get more tables this will become expensive to 
allocate; we need to index first by prefix
+    public <O> AsyncChain<O> mapReduce(PreLoadContext context, Unseekables<?> 
unseekables, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, 
O> mapReduce)
     {
-        final int[] indexMap;
-
-        private StoreSelector(int size, int[] indexMap)
-        {
-            super(size);
-            this.indexMap = indexMap;
-        }
-
-        public StoreSelector(Snapshot snapshot)
-        {
-            this(snapshot.shards.length, snapshot.indexForRange);
-        }
-
-        static StoreSelector select(Snapshot snapshot, Unseekables<?> 
unseekables)
-        {
-            StoreSelector selector = new StoreSelector(snapshot);
-            switch (unseekables.domain())
-            {
-                default: throw new UnhandledEnum(unseekables.domain());
-                case Range:
-                {
-                    int minIndex = 0;
-                    for (Range range : (AbstractRanges)unseekables)
-                        minIndex = snapshot.lookupByRange.forEachRange(range, 
selector, selector, null, null, null, null, minIndex);
-                    break;
-                }
-                case Key:
-                {
-                    int minIndex = 0;
-                    for (RoutingKey key : (AbstractUnseekableKeys)unseekables)
-                        minIndex = snapshot.lookupByRange.forEachKey(key, 
selector, selector, null, null, null, null, minIndex);
-                    break;
-                }
-            }
-            return selector;
-        }
-
-        @Override
-        public void accept(Object p1, Object p2, Object p3, Object p4, int 
index)
-        {
-            set(indexMap[index]);
-        }
-
-        @Override
-        public void accept(Object p1, Object p2, Object p3, Object p4, int 
fromIndex, int toIndex)
-        {
-            for (int i = fromIndex ; i < toIndex ; ++i)
-                set(indexMap[i]);
-        }
+        return mapReduce(context, StoreFinder.selector(unseekables, minEpoch, 
maxEpoch), mapReduce);
     }
 
-    public <O> AsyncChain<O> mapReduce(PreLoadContext context, Unseekables<?> 
unseekables, long minEpoch, long maxEpoch, MapReduce<? super SafeCommandStore, 
O> mapReduce)
+    public <O> AsyncChain<O> mapReduce(PreLoadContext context, StoreSelector 
selector, MapReduce<? super SafeCommandStore, O> mapReduce)
     {
         AsyncChain<O> chain = null;
         BiFunction<O, O, O> reducer = mapReduce::reduce;
         Snapshot snapshot = current;
-        StoreSelector selector = StoreSelector.select(snapshot, unseekables);
-        for (int i = selector.nextSetBit(0, -1); i >= 0 ; i = 
selector.nextSetBit(i + 1, -1))
+        Iterator<CommandStore> stores = selector.select(snapshot);
+        while (stores.hasNext())
         {
-            ShardHolder shard = snapshot.shards[i];
-            Ranges shardRanges = shard.ranges().allBetween(minEpoch, maxEpoch);
-            if (shardRanges != shard.ranges.all() && 
!shardRanges.intersects(unseekables))
-                continue;
-
-            AsyncChain<O> next = shard.store.build(context, mapReduce);
+            AsyncChain<O> next = stores.next().build(context, mapReduce);
             chain = chain != null ? AsyncChains.reduce(chain, next, reducer) : 
next;
         }
         return chain == null ? AsyncChains.success(null) : chain;
@@ -811,17 +951,7 @@ public abstract class CommandStores
 
     protected <O> AsyncChain<O> mapReduce(PreLoadContext context, IntStream 
commandStoreIds, MapReduce<? super SafeCommandStore, O> mapReduce)
     {
-        // TODO (low priority, efficiency): avoid using an array, or use a 
scratch buffer
-        int[] ids = commandStoreIds.toArray();
-        AsyncChain<O> chain = null;
-        BiFunction<O, O, O> reducer = mapReduce::reduce;
-        for (int id : ids)
-        {
-            CommandStore commandStore = forId(id);
-            AsyncChain<O> next = commandStore.build(context, mapReduce);
-            chain = chain != null ? AsyncChains.reduce(chain, next, reducer) : 
next;
-        }
-        return chain == null ? AsyncChains.success(null) : chain;
+        return mapReduce(context, snapshot -> 
commandStoreIds.mapToObj(snapshot::byId).iterator(), mapReduce);
     }
 
     public <O> Cancellable mapReduceConsume(PreLoadContext context, 
MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
@@ -900,8 +1030,8 @@ public abstract class CommandStores
     public CommandStore select(Participants<?> participants)
     {
         Snapshot snapshot = current;
-        StoreSelector selector = StoreSelector.select(snapshot, participants);
-        int i = selector.firstSetBit();
+        StoreFinder stores = StoreFinder.find(snapshot, participants);
+        int i = stores.firstSetBit();
         if (i < 0) return any();
         return snapshot.shards[i].store;
     }
@@ -938,7 +1068,7 @@ public abstract class CommandStores
     public CommandStore forId(int id)
     {
         Snapshot snapshot = current;
-        return snapshot.byId.get(id);
+        return snapshot.shards[snapshot.byId.get(id)].store;
     }
 
     public int[] ids()
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index f0cd7360..25ac1688 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -59,6 +59,7 @@ import accord.coordinate.CoordinationAdapter.Factory.Kind;
 import accord.coordinate.Infer.InvalidIf;
 import accord.coordinate.Outcome;
 import accord.coordinate.RecoverWithRoute;
+import accord.local.CommandStores.StoreSelector;
 import accord.local.durability.DurabilityService;
 import accord.messages.Callback;
 import accord.messages.Reply;
@@ -522,6 +523,11 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         return commandStores.forEach(context, unseekables, minEpoch, maxEpoch, 
forEach);
     }
 
+    public AsyncChain<Void> forEachLocal(PreLoadContext context, StoreSelector 
selector, Consumer<SafeCommandStore> forEach)
+    {
+        return commandStores.forEach(context, selector, forEach);
+    }
+
     public AsyncChain<Void> forEachLocalSince(PreLoadContext context, 
Unseekables<?> unseekables, Timestamp since, Consumer<SafeCommandStore> forEach)
     {
         return commandStores.forEach(context, unseekables, since.epoch(), 
Long.MAX_VALUE, forEach);
@@ -552,6 +558,11 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         return commandStores.mapReduceConsume(context, keys, minEpoch, 
maxEpoch, mapReduceConsume);
     }
 
+    public <T> Cancellable mapReduceConsumeLocal(PreLoadContext context, 
StoreSelector selector, MapReduceConsume<SafeCommandStore, T> mapReduceConsume)
+    {
+        return commandStores.mapReduceConsume(context, selector, 
mapReduceConsume);
+    }
+
     public <T> Cancellable mapReduceConsumeAllLocal(PreLoadContext context, 
MapReduceConsume<SafeCommandStore, T> mapReduceConsume)
     {
         return commandStores.mapReduceConsume(context, mapReduceConsume);
@@ -856,7 +867,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
         }
     }
 
-    public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf 
invalidIf, FullRoute<?> route, long reportLowEpoch, long reportHighEpoch)
+    public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf 
invalidIf, FullRoute<?> route, StoreSelector reportTo)
     {
         {
             AsyncResult<? extends Outcome> result = coordinating.get(txnId);
@@ -866,7 +877,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ
 
         AsyncResult<Outcome> result = withEpochExact(txnId.epoch(), () -> {
             RecoverFuture<Outcome> future = new RecoverFuture<>();
-            RecoverWithRoute.recover(this, txnId, invalidIf, route, null, 
reportLowEpoch, reportHighEpoch, future);
+            RecoverWithRoute.recover(this, txnId, invalidIf, route, null, 
reportTo, future);
             return future;
         }).beginAsResult();
         coordinating.putIfAbsent(txnId, result);
diff --git a/accord-core/src/main/java/accord/local/StoreParticipants.java b/accord-core/src/main/java/accord/local/StoreParticipants.java
index 5f286beb..b401d674 100644
--- a/accord-core/src/main/java/accord/local/StoreParticipants.java
+++ b/accord-core/src/main/java/accord/local/StoreParticipants.java
@@ -423,7 +423,7 @@ public class StoreParticipants
 
     public final StoreParticipants supplement(Route<?> route)
     {
-        route = Route.merge(this.route(), (Route)route);
+        route = this.route != null ? this.route.with((Route)route) : 
route.with((Participants) owns);
         if (route == this.route()) return this;
         return create(route, owns(), executes(), waitsOn(), touches(), 
hasTouched());
     }
@@ -688,7 +688,7 @@ public class StoreParticipants
 
     private static long computeCoveringEpoch(SafeCommandStore safeStore, long 
txnIdEpoch, Participants<?> participants)
     {
-        long lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(safeStore, txnIdEpoch, 
participants);
+        long lowEpoch = 
safeStore.ranges().latestEarlierEpochThatFullyCovers(txnIdEpoch, participants);
         return Math.min(lowEpoch, txnIdEpoch);
     }
 }
diff --git a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
index 9f209e52..8c28a445 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
@@ -317,7 +317,7 @@ public class DurabilityQueue
         long retryDelay = node.agent().retryDurabilityDelay(node, attempt, 
MICROSECONDS);
         Invariants.require(retryDelay > 0);
         if (request != null) logger.info("{}: Retrying durability for {} 
requested by {} in {}s.", syncPoint.syncId, syncPoint.route.toRanges(), 
request.requestedBy, String.format("%.3f", retryDelay/1000_000.0));
-        else logger.debug("{}: Retrying durability for {} in {}.", 
syncPoint.syncId, syncPoint.route.toRanges(), String.format("%.3f", 
retryDelay/1000_000.0));
+        else logger.debug("{}: Retrying durability for {} in {}s.", 
syncPoint.syncId, syncPoint.route.toRanges(), String.format("%.3f", 
retryDelay/1000_000.0));
         node.scheduler().selfRecurring(() -> submit(syncPoint, request, 
attempt), retryDelay, MICROSECONDS);
     }
 
diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java
index 70d0e10c..688b2f91 100644
--- a/accord-core/src/main/java/accord/messages/CheckStatus.java
+++ b/accord-core/src/main/java/accord/messages/CheckStatus.java
@@ -321,7 +321,7 @@ public class CheckStatus extends AbstractRequest<CheckStatus.CheckStatusReply>
             return null;
         }
 
-        public CheckStatusOk finish(Unseekables<?> queried, Unseekables<?> 
propagatingTo, Unseekables<?> routeOrParticipants, WithQuorum withQuorum, 
InvalidIf previouslyKnownToBeInvalidIf)
+        public CheckStatusOk finish(Unseekables<?> queried, Unseekables<?> 
requestedFor, Unseekables<?> routeOrParticipants, WithQuorum withQuorum, 
InvalidIf previouslyKnownToBeInvalidIf)
         {
             CheckStatusOk finished = this;
             if (withQuorum == HasQuorum)
@@ -343,7 +343,7 @@ public class CheckStatus extends AbstractRequest<CheckStatus.CheckStatusReply>
             }
 
             Known validForAll = map.computeValidForAll(routeOrParticipants);
-            if (withQuorum == HasQuorum && (invalidIf == IfUncommitted || 
previouslyKnownToBeInvalidIf == IfUncommitted) && 
queried.containsAll(propagatingTo))
+            if (withQuorum == HasQuorum && (invalidIf == IfUncommitted || 
previouslyKnownToBeInvalidIf == IfUncommitted) && 
queried.containsAll(requestedFor))
             {
                 Known minKnown = finished.minMaxKnown(queried), maxKnown = 
finished.maxKnown(queried);
                 InvalidIf invalidIf = this.invalidIf.inferWithQuorum(minKnown, 
maxKnown);
@@ -534,9 +534,9 @@ public class CheckStatus extends AbstractRequest<CheckStatus.CheckStatusReply>
             this.result = result;
         }
 
-        public CheckStatusOkFull finish(Unseekables<?> queried, Unseekables<?> 
propagatingTo, Unseekables<?> routeOrParticipants, WithQuorum withQuorum, 
InvalidIf previouslyKnownToBeInvalidIf)
+        public CheckStatusOkFull finish(Unseekables<?> queried, Unseekables<?> 
requestedFor, Unseekables<?> routeOrParticipants, WithQuorum withQuorum, 
InvalidIf previouslyKnownToBeInvalidIf)
         {
-            return (CheckStatusOkFull) super.finish(queried, propagatingTo, 
routeOrParticipants, withQuorum, previouslyKnownToBeInvalidIf);
+            return (CheckStatusOkFull) super.finish(queried, requestedFor, 
routeOrParticipants, withQuorum, previouslyKnownToBeInvalidIf);
         }
 
         public CheckStatusOkFull merge(@Nonnull Route<?> route)
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java
index fd77d790..3b02d3ae 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -24,6 +24,8 @@ import accord.coordinate.FetchData.FetchResult;
 import accord.coordinate.Infer.InvalidIf;
 import accord.local.Cleanup;
 import accord.local.Command;
+import accord.local.CommandStores.LatentStoreSelector;
+import accord.local.CommandStores.StoreSelector;
 import accord.local.Commands;
 import accord.local.Node;
 import accord.local.PreLoadContext;
@@ -80,7 +82,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
     final Node node;
     final TxnId txnId;
     final Route<?> route;
-    final Unseekables<?> propagateTo;
+    final Unseekables<?> requestedFor;
     final Known target;
     final InvalidIf invalidIf;
 
@@ -96,7 +98,6 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
     final WithQuorum withQuorum;
     @Nullable final PartialTxn partialTxn;
     @Nullable final PartialDeps stableDeps;
-    final long lowEpoch, highEpoch;
     @Nullable final Timestamp committedExecuteAt;
     @Nullable final Writes writes;
     @Nullable final Result result;
@@ -108,7 +109,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
     Propagate(
     Node node, TxnId txnId,
     Route<?> route,
-    Unseekables<?> propagateTo, Known target, InvalidIf invalidIf,
+    Unseekables<?> requestedFor, Known target, InvalidIf invalidIf,
     SaveStatus maxKnowledgeSaveStatus,
     SaveStatus maxSaveStatus, Ballot promised,
     Ballot acceptedOrCommitted,
@@ -117,8 +118,6 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
     KnownMap known, WithQuorum withQuorum,
     @Nullable PartialTxn partialTxn,
     @Nullable PartialDeps stableDeps,
-    long lowEpoch,
-    long highEpoch,
     @Nullable Timestamp committedExecuteAt,
     @Nullable Writes writes,
     @Nullable Result result,
@@ -127,7 +126,7 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
         this.node = node;
         this.txnId = txnId;
         this.route = route;
-        this.propagateTo = propagateTo;
+        this.requestedFor = requestedFor;
         this.target = target;
         this.invalidIf = invalidIf;
         this.maxKnowledgeSaveStatus = maxKnowledgeSaveStatus;
@@ -140,19 +139,17 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
         this.withQuorum = withQuorum;
         this.partialTxn = partialTxn;
         this.stableDeps = stableDeps;
-        this.lowEpoch = lowEpoch;
-        this.highEpoch = highEpoch;
         this.committedExecuteAt = committedExecuteAt;
         this.writes = writes;
         this.result = result;
         this.callback = callback;
     }
 
-    public static void propagate(Node node, TxnId txnId, InvalidIf 
previouslyKnownToBeInvalidIf, long sourceEpoch, long lowEpoch, long highEpoch, 
WithQuorum withQuorum, Route<?> queried, Unseekables<?> propagateTo, @Nullable 
Known target, CheckStatusOkFull full, BiConsumer<? super FetchResult, 
Throwable> callback)
+    public static void propagate(Node node, TxnId txnId, InvalidIf 
previouslyKnownToBeInvalidIf, long sourceEpoch, WithQuorum withQuorum, Route<?> 
queried, Participants<?> requestedFor, LatentStoreSelector reportTo, @Nullable 
Known target, CheckStatusOkFull full, BiConsumer<? super FetchResult, 
Throwable> callback)
     {
         if (full.maxKnowledgeSaveStatus.status == NotDefined && full.invalidIf 
== NotKnownToBeInvalid)
         {
-            callback.accept(new FetchResult(Nothing, propagateTo.slice(0, 0), 
propagateTo), null);
+            callback.accept(new FetchResult(Nothing, requestedFor.slice(0, 0), 
requestedFor), null);
             return;
         }
 
@@ -160,18 +157,19 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
 
         // TODO (required): consider and document whether it is safe to infer 
that we are stale if we have not received responses from all shards we know of
         //  (in principle, we should at least require responses from our own 
shard, and the home shard if we know it); if we only hear from a remote shard 
it may have fully Erased
-        full = full.finish(queried, propagateTo, queried.with((Unseekables) 
propagateTo), withQuorum, previouslyKnownToBeInvalidIf);
+        full = full.finish(queried, requestedFor, queried.with((Unseekables) 
requestedFor), withQuorum, previouslyKnownToBeInvalidIf);
         Route<?> route = Invariants.nonNull(full.route);
 
+        Timestamp committedExecuteAt = full.executeAtIfKnown();
         Propagate propagate =
-            new Propagate(node, txnId, route, propagateTo, target, 
full.invalidIf, full.maxKnowledgeSaveStatus, full.maxSaveStatus, 
full.maxPromised, full.acceptedOrCommitted, full.durability, full.homeKey, 
full.map, withQuorum, full.partialTxn, full.stableDeps, lowEpoch, highEpoch, 
full.executeAtIfKnown(), full.writes, full.result, callback);
+            new Propagate(node, txnId, route, requestedFor, target, 
full.invalidIf, full.maxKnowledgeSaveStatus, full.maxSaveStatus, 
full.maxPromised, full.acceptedOrCommitted, full.durability, full.homeKey, 
full.map, withQuorum, full.partialTxn, full.stableDeps, committedExecuteAt, 
full.writes, full.result, callback);
 
-        if (full.executeAt != null && full.executeAt.epoch() > highEpoch)
-            highEpoch = full.executeAt.epoch();
-        long untilEpoch = full.executeAt == null ? highEpoch : 
Math.max(highEpoch, full.executeAt.epoch());
+        long untilEpoch = txnId.epoch();
+        if (committedExecuteAt != null)
+            untilEpoch = Math.max(untilEpoch, committedExecuteAt.epoch());
 
-        Route<?> finalRoute = queried;
-        node.withEpochAtLeast(highEpoch, propagate, () -> 
node.mapReduceConsumeLocal(propagate, finalRoute, lowEpoch, untilEpoch, 
propagate));
+        StoreSelector selector = reportTo.refine(txnId, committedExecuteAt, 
route);
+        node.withEpochAtLeast(untilEpoch, propagate, () -> 
node.mapReduceConsumeLocal(propagate, selector, propagate));
     }
 
     @Override
@@ -184,8 +182,8 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
     public Void apply(SafeCommandStore safeStore)
     {
         long executeAtEpoch = committedExecuteAt == null ? txnId.epoch() : 
committedExecuteAt.epoch();
-        long lowEpoch = Math.min(this.lowEpoch, 
StoreParticipants.computePropagateLowEpoch(safeStore, txnId, route));
-        StoreParticipants participants = StoreParticipants.update(safeStore, 
route, lowEpoch, txnId, executeAtEpoch, highEpoch, committedExecuteAt != null);
+        long lowEpoch = StoreParticipants.computePropagateLowEpoch(safeStore, 
txnId, route);
+        StoreParticipants participants = StoreParticipants.update(safeStore, 
route, lowEpoch, txnId, executeAtEpoch, executeAtEpoch, committedExecuteAt != 
null);
         // TODO (expected): can we come up with a better more universal 
pattern for avoiding updating a command we don't intersect with?
         //   ideally integrated with safeStore.get()
         if (participants.owns().isEmpty() && safeStore.ifInitialised(txnId) == 
null)
@@ -198,7 +196,7 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
         if (participants.executes() == null && executeAtIfKnown != null)
         {
             executeAtEpoch = executeAtIfKnown.epoch();
-            participants = StoreParticipants.update(safeStore, route, 
lowEpoch, txnId, executeAtEpoch, highEpoch, true);
+            participants = StoreParticipants.update(safeStore, route, 
lowEpoch, txnId, executeAtEpoch, executeAtEpoch, true);
         }
 
         switch (command.saveStatus().phase)
@@ -347,9 +345,9 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
     {
         FetchResult current = fetchResult;
         if (current == null)
-            return new FetchResult(Nothing, propagateTo.slice(0, 0), 
propagateTo);
+            return new FetchResult(Nothing, requestedFor.slice(0, 0), 
requestedFor);
 
-        Unseekables<?> missed = propagateTo.without(current.achievedTarget);
+        Unseekables<?> missed = requestedFor.without(current.achievedTarget);
         if (missed.isEmpty())
             return current;
 
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncResult.java 
b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
index fae3bc9c..de4ccd6a 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncResult.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncResult.java
@@ -27,6 +27,8 @@ import static accord.utils.Invariants.illegalState;
 /**
  * Handle for async computations that supports multiple listeners and registering
  * listeners after the computation has started
+ *
+ * TODO (expected): by default AsyncResult methods should be started immediately; should introduce newChain() for building a chain.
  */
 public interface AsyncResult<V> extends AsyncChain<V>
 {
diff --git 
a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java 
b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index ce6f18e1..8ad08799 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -51,7 +51,7 @@ public class BurnTestConfigurationService extends 
AbstractConfigurationService.M
 
     public BurnTestConfigurationService(Node.Id node, AgentExecutor executor, 
Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id, 
Node> lookup, TopologyUpdates topologyUpdates)
     {
-        super(node);
+        super(node, executor.agent());
         this.executor = executor;
         this.randomSupplier = randomSupplier;
         this.lookup = lookup;
@@ -176,13 +176,9 @@ public class BurnTestConfigurationService extends 
AbstractConfigurationService.M
     {
         synchronized (this)
         {
-            if (epoch <= maxRequestedEpoch)
-                return;
-
-            maxRequestedEpoch = epoch;
+            while (maxRequestedEpoch < epoch)
+                pendingEpochs.computeIfAbsent(++maxRequestedEpoch, 
FetchTopology::new);
         }
-
-        pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
     }
 
     @Override
diff --git 
a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java 
b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
index a26c2181..37fdc0cf 100644
--- 
a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
+++ 
b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import accord.api.Agent;
 import accord.api.ConfigurationService.EpochReady;
 import accord.primitives.Ranges;
 import accord.utils.SortedArrays.SortedArrayList;
@@ -130,9 +131,9 @@ public class AbstractConfigurationServiceTest
         final Set<Long> syncStarted = new HashSet<>();
         final Set<Long> epochsFetched = new HashSet<>();
 
-        public TestableConfigurationService(Id node)
+        public TestableConfigurationService(Id node, Agent agent)
         {
-            super(node);
+            super(node, agent);
         }
 
         @Override
@@ -206,7 +207,7 @@ public class AbstractConfigurationServiceTest
     @Test
     public void getTopologyTest()
     {
-        TestableConfigurationService service = new 
TestableConfigurationService(ID1);
+        TestableConfigurationService service = new 
TestableConfigurationService(ID1, new TestAgent());
         TestListener listener = new TestListener(service, false);
         service.registerListener(listener);
         service.reportTopology(TOPOLOGY1);
@@ -229,7 +230,7 @@ public class AbstractConfigurationServiceTest
     @Test
     public void loadAfterTruncate()
     {
-        TestableConfigurationService service = new 
TestableConfigurationService(ID1);
+        TestableConfigurationService service = new 
TestableConfigurationService(ID1, new TestAgent());
         TestListener listener = new TestListener(service, false);
         service.registerListener(listener);
         service.reportTopology(TOPOLOGY3);
@@ -248,7 +249,7 @@ public class AbstractConfigurationServiceTest
     @Test
     public void awaitOutOfOrderTopologies()
     {
-        TestableConfigurationService service = new 
TestableConfigurationService(ID1);
+        TestableConfigurationService service = new 
TestableConfigurationService(ID1, new TestAgent());
 
         TestListener listener = new TestListener(service, false);
         service.registerListener(listener);
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 9fa044f8..51f3c1b0 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -103,7 +103,7 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             Snapshot current = current();
             RangesForEpoch ranges = e.getValue();
             CommandStore commandStore = null;
-            for (ShardHolder shard : current.shards)
+            for (ShardHolder shard : current)
             {
                 if (shard.ranges().equals(ranges))
                     commandStore = shard.store;
@@ -122,10 +122,10 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
     {
         Snapshot current = current();
         // These checks are only applicable to delayed command stores.
-        for (Integer id : current.byId.keySet())
+        for (ShardHolder shard : current)
         {
-            CommandStore prev = current.byId.get(id);
-            CommandStore next = nextSnapshot.byId.get(id);
+            CommandStore prev = current.byId(shard.store.id());
+            CommandStore next = nextSnapshot.byId(shard.store.id());
             {
                 RedundantBefore orig = prev.unsafeGetRedundantBefore();
                 RedundantBefore loaded = next.unsafeGetRedundantBefore();
@@ -468,7 +468,7 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
     public List<DelayedCommandStore> unsafeStores()
     {
         List<DelayedCommandStore> stores = new ArrayList<>();
-        for (ShardHolder holder : current().shards)
+        for (ShardHolder holder : current())
             stores.add((DelayedCommandStore) holder.store);
         return stores;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to