This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 204e48df675ecdf6db1f687e802b2d0c86824d0d
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Jul 30 10:53:22 2025 +0100

    Improve ProgressLog:
     - Schedule a fallback timeout for active requests to ensure progress with 
lost callbacks
     - Clear active task for a txnId when new RunInvoker is registered
     - Consult DurableBefore prior to invoking recovery
     - Support user invoked reset of a command, clearing any active work and 
requeueing it
     - (Testing): Treat progress log for RX as recurring tasks for burn test 
termination
---
 .../src/main/java/accord/coordinate/FetchData.java |  25 ++-
 .../main/java/accord/coordinate/FetchRoute.java    |  10 +-
 .../main/java/accord/coordinate/MaybeRecover.java  |   8 +-
 .../accord/impl/progresslog/CallbackInvoker.java   |  49 +++--
 .../impl/progresslog/DefaultProgressLog.java       | 216 +++++++++++++--------
 .../java/accord/impl/progresslog/HomeState.java    |  20 +-
 .../java/accord/impl/progresslog/TxnState.java     |  14 +-
 .../java/accord/impl/progresslog/WaitingState.java |  23 ++-
 accord-core/src/main/java/accord/local/Node.java   |  10 +-
 .../accord/local/durability/DurabilityQueue.java   |   5 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |   4 +-
 .../src/test/java/accord/impl/basic/Pending.java   |   5 +
 .../java/accord/impl/basic/TestProgressLogs.java   |  29 ++-
 13 files changed, 260 insertions(+), 158 deletions(-)

diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java 
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index dc75c4fe..9b3374a5 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -73,11 +73,11 @@ public class FetchData extends CheckShards<Route<?>>
         final long srcEpoch;
         // known participants, a subset of which we may fetch from
         final Participants<?> contactable;
-        final StoreSelector reportTo;
+        final LatentStoreSelector reportTo;
         final BiConsumer<? super FetchResult, Throwable> callback;
         final @Nullable Tracing tracing;
 
-        public FetchRequest(SequentialAsyncExecutor executor, Known fetch, 
TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, 
Participants<?> contactable, StoreSelector reportTo, BiConsumer<? super 
FetchResult, Throwable> callback, @Nullable Tracing tracing)
+        public FetchRequest(SequentialAsyncExecutor executor, Known fetch, 
TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, 
Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<? super 
FetchResult, Throwable> callback, @Nullable Tracing tracing)
         {
             this.executor = executor;
             this.fetch = fetch;
@@ -95,29 +95,26 @@ public class FetchData extends CheckShards<Route<?>>
     /**
      * Do not make an attempt to discern what keys need to be contacted; fetch 
from only the specific remote keys that were requested.
      */
-    public static void fetchSpecific(Known fetch, Node node, TxnId txnId, 
@Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, StoreSelector 
reportTo, BiConsumer<? super FetchResult, Throwable> callback)
+    public static Object fetchSpecific(Known fetch, Node node, TxnId txnId, 
@Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, 
LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> 
callback)
     {
-        fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, executeAt, 
query, maxRoute, reportTo, callback);
+        return fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, 
executeAt, query, maxRoute, reportTo, callback);
     }
 
     /**
      * Do not make an attempt to discern what keys need to be contacted; fetch 
from only the specific remote keys that were requested.
      */
-    public static void fetchSpecific(Known fetch, Node node, TxnId txnId, 
InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> 
maxRoute, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> 
callback)
+    public static Object fetchSpecific(Known fetch, Node node, TxnId txnId, 
InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> 
maxRoute, LatentStoreSelector reportTo, BiConsumer<? super FetchResult, 
Throwable> callback)
     {
-        fetchSpecific(node, query, maxRoute, new 
FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt, 
maxRoute, reportTo, callback, node.agent().trace(txnId, FETCH)));
+        return fetchSpecific(node, query, maxRoute, new 
FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt, 
maxRoute, reportTo, callback, node.agent().trace(txnId, FETCH)));
     }
 
-    public static void fetchSpecific(Node node, Route<?> query, Route<?> 
maxRoute, FetchRequest request)
+    public static Object fetchSpecific(Node node, Route<?> query, Route<?> 
maxRoute, FetchRequest request)
     {
         long srcEpoch = request.srcEpoch;
         if (!node.topology().hasAtLeastEpoch(srcEpoch))
-        {
-            node.withEpochAtLeast(srcEpoch, request.executor, 
request.callback, () -> fetchSpecific(node, query, maxRoute, request));
-            return;
-        }
+            return node.withEpochAtLeast(srcEpoch, request.executor, 
request.callback, () -> fetchSpecific(node, query, maxRoute, request));
 
-        fetchData(node, query, maxRoute, request);
+        return fetchData(node, query, maxRoute, request);
     }
 
     final BiConsumer<? super FetchResult, Throwable> callback;
@@ -131,12 +128,12 @@ public class FetchData extends CheckShards<Route<?>>
     // (i.e. if preaccept/accept contact a later epoch than execution is 
decided for)
     final LatentStoreSelector reportTo;
 
-    private FetchData(Node node, Known target, TxnId txnId, InvalidIf 
invalidIf, Route<?> route, Route<?> maxRoute, long sourceEpoch, StoreSelector 
reportTo, BiConsumer<? super FetchResult, Throwable> callback)
+    private FetchData(Node node, Known target, TxnId txnId, InvalidIf 
invalidIf, Route<?> route, Route<?> maxRoute, long sourceEpoch, 
LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> 
callback)
     {
         this(node, target, txnId, invalidIf, route, route.withHomeKey(), 
maxRoute, sourceEpoch, reportTo, callback);
     }
 
-    private FetchData(Node node, Known target, TxnId txnId, InvalidIf 
invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, long 
sourceEpoch, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> 
callback)
+    private FetchData(Node node, Known target, TxnId txnId, InvalidIf 
invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, long 
sourceEpoch, LatentStoreSelector reportTo, BiConsumer<? super FetchResult, 
Throwable> callback)
     {
         // TODO (desired, efficiency): restore behaviour of only collecting 
info if e.g. Committed or Executed
         super(node, node.someSequentialExecutor(), txnId, routeWithHomeKey, 
sourceEpoch, CheckStatus.IncludeInfo.All, null, invalidIf);
diff --git a/accord-core/src/main/java/accord/coordinate/FetchRoute.java 
b/accord-core/src/main/java/accord/coordinate/FetchRoute.java
index 7a4f149b..0f03b75e 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchRoute.java
@@ -59,23 +59,23 @@ public class FetchRoute extends CheckShards<Participants<?>>
         this.callback = callback;
     }
 
-    public static void fetchRoute(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo, 
BiConsumer<Route<?>, Throwable> callback, Tracing tracing)
+    public static Object fetchRoute(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo, 
BiConsumer<Route<?>, Throwable> callback, Tracing tracing)
     {
         if (!node.topology().hasEpoch(txnId.epoch()))
         {
             if (tracing != null)
                 tracing.trace(null, "Waiting for epoch %d", txnId.epoch());
-            node.withEpochAtLeast(txnId.epoch(), null, callback, () -> 
fetchRoute(node, txnId, invalidIf, unseekables, reportTo, callback, tracing));
-            return;
+            return node.withEpochAtLeast(txnId.epoch(), null, callback, () -> 
fetchRoute(node, txnId, invalidIf, unseekables, reportTo, callback, tracing));
         }
 
         FetchRoute fetchRoute = new FetchRoute(node, txnId, invalidIf, 
unseekables, reportTo, callback, tracing);
         fetchRoute.start();
+        return fetchRoute;
     }
 
-    public static void fetchRoute(Node node, TxnId txnId, Participants<?> 
contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> 
callback)
+    public static Object fetchRoute(Node node, TxnId txnId, Participants<?> 
contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> 
callback)
     {
-        fetchRoute(node, txnId, NotKnownToBeInvalid, contactable, reportTo, 
callback, node.agent().trace(txnId, FETCH));
+        return fetchRoute(node, txnId, NotKnownToBeInvalid, contactable, 
reportTo, callback, node.agent().trace(txnId, FETCH));
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java 
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 5c1fc3bf..0a04519a 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -21,6 +21,8 @@ package accord.coordinate;
 import java.util.function.BiConsumer;
 
 import accord.api.Tracing;
+import accord.local.CommandStores;
+import accord.local.CommandStores.LatentStoreSelector;
 import accord.local.CommandStores.StoreSelector;
 import accord.local.SequentialAsyncExecutor;
 import accord.messages.InformDurable;
@@ -45,9 +47,9 @@ public class MaybeRecover extends CheckShards<Route<?>>
 {
     final ProgressToken prevProgress;
     final BiConsumer<Outcome, Throwable> callback;
-    final StoreSelector reportTo;
+    final LatentStoreSelector reportTo;
 
-    MaybeRecover(Node node, SequentialAsyncExecutor executor, TxnId txnId, 
Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, 
StoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
+    MaybeRecover(Node node, SequentialAsyncExecutor executor, TxnId txnId, 
Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, 
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
     {
         // we only want to enquire with the home shard, but we prefer maximal 
route information for running Invalidation against, if necessary
         super(node, executor, txnId, someRoute.withHomeKey(), 
IncludeInfo.Route, null, invalidIf, node.agent().trace(txnId, RECOVER));
@@ -56,7 +58,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
         this.reportTo = reportTo;
     }
 
-    public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Route<?> someRoute, ProgressToken prevProgress, StoreSelector 
reportTo, BiConsumer<Outcome, Throwable> callback)
+    public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf 
invalidIf, Route<?> someRoute, ProgressToken prevProgress, LatentStoreSelector 
reportTo, BiConsumer<Outcome, Throwable> callback)
     {
         MaybeRecover maybeRecover = new MaybeRecover(node, 
node.someSequentialExecutor(), txnId, invalidIf, someRoute, prevProgress, 
reportTo, callback);
         maybeRecover.start();
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java 
b/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java
index 4667846b..80d50fb9 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java
@@ -24,12 +24,13 @@ import javax.annotation.Nullable;
 
 import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
 import accord.primitives.TxnId;
 
 import static accord.impl.progresslog.TxnStateKind.Home;
 import static accord.impl.progresslog.TxnStateKind.Waiting;
 
-class CallbackInvoker<P, V> implements BiConsumer<V, Throwable>, PreLoadContext
+final class CallbackInvoker<P, V> extends DefaultProgressLog.PendingTask 
implements BiConsumer<V, Throwable>, PreLoadContext
 {
     static <P, V> CallbackInvoker<P, V> 
invokeWaitingCallback(DefaultProgressLog instance, TxnId txnId, P param, 
Callback<P, V> callback)
     {
@@ -43,12 +44,11 @@ class CallbackInvoker<P, V> implements BiConsumer<V, 
Throwable>, PreLoadContext
 
     static <P, V> CallbackInvoker<P, V> invokeCallback(TxnStateKind kind, 
DefaultProgressLog owner, TxnId txnId, P param, Callback<P, V> callback)
     {
-        CallbackInvoker<P, V> invoker = new CallbackInvoker<>(owner, kind, 
owner.nextInvokerId(), txnId, param, callback);
+        CallbackInvoker<P, V> invoker = new CallbackInvoker<>(owner, kind, 
owner.nextCallbackId(), txnId, param, callback);
         owner.registerPending(kind, txnId, invoker);
         return invoker;
     }
 
-    final DefaultProgressLog owner;
     final boolean isHome;
     final long id;
     final TxnId txnId;
@@ -57,7 +57,7 @@ class CallbackInvoker<P, V> implements BiConsumer<V, 
Throwable>, PreLoadContext
 
     CallbackInvoker(DefaultProgressLog owner, TxnStateKind kind, long id, 
TxnId txnId, P param, Callback<P, V> callback)
     {
-        this.owner = owner;
+        super(owner);
         this.isHome = kind == Home;
         this.id = id;
         this.txnId = txnId;
@@ -70,37 +70,32 @@ class CallbackInvoker<P, V> implements BiConsumer<V, 
Throwable>, PreLoadContext
         return isHome ? Home : Waiting;
     }
 
-    @Override
-    public void accept(V success, Throwable fail)
+    private boolean complete()
     {
-        owner.commandStore.execute(this, safeStore -> {
-
-            // we load safeCommand first so that if it clears the progress log 
we abandon the callback
-            SafeCommand safeCommand = safeStore.ifInitialised(txnId);
-            if (!owner.complete(safeStore, kind(), id, this))
-                return;
-
-            if (safeCommand == null)
-                return;
-
-            callback.callback(safeStore, safeCommand, owner, txnId, param, 
success, fail);
-        }, owner.commandStore.agent());
+        return owner.complete(kind(), id, txnId, this);
     }
 
     @Override
-    public boolean equals(Object obj)
+    public void accept(V success, Throwable fail)
     {
-        if (obj == null) return false;
-        if (obj.getClass() == TxnId.class) return txnId.equals(obj);
-        if (obj.getClass() != getClass()) return false;
-        CallbackInvoker<?, ?> that = (CallbackInvoker<?, ?>) obj;
-        return id == that.id && callback == that.callback;
+        owner.commandStore.execute(this, safeStore -> {
+            try
+            {
+                // we load safeCommand first so that if it clears the progress 
log we abandon the callback
+                SafeCommand safeCommand = safeStore.ifInitialised(txnId);
+                if (complete() && safeCommand != null)
+                    acceptInternal(safeStore, safeCommand, success, fail);
+            }
+            finally
+            {
+                postRun(safeStore);
+            }
+        }, owner.commandStore.agent());
     }
 
-    @Override
-    public int hashCode()
+    private void acceptInternal(SafeCommandStore safeStore, SafeCommand 
safeCommand, V success, Throwable fail)
     {
-        return txnId.hashCode();
+        callback.callback(safeStore, safeCommand, owner, txnId, param, 
success, fail);
     }
 
     @Override
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java 
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 450ff5fc..518a283f 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -18,6 +18,7 @@
 
 package accord.impl.progresslog;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -28,6 +29,9 @@ import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
 import accord.local.Command;
@@ -52,14 +56,12 @@ import accord.utils.btree.BTree;
 import accord.utils.btree.BTreeRemoval;
 import org.agrona.collections.Long2ObjectHashMap;
 import org.agrona.collections.Object2ObjectHashMap;
-import org.agrona.collections.ObjectHashSet;
 
 import static accord.api.ProgressLog.BlockedUntil.CanApply;
 import static accord.api.ProgressLog.BlockedUntil.NotBlocked;
 import static accord.impl.progresslog.CoordinatePhase.Decided;
 import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute;
 import static accord.impl.progresslog.CoordinatePhase.Undecided;
-import static accord.impl.progresslog.Progress.Awaiting;
 import static accord.impl.progresslog.Progress.NoneExpected;
 import static accord.impl.progresslog.Progress.Querying;
 import static accord.impl.progresslog.Progress.Queued;
@@ -79,6 +81,29 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
 // TODO (desired): evict to disk
 public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStore>
 {
+    static abstract class PendingTask
+    {
+        final DefaultProgressLog owner;
+
+        PendingTask(DefaultProgressLog owner)
+        {
+            this.owner = owner;
+        }
+
+        void postRun(SafeCommandStore safeStore)
+        {
+            owner.acceptIfNonEmptyRunBuffer(safeStore);
+        }
+    }
+
+    public static class Config
+    {
+        public int concurrency = 8;
+        public Duration maxActiveRunTime = Duration.ofMinutes(1);
+    }
+
+    private static final Logger logger = 
LoggerFactory.getLogger(DefaultProgressLog.class);
+
     final Node node;
     final CommandStore commandStore;
 
@@ -94,8 +119,9 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
      * These callbacks are required to have hashCode() == txnId.hashCode() and 
equals(txnId) == true,
      * so that we can manage overriding callbacks on the relevant TxnState.
      */
-    private final ObjectHashSet<Object> pendingWaiting = new ObjectHashSet<>();
-    private final ObjectHashSet<Object> pendingHome = new ObjectHashSet<>();
+    // TODO (desired): replace this with a set that can lookup the matching 
item
+    private final Object2ObjectHashMap<TxnId, PendingTask> pendingWaiting = 
new Object2ObjectHashMap<>();
+    private final Object2ObjectHashMap<TxnId, PendingTask> pendingHome = new 
Object2ObjectHashMap<>();
 
     private final Long2ObjectHashMap<Object> active = new 
Long2ObjectHashMap<>();
     private final Map<TxnId, StackTraceElement[]> debugDeleted = 
Invariants.debug() ? new Object2ObjectHashMap<>() : null;
@@ -113,9 +139,9 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
     private boolean processing;
 
     private volatile boolean stopped;
-    private int maxConcurrency = 128;
+    private Config config = new Config();
 
-    private long nextInvokerId;
+    private long nextCallbackId;
 
     protected DefaultProgressLog(Node node, CommandStore commandStore)
     {
@@ -234,7 +260,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
                 // fall-through to default handler, which simply postpones any 
scheduled coordination attempt if we witness another coordination attempt in 
the meantime
                 if (state.homeProgress() == Queued && (before == null ? 
after.promised().compareTo(Ballot.ZERO) > 0 : 
(after.promised().compareTo(before.promised()) > 0) || 
after.acceptedOrCommitted().compareTo(before.acceptedOrCommitted()) > 0))
                 {
-                    clearPending(Home, state.txnId);
+                    clearPendingAndActive(Home, state.txnId);
                     state.home().set(safeStore, this, state.phase(), Queued);
                 }
                 break;
@@ -494,6 +520,12 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
             state.setInvalidIfUncommitted();
     }
 
+    void acceptIfNonEmptyRunBuffer(SafeCommandStore safeStore)
+    {
+        if (runBufferIndex < runBufferCount)
+            accept(safeStore);
+    }
+
     @Override
     public void accept(@Nullable SafeCommandStore safeStore)
     {
@@ -592,6 +624,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
                 run.clearPendingTimerDelay();
                 if (pendingTimerDeadline <= nowMicros)
                 {
+                    validatePreRunState(run, runKind.other());
                     invokeBoth = true;
                 }
                 else
@@ -640,7 +673,7 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
     {
         while (runBufferIndex < runBufferCount)
         {
-            if (active.size() >= maxConcurrency)
+            if (active.size() >= config.concurrency)
             {
                 maybeShrinkRunBuffer();
                 return;
@@ -668,77 +701,60 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
     private void validatePreRunState(TxnState run, TxnStateKind kind)
     {
         Progress progress = kind == Waiting ? run.waiting().waitingProgress() 
: run.home().homeProgress();
-        Invariants.require(progress != NoneExpected && progress != Querying);
+        Invariants.require(progress != NoneExpected);
+        if (progress == Querying)
+        {
+            // TODO (expected): add debug information about the active task
+            logger.warn("Interrupting query for {} ({}) as fallback timeout 
exceeded", run.txnId, kind);
+            clearPendingAndActive(kind, run.txnId);
+        }
     }
 
     RunInvoker invoker(TxnState run, TxnStateKind runKind)
     {
-        RunInvoker invoker = new RunInvoker(nextInvokerId(), run, runKind);
+        RunInvoker invoker = new RunInvoker(this, run, runKind);
         registerPending(runKind, run.txnId, invoker);
         return invoker;
     }
 
-    class RunInvoker implements Consumer<SafeCommandStore>, PreLoadContext
+    static final class RunInvoker extends PendingTask implements 
PreLoadContext, Consumer<SafeCommandStore>
     {
-        final long id;
+        final DefaultProgressLog owner;
         final TxnState run;
         final TxnStateKind runKind;
 
-        RunInvoker(long id, TxnState run, TxnStateKind runKind)
+        RunInvoker(DefaultProgressLog owner, TxnState run, TxnStateKind 
runKind)
         {
-            this.id = id;
+            super(owner);
+            this.owner = owner;
             this.run = run;
             this.runKind = runKind;
         }
 
-        @Override
-        public void accept(SafeCommandStore safeStore)
+        private boolean complete()
         {
-            // we have to read safeCommand first as it may become truncated on 
load, which may clear the progress log and invalidate us
-            SafeCommand safeCommand = safeStore.ifInitialised(run.txnId);
-            if (safeCommand == null)
-                return;
-
-            if (!complete(safeStore, runKind, id, this))
-                return; // we've been cancelled
-
-            // check this after fetching SafeCommand, as doing so can erase 
the command (and invalidate our state)
-            if (run.isDone(runKind))
-                return;
-
-            Invariants.require(get(run.txnId) == run, "Transaction state for 
%s does not match expected one %s", run.txnId, run);
-            Invariants.require(run.scheduledTimer() != runKind, "We are 
actively executing %s, but we are also scheduled to run this same TxnState 
later. This should not happen.", runKind);
-            Invariants.require(run.pendingTimer() != runKind, "We are actively 
executing %s, but we also have a pending scheduled task to run this same 
TxnState later. This should not happen.", runKind);
-
-            validatePreRunState(run, runKind);
-            if (runKind == Home)
-            {
-                boolean isRetry = run.homeProgress() == Awaiting;
-                if (isRetry) run.incrementHomeRetryCounter();
-                run.home().runHome(DefaultProgressLog.this, safeStore, 
safeCommand);
-            }
-            else
-            {
-                boolean isRetry = run.waitingProgress() == Awaiting;
-                if (isRetry) run.incrementWaitingRetryCounter();
-                run.runWaiting(safeStore, safeCommand, 
DefaultProgressLog.this);
-            }
+            return owner.complete(runKind, run.txnId, this);
         }
 
-        @Override
-        public boolean equals(Object obj)
+        private void acceptInternal(SafeCommandStore safeStore, SafeCommand 
safeCommand)
         {
-            if (obj == null) return false;
-            if (obj.getClass() == TxnId.class) return run.txnId.equals(obj);
-            if (obj.getClass() != getClass()) return false;
-            RunInvoker that = (RunInvoker) obj;
-            return id == that.id && run.txnId.equals(that.run.txnId) && 
runKind.equals(that.runKind);
+            owner.run(runKind, run, safeStore, safeCommand);
         }
 
         @Override
-        public int hashCode()
+        public void accept(SafeCommandStore safeStore)
         {
-            return run.txnId.hashCode();
+            try
+            {
+                // we load safeCommand first so that if it clears the progress 
log we abandon the callback
+                SafeCommand safeCommand = safeStore.ifInitialised(run.txnId);
+                if (complete() && safeCommand != null)
+                    acceptInternal(safeStore, safeCommand);
+            }
+            finally
+            {
+                postRun(safeStore);
+            }
         }
 
         @Override
@@ -754,45 +770,86 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         }
     }
 
-    long nextInvokerId()
+    protected void run(TxnStateKind runKind, TxnState run, SafeCommandStore 
safeStore, SafeCommand safeCommand)
+    {
+        // check this after fetching SafeCommand, as doing so can erase the 
command (and invalidate our state)
+        if (run.isDone(runKind))
+            return;
+
+        Invariants.require(get(run.txnId) == run, "Transaction state for %s 
does not match expected one %s", run.txnId, run);
+        Invariants.require(run.scheduledTimer() != runKind, "We are actively 
executing %s, but we are also scheduled to run this same TxnState later. This 
should not happen.", runKind);
+        Invariants.require(run.pendingTimer() != runKind, "We are actively 
executing %s, but we also have a pending scheduled task to run this same 
TxnState later. This should not happen.", runKind);
+
+        validatePreRunState(run, runKind);
+        if (runKind == Home)
+        {
+            boolean isRetry = run.homeProgress() != Queued;
+            if (isRetry) run.incrementHomeRetryCounter();
+            run.runHome(DefaultProgressLog.this, safeStore, safeCommand);
+        }
+        else
+        {
+            boolean isRetry = run.waitingProgress() != Queued;
+            if (isRetry) run.incrementWaitingRetryCounter();
+            run.runWaiting(DefaultProgressLog.this, safeStore, safeCommand);
+        }
+    }
+
+    long nextCallbackId()
     {
-        return nextInvokerId++;
+        return ++nextCallbackId;
     }
 
-    ObjectHashSet<Object> pending(TxnStateKind kind)
+    Object2ObjectHashMap<TxnId, PendingTask> pending(TxnStateKind kind)
     {
         return kind == Waiting ? pendingWaiting : pendingHome;
     }
 
-    void registerPending(TxnStateKind kind, TxnId txnId, Object object)
+    void registerPending(TxnStateKind kind, TxnId txnId, PendingTask register)
     {
-        ObjectHashSet<Object> pending = pending(kind);
-        Invariants.require(!pending.contains(txnId));
-        pending.add(object);
+        Object2ObjectHashMap<TxnId, PendingTask> collection = pending(kind);
+        PendingTask existing = collection.putIfAbsent(txnId, register);
+        Invariants.require(existing == null);
     }
 
     boolean hasPending(TxnStateKind kind, TxnId txnId)
     {
-        return pending(kind).contains(txnId);
+        return pending(kind).containsKey(txnId);
     }
 
-    void start(CallbackInvoker<?, ?> invoker, Object task)
+    void start(CallbackInvoker<?, ?> invoker, Object debug)
     {
-        active.put(invoker.id, task);
+        // task is an arbitrary object to help debug, but must be non-null
+        // TODO (expected): make active debuggable via virtual table or other 
mechanism
+        if (debug == null)
+            debug = invoker;
+        active.put(invoker.id, debug);
     }
 
-    boolean complete(SafeCommandStore safeStore, TxnStateKind kind, long id, 
Object active)
+    boolean complete(TxnStateKind kind, long id, TxnId txnId, PendingTask 
completing)
     {
-        this.active.remove(id);
-        boolean result = pending(kind).remove(active);
-        if (runBufferIndex < runBufferCount)
-            accept(safeStore);
-        return result;
+        boolean stillActive = active.remove(id) != null;
+        return complete(kind, txnId, completing) && stillActive;
+    }
+
+    boolean complete(TxnStateKind kind, TxnId txnId, PendingTask completing)
+    {
+        return pending(kind).remove(txnId, completing);
+    }
+
+    void clearPendingAndActive(TxnStateKind kind, TxnId txnId)
+    {
+        PendingTask pending = pending(kind).remove(txnId);
+        if (pending instanceof CallbackInvoker<?,?>)
+            active.remove(((CallbackInvoker<?, ?>) pending).id);
     }
 
-    void clearPending(TxnStateKind kind, TxnId txnId)
+    public void requeue(SafeCommandStore safeStore, TxnStateKind kind, TxnId 
txnId)
     {
-        pending(kind).remove(txnId);
+        clearPendingAndActive(kind, txnId);
+        TxnState state = get(txnId);
+        if (state != null && (kind == Home ? state.isHomeInitialised() : 
!state.isWaitingDone()))
+            state.updateScheduling(safeStore, this, kind, null, Queued);
     }
 
     void unschedule(TxnState state)
@@ -826,15 +883,20 @@ public class DefaultProgressLog implements ProgressLog, 
Consumer<SafeCommandStor
         }
     }
 
-    public void setMaxConcurrency(int maxConcurrency)
+    public Config config()
+    {
+        return config;
+    }
+
+    public void setConfig(SafeCommandStore safeStore, Config config)
     {
-        Invariants.requireArgument(maxConcurrency >= 1);
-        this.maxConcurrency = maxConcurrency;
+        Invariants.require(commandStore.inStore());
+        this.config = config;
     }
 
-    public int maxConcurrency()
+    public void unsafeSetConfig(Config config)
     {
-        return maxConcurrency;
+        this.config = config;
     }
 
     public int size()
diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java 
b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
index 381207b2..9179dbaa 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
@@ -30,6 +30,8 @@ import 
accord.local.CommandStores.IncludingSpecificStoreSelector;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 import accord.primitives.ProgressToken;
+import accord.primitives.Route;
+import accord.primitives.Status;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 
@@ -37,7 +39,6 @@ import static 
accord.api.ProgressLog.BlockedUntil.CanCoordinateExecution;
 import static accord.api.TraceEventType.HOME_PROGRESS;
 import static accord.impl.progresslog.CallbackInvoker.invokeHomeCallback;
 import static accord.impl.progresslog.CoordinatePhase.Done;
-import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute;
 import static accord.impl.progresslog.Progress.NoneExpected;
 import static accord.impl.progresslog.Progress.Querying;
 import static accord.impl.progresslog.Progress.Queued;
@@ -129,7 +130,7 @@ abstract class HomeState extends WaitingState
 
         if (newPhase.compareTo(phase()) > 0)
         {
-            instance.clearPending(Home, txnId);
+            instance.clearPendingAndActive(Home, txnId);
             clearHomeRetryCounter();
             set(safeStore, instance, newPhase, newProgress);
         }
@@ -137,7 +138,7 @@ abstract class HomeState extends WaitingState
 
     final void runHome(DefaultProgressLog instance, SafeCommandStore 
safeStore, SafeCommand safeCommand)
     {
-        Tracing tracing = instance.node.agent().trace(safeCommand.txnId(), 
HOME_PROGRESS);
+        Tracing tracing = instance.node.agent().trace(txnId, HOME_PROGRESS);
         Invariants.require(!isHomeDoneOrUninitialised());
         Command command = safeCommand.current();
         // note: we may truncate locally based on shard-specific criteria, but 
this doesn't mean we're globally persisted
@@ -146,6 +147,17 @@ abstract class HomeState extends WaitingState
         // TODO (expected): when invalidated, safer to maintain HomeState 
until known to be globally invalidated
         // TODO (expected): validate that we clear HomeState when we receive a 
Durable reply, to replace the token check logic
         Invariants.require(!command.durability().isDurableOrInvalidated(), 
"Command is durable or invalidated, but we have not cleared the ProgressLog");
+        if (Route.isFullRoute(command.route()))
+        {
+            Status.Durability min = safeStore.durableBefore().min(txnId, 
command.route());
+            if (min.isDurableOrInvalidated())
+            {
+                if (tracing != null)
+                    tracing.trace(safeStore.commandStore(), "DurableBefore 
records %s; terminating home state", min);
+                setHomeDone(instance);
+                return;
+            }
+        }
 
         ProgressToken maxProgressToken = 
instance.savedProgressToken(txnId).merge(command);
         CallbackInvoker<ProgressToken, Outcome> invoker = 
invokeHomeCallback(instance, txnId, maxProgressToken, 
HomeState::recoverCallback);
@@ -225,7 +237,7 @@ abstract class HomeState extends WaitingState
     {
         set(null, instance, Done, NoneExpected);
         clearHomeRetryCounter();
-        instance.clearPending(Home, txnId);
+        instance.clearPendingAndActive(Home, txnId);
     }
 
     void setHomeDoneAndMaybeRemove(DefaultProgressLog instance)
diff --git a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java 
b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
index 044a9327..97c0dc02 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
@@ -18,6 +18,7 @@
 
 package accord.impl.progresslog;
 
+import java.time.temporal.TemporalUnit;
 import javax.annotation.Nullable;
 
 import com.google.common.primitives.Ints;
@@ -30,8 +31,9 @@ import accord.utils.Invariants;
 import accord.utils.UnhandledEnum;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
-final class TxnState extends HomeState implements PreLoadContext
+public final class TxnState extends HomeState implements PreLoadContext
 {
     TxnState(TxnId txnId)
     {
@@ -46,9 +48,12 @@ final class TxnState extends HomeState implements 
PreLoadContext
             default:
                 throw new UnhandledEnum(newProgress);
             case NoneExpected:
-            case Querying:
                 newDelay = 0;
                 break;
+            case Querying:
+                newDelay = 
NANOSECONDS.toMicros(instance.config().maxActiveRunTime.toNanos());
+                Invariants.require(newDelay >= 0);
+                break;
             case Queued:
                 switch (updated)
                 {
@@ -69,10 +74,7 @@ final class TxnState extends HomeState implements 
PreLoadContext
         }
 
         TxnStateKind scheduled = scheduledTimer();
-        if (scheduled == null)
-        {
-            Invariants.require(pendingTimer() == null);
-        }
+        Invariants.require(scheduled != null || pendingTimer() == null);
 
         // previousDeadline is the previous deadline of <updated>;
         // otherDeadline is the active deadline (if any) of <updated.other()>
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java 
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index e7eb0922..da23e910 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -19,7 +19,6 @@
 package accord.impl.progresslog;
 
 import java.util.concurrent.Executor;
-import java.util.function.BiConsumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -349,7 +348,7 @@ abstract class WaitingState extends BaseTxnState
     void setWaitingDone(DefaultProgressLog owner)
     {
         set(null, owner, CanApply, NoneExpected);
-        owner.clearPending(Waiting, txnId);
+        owner.clearPendingAndActive(Waiting, txnId);
         clearWaitingRetryCounter();
     }
 
@@ -360,7 +359,7 @@ abstract class WaitingState extends BaseTxnState
         {
             clearAwaitState();
             clearWaitingRetryCounter();
-            owner.clearPending(Waiting, txnId);
+            owner.clearPendingAndActive(Waiting, txnId);
             set(safeStore, owner, blockedUntil, Queued);
         }
     }
@@ -374,11 +373,11 @@ abstract class WaitingState extends BaseTxnState
             set(null, owner, isDone ? CanApply : currentlyBlockedUntil, 
NoneExpected);
             if (isDone)
                 maybeRemove(owner);
-            owner.clearPending(Waiting, txnId);
+            owner.clearPendingAndActive(Waiting, txnId);
         }
     }
 
-    final void runWaiting(SafeCommandStore safeStore, SafeCommand safeCommand, 
DefaultProgressLog owner)
+    final void runWaiting(DefaultProgressLog owner, SafeCommandStore 
safeStore, SafeCommand safeCommand)
     {
         runInternal(safeStore, safeCommand, owner, 
owner.node.agent().trace(txnId, WAIT_PROGRESS));
     }
@@ -774,16 +773,16 @@ abstract class WaitingState extends BaseTxnState
     {
         // TODO (desired): fetch only the route
         // we MUSt allocate before calling withEpoch to register cancellation, 
as async
-        BiConsumer<Route<?>, Throwable> invoker = invokeWaitingCallback(owner, 
txnId, blockedUntil, WaitingState::fetchRouteCallback);
-        FetchRoute.fetchRoute(owner.node(), txnId, contactable, new 
IncludingSpecificStoreSelector(owner.commandStore.id()), invoker);
+        CallbackInvoker<BlockedUntil, Route<?>> invoker = 
invokeWaitingCallback(owner, txnId, blockedUntil, 
WaitingState::fetchRouteCallback);
+        owner.start(invoker, FetchRoute.fetchRoute(owner.node(), txnId, 
contactable, new IncludingSpecificStoreSelector(owner.commandStore.id()), 
invoker));
     }
 
     static void fetch(DefaultProgressLog owner, BlockedUntil blockedUntil, 
TxnId txnId, Timestamp executeAt, Route<?> slicedRoute, Route<?> fetchRoute, 
Route<?> maxRoute)
     {
         Invariants.require(!slicedRoute.isEmpty());
         // we MUSt allocate before calling withEpoch to register cancellation, 
as async
-        BiConsumer<FetchData.FetchResult, Throwable> invoker = 
invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchCallback);
-        FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, 
owner.node(), txnId, executeAt, fetchRoute, maxRoute, new 
IncludingSpecificStoreSelector(owner.commandStore.id()), invoker);
+        CallbackInvoker<BlockedUntil, FetchData.FetchResult> invoker = 
invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchCallback);
+        owner.start(invoker, 
FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, owner.node(), txnId, 
executeAt, fetchRoute, maxRoute, new 
IncludingSpecificStoreSelector(owner.commandStore.id()), invoker));
     }
 
     void awaitHomeKey(DefaultProgressLog owner, BlockedUntil blockedUntil, 
TxnId txnId, Timestamp executeAt, Route<?> route)
@@ -803,10 +802,10 @@ abstract class WaitingState extends BaseTxnState
     {
         long epoch = blockedUntil.fetchEpoch(txnId, executeAt);
         // we MUST allocate the invoker before invoking withEpoch as this may 
be asynchronous and we must first register our callback for cancellation
-        BiConsumer<AsynchronousAwait.SynchronousResult, Throwable> invoker = 
invokeWaitingCallback(owner, txnId, blockedUntil, callback);
-        owner.node().withEpochAtLeast(epoch, (Executor)null, invoker, () -> {
+        CallbackInvoker<BlockedUntil, AsynchronousAwait.SynchronousResult> 
invoker = invokeWaitingCallback(owner, txnId, blockedUntil, callback);
+        owner.start(invoker, owner.node().withEpochAtLeast(epoch, 
(Executor)null, invoker, () -> {
             AsynchronousAwait.awaitAny(owner.node(), contact(owner, route, 
epoch), txnId, route, blockedUntil, callbackId, invoker);
-        });
+        }));
     }
 
     String toStateString()
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index dc19879d..971d76aa 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -64,6 +64,7 @@ import accord.coordinate.CoordinationAdapter.Factory.Kind;
 import accord.coordinate.Infer.InvalidIf;
 import accord.coordinate.Outcome;
 import accord.coordinate.RecoverWithRoute;
+import accord.local.CommandStores.LatentStoreSelector;
 import accord.local.CommandStores.StoreSelector;
 import accord.local.durability.DurabilityService;
 import accord.messages.Callback;
@@ -428,19 +429,20 @@ public class Node implements 
ConfigurationService.Listener, NodeCommandStoreServ
         }
     }
 
-    public void withEpochAtLeast(long epoch, @Nullable Executor ifAsync, 
BiConsumer<?, ? super Throwable> ifFailure, Runnable ifSuccess)
+    public Object withEpochAtLeast(long epoch, @Nullable Executor ifAsync, 
BiConsumer<?, ? super Throwable> ifFailure, Runnable ifSuccess)
     {
         if (topology.hasAtLeastEpoch(epoch))
         {
             ifSuccess.run();
+            return ifSuccess;
         }
         else
         {
-            topology.awaitEpoch(epoch, ifAsync).begin((success, fail) -> {
+            configService.fetchTopologyForEpoch(epoch);
+            return topology.awaitEpoch(epoch, ifAsync).begin((success, fail) 
-> {
                 if (fail != null) ifFailure.accept(null, fail);
                 else ifSuccess.run();
             });
-            configService.fetchTopologyForEpoch(epoch);
         }
     }
 
@@ -891,7 +893,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
         }
     }
 
-    public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf 
invalidIf, FullRoute<?> route, StoreSelector reportTo, @Nullable Tracing 
tracing)
+    public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf 
invalidIf, FullRoute<?> route, LatentStoreSelector reportTo, @Nullable Tracing 
tracing)
     {
         {
             AsyncResult<? extends Outcome> result = coordinating.get(txnId);
diff --git 
a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java 
b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
index 4e062cf0..31368832 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
@@ -119,7 +119,7 @@ public class DurabilityQueue
         {
             ++pendingCounter;
             pending.add(new Pending(syncPoint, request, attempt));
-            if (pending.size() >= PRUNE_SIZE_THRESHOLD && pendingCounter > 
prunedAt + pending.size())
+            if (pending.size() >= PRUNE_SIZE_THRESHOLD && pendingCounter > 
prunedAt)
                 prune();
         }
     }
@@ -191,7 +191,8 @@ public class DurabilityQueue
 
     private synchronized void prune()
     {
-        prunedAt = pendingCounter;
+        prunedAt = pending.size();
+        pendingCounter = 0;
         List<SortForPruning> sorted = new ArrayList<>();
         for (Pending p : pending)
         {
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 3bc9ae49..e8bfcd1a 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -726,7 +726,9 @@ public class Cluster
             Runnable updateProgressLogConcurrency;
             {
                 updateProgressLogConcurrency = () -> {
-                    nodeMap.values().forEach(node -> 
node.commandStores().forEachCommandStore(cs -> 
((TestProgressLog)cs.unsafeProgressLog()).setMaxConcurrency(random.nextInt(8, 
32))));
+                    nodeMap.values().forEach(node -> 
node.commandStores().forEachCommandStore(cs -> cs.execute(() -> {
+                        
((TestProgressLog)cs.unsafeProgressLog()).config().concurrency = 
random.nextInt(8, 32);
+                    })));
                 };
             }
             updateProgressLogConcurrency.run();
diff --git a/accord-core/src/test/java/accord/impl/basic/Pending.java 
b/accord-core/src/test/java/accord/impl/basic/Pending.java
index 7214d5bc..6474af87 100644
--- a/accord-core/src/test/java/accord/impl/basic/Pending.java
+++ b/accord-core/src/test/java/accord/impl/basic/Pending.java
@@ -37,6 +37,11 @@ public interface Pending
             Invariants.require(activeOrigin != null);
         }
 
+        public static void unsafeSetActiveOrigin(Pending newActiveOrigin)
+        {
+            activeOrigin = newActiveOrigin;
+        }
+
         public static void clearActiveOrigin()
         {
             activeOrigin = null;
diff --git a/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java 
b/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java
index 56e76f00..f03bebc3 100644
--- a/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java
+++ b/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java
@@ -18,18 +18,23 @@
 
 package accord.impl.basic;
 
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 
 import accord.api.ProgressLog;
 import accord.impl.progresslog.DefaultProgressLog;
+import accord.impl.progresslog.TxnState;
+import accord.impl.progresslog.TxnStateKind;
 import accord.local.CommandStore;
 import accord.local.Node;
+import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 
 public class TestProgressLogs implements ProgressLog.Factory
 {
     static class TestProgressLog extends DefaultProgressLog
     {
+        private static final RecurringPendingRunnable RECURRING = new 
RecurringPendingRunnable(-1, null, null, null, null, true){};
         protected TestProgressLog(Node node, CommandStore commandStore)
         {
             super(node, commandStore);
@@ -46,12 +51,30 @@ public class TestProgressLogs implements ProgressLog.Factory
             }
             finally
             {
-                if (prev != Pending.Global.NONE)
+                Pending.Global.unsafeSetActiveOrigin(prev);
+            }
+        }
+
+        @Override
+        protected void run(TxnStateKind runKind, TxnState run, 
SafeCommandStore safeStore, SafeCommand safeCommand)
+        {
+            if (run.txnId.isSystemTxn())
+            {
+                Pending prev = Pending.Global.activeOrigin();
+                Pending.Global.unsafeSetActiveOrigin(RECURRING);
+                try
+                {
+                    super.run(runKind, run, safeStore, safeCommand);
+                }
+                finally
                 {
-                    Pending.Global.clearActiveOrigin();
-                    Pending.Global.setActiveOrigin(prev);
+                    Pending.Global.unsafeSetActiveOrigin(prev);
                 }
             }
+            else
+            {
+                super.run(runKind, run, safeStore, safeCommand);
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to