This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71d235d5 Improve:
71d235d5 is described below

commit 71d235d56cb315fa5ae01ec24d3d9f08dd08ac6a
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Jul 1 14:24:01 2025 +0100

    Improve:
    
     - Journal debugging vtable support
     - Background task tracing support
    Fix:
     - HLC_BOUND only valid for strictly lower HLC
     - HAS_UNIQUE_HLC can only be safely computed if READY_TO_EXECUTE
     - Break recursion in CommandStore.ensureReadyToCoordinate
     - Fix find intersecting shard scheduler
     - Separate adhoc ShardScheduler from normal to avoid overwriting
     - Should still use execution listeners to detect invalid reads for certain 
transactions even with dataStoreDetectsFutureReads
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20746
---
 accord-core/src/main/java/accord/api/Agent.java    |  4 +
 accord-core/src/main/java/accord/api/Journal.java  |  4 +
 .../src/main/java/accord/api/TraceEventType.java   | 23 ++++++
 accord-core/src/main/java/accord/api/Tracing.java  | 35 +++++++++
 .../main/java/accord/coordinate/CheckShards.java   | 21 ++++-
 .../accord/coordinate/CoordinateEphemeralRead.java |  3 +-
 .../main/java/accord/coordinate/ExecuteFlag.java   |  5 +-
 .../java/accord/coordinate/ExecuteSyncPoint.java   |  1 -
 .../src/main/java/accord/coordinate/FetchData.java | 12 ++-
 .../src/main/java/accord/coordinate/Propose.java   |  1 -
 .../java/accord/coordinate/RecoverWithRoute.java   |  6 +-
 .../src/main/java/accord/impl/CommandChange.java   | 30 +++++++
 .../java/accord/impl/DefaultLocalListeners.java    |  3 -
 .../java/accord/impl/progresslog/WaitingState.java | 91 ++++++++++++++++++----
 .../src/main/java/accord/local/CommandStore.java   |  7 +-
 .../src/main/java/accord/local/DepsCalculator.java |  7 +-
 .../src/main/java/accord/local/DurableBefore.java  |  1 -
 .../main/java/accord/local/cfk/CommandsForKey.java |  8 +-
 .../accord/local/durability/DurabilityQueue.java   |  1 -
 .../accord/local/durability/ShardDurability.java   | 88 +++++++++++++++------
 .../java/accord/messages/GetEphemeralReadDeps.java |  1 -
 .../src/main/java/accord/messages/Propagate.java   | 73 +++++++++++++++--
 .../src/main/java/accord/messages/ReadData.java    | 20 +++--
 .../src/test/java/accord/impl/list/ListAgent.java  | 13 ++++
 24 files changed, 374 insertions(+), 84 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Agent.java 
b/accord-core/src/main/java/accord/api/Agent.java
index f9c3cf2c..89016626 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -20,6 +20,8 @@ package accord.api;
 
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nullable;
+
 import accord.api.ProgressLog.BlockedUntil;
 import accord.local.Command;
 import accord.local.Node;
@@ -43,6 +45,8 @@ import accord.utils.async.AsyncChain;
  */
 public interface Agent extends UncaughtExceptionListener
 {
+    default @Nullable Tracing trace(TxnId txnId, TraceEventType eventType) { 
return null; }
+
     /**
      * For use by implementations to decide what to do about successfully 
recovered transactions.
      * Specifically intended to define if and how they should inform clients 
of the result.
diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index f452e526..99806e2d 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -19,11 +19,14 @@
 package accord.api;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.Objects;
+import java.util.function.Supplier;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import accord.impl.CommandChange;
 import accord.local.Command;
 import accord.local.CommandStores;
 import accord.local.DurableBefore;
@@ -47,6 +50,7 @@ public interface Journal
     void start(Node node);
 
     Command loadCommand(int store, TxnId txnId, RedundantBefore 
redundantBefore, DurableBefore durableBefore);
+    default List<? extends Supplier<CommandChange.Builder>> debugCommand(int 
store, TxnId txnId) { throw new UnsupportedOperationException(); }
     Command.Minimal loadMinimal(int store, TxnId txnId, Load load, 
RedundantBefore redundantBefore, DurableBefore durableBefore);
 
     // TODO (required): propagate exceptions (i.e. using OnDone instead of 
Runnable)
diff --git a/accord-core/src/main/java/accord/api/TraceEventType.java 
b/accord-core/src/main/java/accord/api/TraceEventType.java
new file mode 100644
index 00000000..c47453db
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/TraceEventType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.api;
+
+public enum TraceEventType
+{
+    FETCH, PROGRESS
+}
diff --git a/accord-core/src/main/java/accord/api/Tracing.java 
b/accord-core/src/main/java/accord/api/Tracing.java
new file mode 100644
index 00000000..06d01120
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Tracing.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.api;
+
+import java.util.Arrays;
+
+import accord.local.CommandStore;
+
+public interface Tracing
+{
+    void trace(CommandStore store, String message);
+
+    default void trace(CommandStore store, String fmt, Object ... args)
+    {
+        String message;
+        try { message = String.format(fmt, args); }
+        catch (Throwable t) { message = "Could not format \"" + fmt + "\" with 
" + Arrays.toString(args) + " (" + t.getLocalizedMessage() + ")"; }
+        trace(store, message);
+    }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java 
b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index cb503072..91d743c9 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -20,6 +20,7 @@ package accord.coordinate;
 
 import javax.annotation.Nullable;
 
+import accord.api.Tracing;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.SequentialAsyncExecutor;
@@ -50,6 +51,7 @@ public abstract class CheckShards<U extends Participants<?>> 
extends ReadCoordin
     final IncludeInfo includeInfo;
     final @Nullable Ballot bumpBallot;
     final Infer.InvalidIf previouslyKnownToBeInvalidIf;
+    final @Nullable Tracing tracing;
 
     protected CheckStatusOk merged;
     protected boolean truncated;
@@ -62,6 +64,11 @@ public abstract class CheckShards<U extends Participants<?>> 
extends ReadCoordin
     }
 
     protected CheckShards(Node node, SequentialAsyncExecutor executor, TxnId 
txnId, U query, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot 
bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf)
+    {
+        this(node, executor, txnId, query, srcEpoch, includeInfo, bumpBallot, 
previouslyKnownToBeInvalidIf, null);
+    }
+
+    protected CheckShards(Node node, SequentialAsyncExecutor executor, TxnId 
txnId, U query, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot 
bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf, @Nullable Tracing 
tracing)
     {
         super(node, executor, topologyFor(node, txnId, query, srcEpoch), 
txnId);
         this.sourceEpoch = srcEpoch;
@@ -69,6 +76,7 @@ public abstract class CheckShards<U extends Participants<?>> 
extends ReadCoordin
         this.includeInfo = includeInfo;
         this.bumpBallot = bumpBallot;
         this.previouslyKnownToBeInvalidIf = previouslyKnownToBeInvalidIf;
+        this.tracing = tracing;
     }
 
     private static Topologies topologyFor(Node node, TxnId txnId, 
Unseekables<?> contact, long epoch)
@@ -81,6 +89,8 @@ public abstract class CheckShards<U extends Participants<?>> 
extends ReadCoordin
     protected void contact(Id id)
     {
         Participants<?> unseekables = 
query.slice(topologies().computeRangesForNode(id));
+        if (tracing != null)
+            tracing.trace(null, "%s contacting %s for %s", 
getClass().getSimpleName(), id, unseekables);
         node.send(id, new CheckStatus(txnId, unseekables, sourceEpoch, 
includeInfo, bumpBallot), executor, this);
     }
 
@@ -89,10 +99,10 @@ public abstract class CheckShards<U extends 
Participants<?>> extends ReadCoordin
 
     protected Action checkSufficient(Id from, CheckStatusOk ok)
     {
-        if (isSufficient(from, ok))
-            return Action.Approve;
-
-        return Action.ApproveIfQuorum;
+        Action action = isSufficient(from, ok) ? Action.Approve : 
Action.ApproveIfQuorum;
+        if (tracing != null)
+            tracing.trace(null, "%s %s reply %s from %s", 
getClass().getSimpleName(), action, ok, from);
+        return action;
     }
 
     @Override
@@ -107,6 +117,9 @@ public abstract class CheckShards<U extends 
Participants<?>> extends ReadCoordin
         }
         else
         {
+            if (tracing != null)
+                tracing.trace(null, "%s received failure reply %s from %s", 
getClass().getSimpleName(), reply, from);
+
             switch ((CheckStatus.CheckStatusNack)reply)
             {
                 default: throw new AssertionError(String.format("Unexpected 
status: %s", reply));
diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
index beab3314..b38fcaf1 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
@@ -23,6 +23,7 @@ import java.util.function.BiConsumer;
 
 import accord.api.Result;
 import accord.coordinate.ExecuteFlag.CoordinationFlags;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
 import accord.coordinate.tracking.QuorumTracker;
 import accord.local.Node;
 import accord.local.SequentialAsyncExecutor;
@@ -166,7 +167,7 @@ public class CoordinateEphemeralRead extends 
AbstractCoordinatePreAccept<Result,
         Deps deps = Deps.merge(oks, oks.domainSize(), SortedListMap::getValue, 
ok -> ok.deps);
         topologies = node.topology().reselect(topologies, 
QuorumEpochIntersections.preaccept.include, route, executeAtEpoch, 
executeAtEpoch, SHARE, Owned);
         CoordinationFlags flags = oks.foldlNonNull((d, k, v, out) -> {
-            ExecuteFlag.ExecuteFlags.collect(out, k, v.flags, d, v.deps);
+            ExecuteFlags.collect(out, k, v.flags, d, v.deps);
             return out;
         }, deps, empty(oks.domain()));
         new ExecuteEphemeralRead(node, executor, topologies, route, 
txnId.withEpoch(executeAtEpoch), txn, deps, flags, callback).start();
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
index 1f6b750a..e81ab6ba 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
@@ -58,8 +58,9 @@ public enum ExecuteFlag
 
         public static void collect(CoordinationFlags into, Node.Id id, 
ExecuteFlags add, Object expectIfReadyToExecute, Object actualReadyToExecute)
         {
-            if (add.contains(READY_TO_EXECUTE) && 
!expectIfReadyToExecute.equals(actualReadyToExecute))
-                add = add.without(READY_TO_EXECUTE);
+            // TODO (expected): this is overly restrictive, disabling the 
optimisation in multi shard cases; should only expect the parts the shard owns 
to be equal
+            if (add != none() && 
!expectIfReadyToExecute.equals(actualReadyToExecute))
+                add = none(); // HAS_UNIQUE_HLC only accurate if 
READY_TO_EXECUTE was also accurate
             into.add(id, add);
         }
     }
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java 
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index 142573b5..ce0d64fa 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -49,7 +49,6 @@ import accord.utils.SortedListMap;
 import accord.utils.UnhandledEnum;
 import accord.utils.WrappableException;
 import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
 import accord.utils.async.AsyncResults.SettableResult;
 
 import static 
accord.coordinate.CoordinationAdapter.Adapters.exclusiveSyncPoint;
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java 
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 6af0111e..de2762bb 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -20,6 +20,7 @@ package accord.coordinate;
 import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
 
+import accord.api.Tracing;
 import accord.coordinate.Infer.InvalidIf;
 import accord.local.CommandStores.LatentStoreSelector;
 import accord.local.CommandStores.StoreSelector;
@@ -39,6 +40,7 @@ import accord.utils.Invariants;
 
 import javax.annotation.Nonnull;
 
+import static accord.api.TraceEventType.FETCH;
 import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
 
 /**
@@ -73,8 +75,9 @@ public class FetchData extends CheckShards<Route<?>>
         final Participants<?> contactable;
         final StoreSelector reportTo;
         final BiConsumer<? super FetchResult, Throwable> callback;
+        final @Nullable Tracing tracing;
 
-        public FetchRequest(SequentialAsyncExecutor executor, Known fetch, 
TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, 
Participants<?> contactable, StoreSelector reportTo, BiConsumer<? super 
FetchResult, Throwable> callback)
+        public FetchRequest(SequentialAsyncExecutor executor, Known fetch, 
TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, 
Participants<?> contactable, StoreSelector reportTo, BiConsumer<? super 
FetchResult, Throwable> callback, @Nullable Tracing tracing)
         {
             this.executor = executor;
             this.fetch = fetch;
@@ -85,6 +88,7 @@ public class FetchData extends CheckShards<Route<?>>
             this.srcEpoch = fetch.fetchEpoch(txnId, executeAt);
             this.contactable = contactable;
             this.reportTo = reportTo;
+            this.tracing = tracing;
         }
     }
 
@@ -101,7 +105,7 @@ public class FetchData extends CheckShards<Route<?>>
      */
     public static void fetchSpecific(Known fetch, Node node, TxnId txnId, 
InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> 
maxRoute, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> 
callback)
     {
-        fetchSpecific(node, query, maxRoute, new 
FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt, 
maxRoute, reportTo, callback));
+        fetchSpecific(node, query, maxRoute, new 
FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt, 
maxRoute, reportTo, callback, node.agent().trace(txnId, FETCH)));
     }
 
     public static void fetchSpecific(Node node, Route<?> query, Route<?> 
maxRoute, FetchRequest request)
@@ -188,6 +192,8 @@ public class FetchData extends CheckShards<Route<?>>
         Invariants.require((success == null) != (failure == null));
         if (failure != null)
         {
+            if (tracing != null)
+                tracing.trace(null, "%s completed with failure %s", 
getClass().getSimpleName(), failure);
             callback.accept(null, failure);
         }
         else
@@ -196,7 +202,7 @@ public class FetchData extends CheckShards<Route<?>>
                 Invariants.require(isSufficient(merged), "Status %s is not 
sufficient", merged);
 
             // TODO (expected): should we automatically trigger a new fetch if 
we find executeAt but did not request enough information? would be more robust
-            Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query(), maxRoute, reportTo, target, 
(CheckStatusOkFull) merged, callback);
+            Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query(), maxRoute, reportTo, target, 
(CheckStatusOkFull) merged, callback, tracing);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java 
b/accord-core/src/main/java/accord/coordinate/Propose.java
index 26917132..c59d9839 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -44,7 +44,6 @@ import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
-import accord.topology.TopologyManager;
 import accord.utils.Invariants;
 import accord.utils.SortedArrays;
 import accord.utils.SortedListMap;
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java 
b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index e4c2e63b..4ed56578 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -223,7 +223,7 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                         propagate = full;
                     }
 
-                    Propagate.propagate(node, txnId, 
previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query, 
reportTo, null, propagate, (s, f) -> callback.accept(f == null ? 
propagate.toProgressToken() : null, f));
+                    Propagate.propagate(node, txnId, 
previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query, 
reportTo, null, propagate, (s, f) -> callback.accept(f == null ? 
propagate.toProgressToken() : null, f), null);
                     break;
                 }
 
@@ -271,12 +271,12 @@ public class RecoverWithRoute extends 
CheckShards<FullRoute<?>>
                 if (witnessedByInvalidation != null && 
witnessedByInvalidation.hasBeen(Status.PreCommitted))
                     throw illegalState("We previously invalidated, finding a 
status that should be recoverable");
 
-                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) -> 
callback.accept(f == null ? INVALIDATED : null, f));
+                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) -> 
callback.accept(f == null ? INVALIDATED : null, f), null);
                 break;
 
             case Erased:
                 // we should only be able to hit the Erased case if every 
participating shard has advanced past this TxnId, so we don't need to recover it
-                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) -> 
callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f));
+                Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf, 
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) -> 
callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f), null);
                 break;
         }
     }
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java 
b/accord-core/src/main/java/accord/impl/CommandChange.java
index 216b0b46..a72ac99d 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -223,6 +223,36 @@ public class CommandChange
             return durability;
         }
 
+        public Timestamp executeAt()
+        {
+            return executeAt;
+        }
+
+        public Timestamp executesAtLeast()
+        {
+            return executesAtLeast;
+        }
+
+        public PartialTxn partialTxn()
+        {
+            return partialTxn;
+        }
+
+        public PartialDeps partialDeps()
+        {
+            return partialDeps;
+        }
+
+        public Writes writes()
+        {
+            return writes;
+        }
+
+        public Result result()
+        {
+            return result;
+        }
+
         public StoreParticipants participants()
         {
             return participants;
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java 
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index 3b6a5694..97fca67a 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -32,7 +32,6 @@ import accord.local.Node;
 import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
-import accord.local.StoreParticipants;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
 import accord.utils.AsymmetricComparator;
@@ -40,8 +39,6 @@ import accord.utils.Invariants;
 import accord.utils.btree.BTree;
 import accord.utils.btree.BTreeRemoval;
 
-import static accord.local.StoreParticipants.Filter.UPDATE;
-
 // TODO (desired): evict to disk
 public class DefaultLocalListeners implements LocalListeners
 {
diff --git 
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java 
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index 5af483a5..dcb4335a 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -24,6 +24,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import accord.api.ProgressLog.BlockedUntil;
+import accord.api.Tracing;
 import accord.coordinate.AsynchronousAwait;
 import accord.coordinate.FetchData;
 import accord.coordinate.FetchRoute;
@@ -48,6 +49,7 @@ import accord.utils.UnhandledEnum;
 import static accord.api.ProgressLog.BlockedUntil.CanApply;
 import static accord.api.ProgressLog.BlockedUntil.Query.HOME;
 import static accord.api.ProgressLog.BlockedUntil.Query.SHARD;
+import static accord.api.TraceEventType.PROGRESS;
 import static accord.impl.progresslog.CallbackInvoker.invokeWaitingCallback;
 import static accord.impl.progresslog.PackedKeyTracker.bitSet;
 import static accord.impl.progresslog.PackedKeyTracker.clearRoundState;
@@ -378,17 +380,16 @@ abstract class WaitingState extends BaseTxnState
 
     final void runWaiting(SafeCommandStore safeStore, SafeCommand safeCommand, 
DefaultProgressLog owner)
     {
-        runInternal(safeStore, safeCommand, owner);
+        runInternal(safeStore, safeCommand, owner, 
owner.node.agent().trace(txnId, PROGRESS));
     }
 
-    private void runInternal(SafeCommandStore safeStore, SafeCommand 
safeCommand, DefaultProgressLog owner)
+    private void runInternal(SafeCommandStore safeStore, SafeCommand 
safeCommand, DefaultProgressLog owner, @Nullable Tracing tracing)
     {
         BlockedUntil blockedUntil = blockedUntil();
         Command command = safeCommand.current();
         Invariants.require(!owner.hasActive(Waiting, txnId));
         
Invariants.require(command.saveStatus().compareTo(blockedUntil.unblockedFrom) < 
0,
                            "Command has met desired criteria (%s) but progress 
log entry has not been cancelled: %s", blockedUntil.unblockedFrom, command);
-
         set(safeStore, owner, blockedUntil, Querying);
         TxnId txnId = safeCommand.txnId();
         // first make sure we have enough information to obtain the command 
locally
@@ -397,6 +398,8 @@ abstract class WaitingState extends BaseTxnState
 
         if (!Route.isRoute(maxContactable))
         {
+            if (tracing != null)
+                tracing.trace(owner.commandStore, "Blocked until %s. Fetching 
route from %s", blockedUntil, maxContactable);
             fetchRoute(owner, blockedUntil, txnId, maxContactable);
             return;
         }
@@ -405,6 +408,8 @@ abstract class WaitingState extends BaseTxnState
         if (homeSatisfies().compareTo(blockedUntil) < 0)
         {
             // first wait until the homeKey has progressed to a point where it 
can answer our query; we don't expect our shards to know until then anyway
+            if (tracing != null)
+                tracing.trace(owner.commandStore, "Blocked until %s. Waiting 
for home key %s to satisfy.", blockedUntil, route.homeKey());
             awaitHomeKey(owner, blockedUntil, txnId, executeAt, route);
             return;
         }
@@ -420,6 +425,8 @@ abstract class WaitingState extends BaseTxnState
         {
             // we know it has been decided one way or the other by the home 
shard at least, so we attempt a fetch
             // including the home shard to get us to at least PreCommitted 
where we can safely wait on individual shards
+            if (tracing != null)
+                tracing.trace(owner.commandStore, "Blocked until %s; not 
currently decided. Fetching from %s for epochs [%d..%d].", blockedUntil, 
slicedRoute, lowEpoch, highEpoch);
             fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, 
slicedRoute.withHomeKey(), route);
             return;
         }
@@ -432,6 +439,8 @@ abstract class WaitingState extends BaseTxnState
         if (awaitRoute.isHomeKeyOnlyRoute())
         {
             // at this point we can switch to polling as we know someone has 
the relevant state
+            if (tracing != null)
+                tracing.trace(owner.commandStore, "Blocked until %s. Fetching 
%s%s for epochs [%d..%d].", blockedUntil, slicedRoute, slicedRoute == 
fetchRoute ? "" : " from " + fetchRoute, lowEpoch, highEpoch);
             fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, 
fetchRoute, route);
             return;
         }
@@ -457,6 +466,9 @@ abstract class WaitingState extends BaseTxnState
         int roundStart = roundIndex * roundSize;
         if (roundStart >= awaitRoute.size())
         {
+            if (tracing != null)
+                tracing.trace(owner.commandStore, "Blocked until %s. Fetching 
%s%s for epochs [%d..%d].", blockedUntil, slicedRoute, slicedRoute == 
fetchRoute ? "" : " from " + fetchRoute, lowEpoch, highEpoch);
+
             // all of the shards we are awaiting have been processed and found 
at least one replica that has the state needed to answer our query
             // at this point we can switch to polling as we know someone has 
the relevant state
             fetch(owner, blockedUntil, txnId, executeAt, slicedRoute, 
fetchRoute, route);
@@ -467,6 +479,8 @@ abstract class WaitingState extends BaseTxnState
         awaitRoute = awaitRoute.slice(roundStart, roundEnd);
         // TODO (desired): use some mechanism (e.g. random chance or another 
counter)
         //   to either periodically fetch the whole remaining route or 
gradually increase the slice length
+        if (tracing != null)
+            tracing.trace(owner.commandStore, "Blocked until %s. Waiting for 
%s to satisfy; round %d of %d.", blockedUntil, awaitRoute, roundIndex, 
(awaitRoute.size() + (roundSize - 1))/roundSize);
         awaitSlice(owner, blockedUntil, txnId, executeAt, awaitRoute, 
(roundIndex << 1) | 1);
     }
 
@@ -481,14 +495,17 @@ abstract class WaitingState extends BaseTxnState
 
         Command command = safeCommand.current();
         Route<?> route = command.route();
+        Tracing tracing = owner.node.agent().trace(txnId, PROGRESS);
 
         if (fail == null)
         {
             if (route == null)
             {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Callback success, but 
route not found.");
                 Invariants.require(kind == CallbackKind.FetchRoute);
                 Invariants.require(ready == null);
-                state.retry(safeStore, safeCommand, owner, blockedUntil);
+                state.retry(safeStore, safeCommand, owner, blockedUntil, 
tracing);
                 return;
             }
 
@@ -518,14 +535,18 @@ abstract class WaitingState extends BaseTxnState
                 case AwaitHome:
                     if (ready.contains(route.homeKey()))
                     {
+                        if (tracing != null)
+                            tracing.trace(owner.commandStore, "Callback 
success. Home key ready.");
                         // the home shard was found to already have the 
necessary state, with no distributed await;
                         // we can immediately progress the state machine
                         Invariants.require(0 == 
state.awaitRoundIndex(roundSize));
                         Invariants.require(0 == state.awaitBitSet(roundSize));
-                        state.runInternal(safeStore, safeCommand, owner);
+                        state.runInternal(safeStore, safeCommand, owner, 
tracing);
                     }
                     else
                     {
+                        if (tracing != null)
+                            tracing.trace(owner.commandStore, "Callback 
success. Home key not ready; waiting async.");
                         // the home shard is not ready to answer our query, 
but we have registered our remote callback so can wait for it to contact us
                         state.set(safeStore, owner, blockedUntil, Awaiting);
                     }
@@ -545,7 +566,7 @@ abstract class WaitingState extends BaseTxnState
                             Invariants.expect((int) 
awaitRoute.findNextSameKindIntersection(roundStart, (Unseekables) ready, 0) / 
roundSize == roundIndex);
                             // TODO (desired): in this case perhaps upgrade to 
fetch for next round?
                             state.updateAwaitRound(roundIndex + 1, roundSize);
-                            state.runInternal(safeStore, safeCommand, owner);
+                            state.runInternal(safeStore, safeCommand, owner, 
tracing);
                         }
                         else
                         {
@@ -561,7 +582,7 @@ abstract class WaitingState extends BaseTxnState
                 case FetchRoute:
                     if (state.homeSatisfies().compareTo(blockedUntil) < 0)
                     {
-                        state.runInternal(safeStore, safeCommand, owner);
+                        state.runInternal(safeStore, safeCommand, owner, 
tracing);
                         return;
                     }
 
@@ -582,7 +603,7 @@ abstract class WaitingState extends BaseTxnState
                     {
                         // we don't think we have anything to wait for, but we 
have encountered some notReady responses; queue up a retry
                         state.setAwaitDone(roundSize);
-                        state.retry(safeStore, safeCommand, owner, 
blockedUntil);
+                        state.retry(safeStore, safeCommand, owner, 
blockedUntil, tracing);
                     }
                     else
                     {
@@ -590,7 +611,7 @@ abstract class WaitingState extends BaseTxnState
                         roundIndex = nextIndex / roundSize;
                         state.updateAwaitRound(roundIndex, roundSize);
                         state.initialiseAwaitBitSet(awaitRoute, notReady, 
roundIndex, roundSize);
-                        state.runInternal(safeStore, safeCommand, owner);
+                        state.runInternal(safeStore, safeCommand, owner, 
tracing);
                     }
                 }
             }
@@ -598,7 +619,7 @@ abstract class WaitingState extends BaseTxnState
         else
         {
             safeStore.agent().onCaughtException(fail, "Failed fetching data 
for " + state);
-            state.retry(safeStore, safeCommand, owner, blockedUntil);
+            state.retry(safeStore, safeCommand, owner, blockedUntil, tracing);
         }
     }
 
@@ -645,6 +666,7 @@ abstract class WaitingState extends BaseTxnState
         if ((callbackId & 1) != 1)
             return;
 
+        Tracing tracing = owner.node.agent().trace(txnId, PROGRESS);
         BlockedUntil blockedUntil = blockedUntil();
         if (callbackId == AWAITING_HOME_KEY_CALLBACKID)
         {
@@ -655,19 +677,42 @@ abstract class WaitingState extends BaseTxnState
                 setHomeSatisfies(newHomeStatus);
 
             if (waitingProgress() != Awaiting)
+            {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Received async home key 
callback %d but no longer Awaiting", callbackId);
                 return;
+            }
 
             if (newHomeStatus.compareTo(blockedUntil) < 0 || 
currentHomeStatus.compareTo(blockedUntil) >= 0)
+            {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Received redundant 
async home key callback %d. Blocked until %s, home key now %s (previously %s)", 
callbackId, blockedUntil, newHomeStatus, currentHomeStatus);
                 return;
+            }
 
             SafeCommand safeCommand = safeStore.unsafeGet(txnId);
             if (safeCommand != null)
-                runInternal(safeStore, safeCommand, owner);
+            {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Received async home key 
callback %d. Blocked until %s, home key now %s.", callbackId, blockedUntil, 
newHomeStatus);
+                runInternal(safeStore, safeCommand, owner, tracing);
+            }
         }
         else
         {
             if (waitingProgress() != Awaiting)
+            {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Received async callback 
%d but no longer Awaiting", callbackId);
                 return;
+            }
+
+            if (newStatus.compareTo(blockedUntil.unblockedFrom) < 0)
+            {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Received async callback 
%d with %s, insufficient for %s", callbackId, newStatus, blockedUntil);
+                return;
+            }
 
             callbackId >>= 1;
             SafeCommand safeCommand = 
Invariants.nonNull(safeStore.unsafeGet(txnId));
@@ -680,28 +725,46 @@ abstract class WaitingState extends BaseTxnState
             int roundIndex = awaitRoundIndex(roundSize);
             int updateBitSet = roundCallbackBitSet(owner, txnId, from, 
slicedRoute, callbackId, roundIndex, roundSize);
             if (updateBitSet == 0)
+            {
+                // every argument needs a matching specifier, otherwise it is silently dropped from the trace;
+                // lead with the blocked-until state, as the other async callback traces in this method do
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Blocked until %s. Received async callback %d for already ready keys.", blockedUntil, callbackId);
                 return;
+            }
 
             int bitSet = awaitBitSet(roundSize);
             bitSet &= ~updateBitSet;
             setAwaitBitSet(bitSet, roundSize);
 
             if (bitSet == 0)
-                runInternal(safeStore, safeCommand, owner);
+            {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Blocked until %s. 
Received async callback %d for waiting keys. Round complete.", blockedUntil, 
callbackId);
+
+                runInternal(safeStore, safeCommand, owner, tracing);
+            }
+            else
+            {
+                if (tracing != null)
+                    tracing.trace(owner.commandStore, "Blocked until %s. 
Received async callback %d for waiting keys. %d keys still waiting this 
round.", blockedUntil, callbackId, Integer.bitCount(bitSet));
+            }
         }
     }
 
     // TODO (expected): use back-off counter here
-    private void retry(SafeCommandStore safeStore, SafeCommand safeCommand, 
DefaultProgressLog owner, BlockedUntil blockedUntil)
+    private void retry(SafeCommandStore safeStore, SafeCommand safeCommand, 
DefaultProgressLog owner, BlockedUntil blockedUntil, @Nullable Tracing tracing)
     {
         if (!contactEveryone())
         {
+            if (tracing != null)
+                tracing.trace(owner.commandStore, "Retrying immediately, 
without contact restrictions");
             setContactEveryone(true);
             // try again immediately with a query to all eligible replicas
-            runInternal(safeStore, safeCommand, owner);
+            runInternal(safeStore, safeCommand, owner, tracing);
         }
         else
         {
+            if (tracing != null)
+                tracing.trace(owner.commandStore, "Retry queued for later.");
             // queue a retry
             set(safeStore, owner, blockedUntil, Queued);
         }
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index fb4fb05a..2aca538c 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -582,12 +582,11 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
             .begin((success, fail) -> {
                 if (fail != null)
                 {
-                    Ranges remaining = 
redundantBefore.removeWitnessed(minForEpoch, ranges);
-                    remaining = redundantBefore.removeRetired(remaining);
+                    Ranges remaining = 
redundantBefore.removeRetired(redundantBefore.removeWitnessed(minForEpoch, 
ranges));
                     if (!remaining.isEmpty())
                     {
-                        logger.error("Failed to close epoch {} for ranges {} 
on store {}. Retrying.", epoch, remaining, id);
-                        ensureReadyToCoordinate(epoch, remaining);
+                        logger.error("Failed to close epoch {} for ranges {} 
on store {}. Retrying.", epoch, remaining, id, fail);
+                        node.someExecutor().execute(() -> 
ensureReadyToCoordinate(epoch, remaining));
                     }
                 }
             });
diff --git a/accord-core/src/main/java/accord/local/DepsCalculator.java 
b/accord-core/src/main/java/accord/local/DepsCalculator.java
index fc609cc4..c67b4459 100644
--- a/accord-core/src/main/java/accord/local/DepsCalculator.java
+++ b/accord-core/src/main/java/accord/local/DepsCalculator.java
@@ -65,9 +65,12 @@ public class DepsCalculator extends Deps.Builder implements 
CommandSummaries.Act
     {
         ExecuteFlags flags = ExecuteFlags.none();
         if (!hasUnappliedDependency)
+        {
             flags = flags.with(READY_TO_EXECUTE);
-        if (maxAppliedHlc < txnId.hlc())
-            flags = flags.with(HAS_UNIQUE_HLC);
+            // we don't know whether hlc is unique unless dependencies have 
applied
+            if (maxAppliedHlc < txnId.hlc())
+                flags = flags.with(HAS_UNIQUE_HLC);
+        }
         return flags;
     }
 
diff --git a/accord-core/src/main/java/accord/local/DurableBefore.java 
b/accord-core/src/main/java/accord/local/DurableBefore.java
index 7709d81a..f3830ccd 100644
--- a/accord-core/src/main/java/accord/local/DurableBefore.java
+++ b/accord-core/src/main/java/accord/local/DurableBefore.java
@@ -23,7 +23,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import accord.api.RoutingKey;
-import accord.local.durability.ShardDurability;
 import accord.primitives.AbstractRanges;
 import accord.primitives.Participants;
 import accord.primitives.Ranges;
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java 
b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index 5963902b..58036157 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -1789,7 +1789,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
                 break;
         }
         if (maxAppliedWriteByExecuteAt >= 0) maxUniqueHlc = 
Math.max(maxUniqueHlc, 
committedByExecuteAt[maxAppliedWriteByExecuteAt].executeAt.hlc());
-        else maxUniqueHlc = Math.max(maxUniqueHlc, bounds.gcBefore.hlc());
+        else maxUniqueHlc = Math.max(maxUniqueHlc, bounds.gcBefore.hlc() - 1); 
// only guaranteed to witness those with strictly lower hlc; might be some txn 
agreed with higher flag bits
 
         return updater.update(key, bounds, isNewBoundsInfo, byId, 
minUndecidedById, maxAppliedPreBootstrapWriteById, committedByExecuteAt, 
maxAppliedWriteByExecuteAt, maxUniqueHlc, loadingPruned, newPrunedBeforeById, 
unmanageds);
     }
@@ -2061,10 +2061,6 @@ public class CommandsForKey extends CommandsForKeyUpdate
         int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(), 
redundantBefore(newBounds));
         Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 || 
byId[prunedBeforeById].compareTo(newBounds.gcBefore) < 0 : 
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
 
-        long maxUniqueHlc = this.maxUniqueHlc;
-        if (maxUniqueHlc <= newBounds.gcBefore.hlc() && 
newBounds.gcBefore.is(HLC_BOUND))
-            maxUniqueHlc = 0;
-
         return notifyManagedPreBootstrap(this, newBounds, 
reconstructAndUpdateUnmanaged(key, newBounds, true, newById, maxUniqueHlc, 
newLoadingPruned, newPrunedBeforeById, unmanageds));
     }
 
@@ -2368,7 +2364,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
         if (maxWrite != null && maxWrite.hlc() >= maxUniqueHlc)
             return false;
         TxnId gcBefore = redundantBefore();
-        return gcBefore.hlc() < maxUniqueHlc || !gcBefore.is(HLC_BOUND);
+        return gcBefore.hlc() <= maxUniqueHlc || !gcBefore.is(HLC_BOUND);
     }
 
     public static boolean reportLinearizabilityViolations()
diff --git 
a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java 
b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
index 69948f1c..4e062cf0 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
@@ -51,7 +51,6 @@ import accord.primitives.SyncPoint;
 import accord.primitives.TxnId;
 import accord.topology.TopologyManager;
 import accord.utils.Invariants;
-import accord.utils.async.AsyncResult;
 import org.agrona.collections.ObjectHashSet;
 
 import static accord.coordinate.ExecuteSyncPoint.coordinateIncluding;
diff --git 
a/accord-core/src/main/java/accord/local/durability/ShardDurability.java 
b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index e9666e42..58d7641f 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -18,8 +18,10 @@
 
 package accord.local.durability;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -41,6 +43,7 @@ import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.Routable.Domain;
 import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.topology.Shard;
 import accord.topology.Topology;
@@ -95,6 +98,7 @@ public class ShardDurability
     // TODO (expected): support intra-shard parallelism
     private class ShardScheduler
     {
+        final boolean adhoc;
         final long id;
         Shard shard;
         boolean ticks, stopping, stopped, readyToStop;
@@ -123,8 +127,9 @@ public class ShardDurability
         DurabilityRequest activeRequest;
         Waiting waiting;
 
-        private ShardScheduler()
+        private ShardScheduler(boolean adhoc)
         {
+            this.adhoc = adhoc;
             this.id = nextShardId.incrementAndGet();
         }
 
@@ -164,11 +169,8 @@ public class ShardDurability
 
         void markDefunct()
         {
-            logger.info("Marking shard durability scheduler {} for {} 
defunct", id, shard);
-        }
-
-        synchronized void markDefunctSilent()
-        {
+            if (!adhoc)
+                logger.info("Marking shard durability scheduler {} for {} 
defunct", id, shard);
             stopping = true;
         }
 
@@ -220,7 +222,8 @@ public class ShardDurability
                         node.scheduler().once(() -> {
                             synchronized (ShardDurability.this)
                             {
-                                shardSchedulers.remove(shard.range, this);
+                                if (adhoc) 
adhocShardSchedulers.remove(shard.range, this);
+                                else shardSchedulers.remove(shard.range, this);
                             }
                         }, 0, MICROSECONDS);
                     }
@@ -429,9 +432,22 @@ public class ShardDurability
      * duration.
      */
     private long shardCycleTimeMicros = TimeUnit.SECONDS.toMicros(30);
-    private long shardCycleTimeoutDelayMicros = TimeUnit.HOURS.toMicros(1);
 
     private final NavigableMap<Range, ShardScheduler> shardSchedulers = new 
TreeMap<>(Range::compare);
+
+    /**
+     * Adhoc schedulers for ranges we don't own but have been asked to 
coordinate durability for.
+     *
+     * Nodes only maintain regular schedulers for the ranges they own, but 
sometimes a node may
+     * be asked to coordinate durability for ranges it doesn't own, such as 
during repair. Nothing
+     * requires these operations to be coordinated by an owning replica.
+     *
+     * When creating schedulers for non-owned ranges, we might accidentally 
pick a range that we DO own and
+     * overwrite the existing background scheduler for that range, causing us 
to stop running regular
+     * background actions once the adhoc operation completes. To avoid this 
possibility, we separate adhoc schedulers
+     * from regular schedulers to prevent accidental overwriting.
+     */
+    private final NavigableMap<Range, ShardScheduler> adhocShardSchedulers = 
new TreeMap<>(Range::compare);
     private final ConcurrencyControl syncPointControl = new 
ConcurrencyControl(8);
     private final DurabilityQueue durabilityQueue;
     private long latestEpoch;
@@ -466,11 +482,6 @@ public class ShardDurability
         scheduled = node.scheduler().recurring(this::tick, 
shardCycleTimeMicros / targetShardSplits, MICROSECONDS);
     }
 
-    public synchronized void setShardCycleTimeoutDelay(long 
newShardCycleTimeoutDelay, TimeUnit units)
-    {
-        shardCycleTimeoutDelayMicros = 
units.toMicros(newShardCycleTimeoutDelay);
-    }
-
     /**
      * Schedule regular invocations of CoordinateShardDurable and 
CoordinateGloballyDurable
      */
@@ -484,6 +495,8 @@ public class ShardDurability
     {
         for (ShardScheduler scheduler : shardSchedulers.values())
             scheduler.tick();
+        for (ShardScheduler scheduler : adhocShardSchedulers.values())
+            scheduler.tick();
     }
 
     public synchronized void stop()
@@ -491,7 +504,10 @@ public class ShardDurability
         stop = true;
         for (ShardScheduler scheduler : shardSchedulers.values())
             scheduler.markDefunct();
+        for (ShardScheduler scheduler : adhocShardSchedulers.values())
+            scheduler.markDefunct();
         shardSchedulers.clear();
+        adhocShardSchedulers.clear();
     }
 
     public synchronized void request(DurabilityRequest request)
@@ -509,6 +525,10 @@ public class ShardDurability
     {
         {
             Map.Entry<Range, ShardScheduler> e = 
shardSchedulers.floorEntry(range);
+            // we look up ceiling entry as well, because our comparison on 
both start and end
+            // means a match with the same start but higher end may be 
considered to sort after
+            if (e == null || e.getKey().compareIntersecting(range) != 0)
+                e = shardSchedulers.ceilingEntry(range);
             if (e != null)
             {
                 Range shardRange = e.getKey();
@@ -531,16 +551,18 @@ public class ShardDurability
         while (true)
         {
             // TODO (desired): seed the numberOfSplits based on our permanent 
shardScheduler average?
-            ShardScheduler shardScheduler = new ShardScheduler();
+            ShardScheduler scheduler = new ShardScheduler(true);
             Topology topology = node.topology().current();
             int i = topology.indexForKey(range.start());
             if (i >= 0 && !range.startInclusive() && 
range.start().equals(topology.get(i).range.end())) ++i;
             Invariants.require(i < 0 || 
topology.get(i).range.compareIntersecting(range) == 0);
             Shard shard = i >= 0 ? topology.get(i) : Shard.create(range, 
ofSorted(node.id()), ofSorted(node.id()));
-            shardScheduler.update(shard, 0, false);
-            shardScheduler.request(request, range.intersection(shard.range));
-            shardScheduler.markDefunctSilent();
-            shardSchedulers.put(shard.range, shardScheduler);
+            ShardScheduler existing = 
adhocShardSchedulers.putIfAbsent(shard.range, scheduler);
+            if (existing != null) scheduler = existing;
+            else logger.info("Starting adhoc shard durability scheduler {} for 
{}", scheduler.id, shard);
+            scheduler.update(shard, 0, false);
+            scheduler.request(request, range.intersection(shard.range));
+            scheduler.markDefunct();
             if (shard.range.contains(range))
                 break;
             range = range.newRange(shard.range.end(), range.end());
@@ -578,7 +600,7 @@ public class ShardDurability
             ShardScheduler scheduler = prev.remove(shard.range);
             if (scheduler == null)
             {
-                scheduler = new ShardScheduler();
+                scheduler = new ShardScheduler(false);
                 logger.info("Starting shard durability scheduler {} for {}", 
scheduler.id, shard);
             }
             shardSchedulers.put(shard.range, scheduler);
@@ -591,15 +613,19 @@ public class ShardDurability
 
     public synchronized ImmutableView immutableView()
     {
-        TreeMap<Range, ShardScheduler> schedulers = new 
TreeMap<>(Range::compare);
-        schedulers.putAll(shardSchedulers);
+        // Snapshot every mapping into its own immutable entry instead of retaining the maps' entry objects:
+        // the returned view may be read after this lock is released, and the behaviour of an entry obtained
+        // from a map's entry set is undefined once the backing map has been modified.
+        int size = shardSchedulers.size() + adhocShardSchedulers.size();
+        List<Map.Entry<Range, ShardScheduler>> schedulers = new ArrayList<>(size);
+        for (Map.Entry<Range, ShardScheduler> e : shardSchedulers.entrySet())
+            schedulers.add(new java.util.AbstractMap.SimpleImmutableEntry<>(e));
+        if (!adhocShardSchedulers.isEmpty())
+        {
+            for (Map.Entry<Range, ShardScheduler> e : adhocShardSchedulers.entrySet())
+                schedulers.add(new java.util.AbstractMap.SimpleImmutableEntry<>(e));
+            // the regular schedulers were added in key order; re-sort only when adhoc entries were merged in
+            schedulers.sort(Map.Entry.comparingByKey(Range::compare));
+        }
         return new ImmutableView(schedulers);
     }
 
     public static class ImmutableView
     {
-        private final TreeMap<Range, ShardScheduler> schedulers;
-        ImmutableView(TreeMap<Range, ShardScheduler> schedulers)
+        private final List<Map.Entry<Range, ShardScheduler>> schedulers;
+        ImmutableView(List<Map.Entry<Range, ShardScheduler>> schedulers)
         {
             this.schedulers = schedulers;
         }
@@ -619,6 +645,8 @@ public class ShardDurability
 
         long lastStartedAtMicros;
         long cycleStartedAtMicros;
+        Timestamp min;
+        Object requestedBy;
         int retries;
 
         Ranges active;
@@ -627,7 +655,7 @@ public class ShardDurability
         public boolean advance()
         {
             if (iterator == null)
-                iterator = schedulers.entrySet().iterator();
+                iterator = schedulers.iterator();
 
             if (!iterator.hasNext())
             {
@@ -651,6 +679,8 @@ public class ShardDurability
                 this.currentSplits = scheduler.currentSplits;
                 this.lastStartedAtMicros = scheduler.lastStartedAtMicros;
                 this.cycleStartedAtMicros = scheduler.cycleStartedAtMicros;
+                this.min = scheduler.activeRequest != null ? 
scheduler.activeRequest.min : null;
+                this.requestedBy = scheduler.activeRequest != null ? 
scheduler.activeRequest.requestedBy : null;
                 this.retries = scheduler.retries;
                 this.active = scheduler.active;
                 this.waiting = scheduler.waiting;
@@ -720,6 +750,16 @@ public class ShardDurability
             return cycleStartedAtMicros;
         }
 
+        public Timestamp min()
+        {
+            return min;
+        }
+
+        public Object requestedBy()
+        {
+            return requestedBy;
+        }
+
         public int retries()
         {
             return retries;
diff --git 
a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java 
b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
index 88043420..a0829543 100644
--- a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
@@ -29,7 +29,6 @@ import accord.local.SafeCommandStore;
 import accord.local.StoreParticipants;
 import accord.primitives.Deps;
 import accord.primitives.FullRoute;
-import accord.primitives.Participants;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java 
b/accord-core/src/main/java/accord/messages/Propagate.java
index a4fccc12..2a40ffaf 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -20,6 +20,7 @@ package accord.messages;
 import accord.api.ProtocolModifiers;
 import accord.api.Result;
 import accord.api.RoutingKey;
+import accord.api.Tracing;
 import accord.coordinate.FetchData.FetchResult;
 import accord.coordinate.Infer.InvalidIf;
 import accord.local.Cleanup;
@@ -92,7 +93,6 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
     final Ballot acceptedOrCommitted;
     final Status.Durability durability;
     @Nullable final RoutingKey homeKey;
-    // this is a WHOLE NODE measure, so if commit epoch has more ranges we do 
not count as committed if we can only commit in coordination epoch
     final KnownMap known;
     final WithQuorum withQuorum;
     @Nullable final PartialTxn partialTxn;
@@ -101,6 +101,7 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
     @Nullable final Writes writes;
     @Nullable final Result result;
     final BiConsumer<? super FetchResult, Throwable> callback;
+    final @Nullable Tracing trace;
 
     private transient volatile FetchResult fetchResult;
     private static final AtomicReferenceFieldUpdater<Propagate, FetchResult> 
fetchResultUpdater = AtomicReferenceFieldUpdater.newUpdater(Propagate.class, 
FetchResult.class, "fetchResult");
@@ -120,7 +121,7 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
     @Nullable Timestamp committedExecuteAt,
     @Nullable Writes writes,
     @Nullable Result result,
-    BiConsumer<? super FetchResult, Throwable> callback)
+    BiConsumer<? super FetchResult, Throwable> callback, @Nullable Tracing 
trace)
     {
         this.node = node;
         this.txnId = txnId;
@@ -141,12 +142,15 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
         this.writes = writes;
         this.result = result;
         this.callback = callback;
+        this.trace = trace;
     }
 
-    public static void propagate(Node node, TxnId txnId, InvalidIf 
previouslyKnownToBeInvalidIf, long sourceEpoch, WithQuorum withQuorum, Route<?> 
queried, Participants<?> contactable, LatentStoreSelector reportTo, @Nullable 
Known target, CheckStatusOkFull full, BiConsumer<? super FetchResult, 
Throwable> callback)
+    public static void propagate(Node node, TxnId txnId, InvalidIf 
previouslyKnownToBeInvalidIf, long sourceEpoch, WithQuorum withQuorum, Route<?> 
queried, Participants<?> contactable, LatentStoreSelector reportTo, @Nullable 
Known target, CheckStatusOkFull full, BiConsumer<? super FetchResult, 
Throwable> callback, @Nullable Tracing tracing)
     {
         if (full.maxKnowledgeSaveStatus.status == NotDefined && full.invalidIf 
== NotKnownToBeInvalid)
         {
+            if (tracing != null)
+                tracing.trace(null, "Found nothing for %s", txnId);
             callback.accept(new FetchResult(Nothing, queried.slice(0, 0)), 
null);
             return;
         }
@@ -158,9 +162,12 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
         full = full.finish(queried, contactable, queried.with((Unseekables) 
contactable), withQuorum, previouslyKnownToBeInvalidIf);
         Route<?> route = Invariants.nonNull(full.route);
 
+        if (tracing != null)
+            tracing.trace(null, "Found %s for %s", full.map, txnId);
+
         Timestamp committedExecuteAt = full.executeAtIfKnown();
         Propagate propagate =
-            new Propagate(node, txnId, route, target, full.invalidIf, 
full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.maxPromised, 
full.acceptedOrCommitted, full.durability, full.homeKey, full.map, withQuorum, 
full.partialTxn, full.stableDeps, committedExecuteAt, full.writes, full.result, 
callback);
+            new Propagate(node, txnId, route, target, full.invalidIf, 
full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.maxPromised, 
full.acceptedOrCommitted, full.durability, full.homeKey, full.map, withQuorum, 
full.partialTxn, full.stableDeps, committedExecuteAt, full.writes, full.result, 
callback, tracing);
 
         long untilEpoch = txnId.epoch();
         if (committedExecuteAt != null)
@@ -185,7 +192,11 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
         // TODO (expected): can we come up with a better more universal 
pattern for avoiding updating a command we don't intersect with?
         //   ideally integrated with safeStore.get()
         if (participants.owns().isEmpty() && safeStore.ifInitialised(txnId) == 
null)
+        {
+            if (trace != null)
+                trace.trace(safeStore.commandStore(), "Uninitialised and not 
owned; skipping");
             return null;
+        }
 
         SafeCommand safeCommand = safeStore.get(txnId, participants);
         Command command = safeCommand.current();
@@ -200,7 +211,10 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
         switch (command.saveStatus().phase)
         {
             // Already know the outcome, waiting on durability so maybe update 
with new durability information which can also trigger cleanup
-            case Persist: return updateDurability(safeStore, safeCommand, 
participants);
+            case Persist:
+                if (trace != null)
+                    trace.trace(safeStore.commandStore(), "Already persisted; 
skipping");
+                return updateDurability(safeStore, safeCommand, participants);
             case Cleanup:
             case Invalidate:
                 return null;
@@ -265,6 +279,8 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
         SaveStatus propagate = 
found.atLeast(currentlyKnown).propagatesSaveStatus();
         if (propagate.known.isSatisfiedBy(currentlyKnown))
         {
+            if (trace != null)
+                trace.trace(safeStore.commandStore(), "Already know at least 
as much as peer responses.");
             updateFetchResult(found, participants.owns());
             return updateDurability(safeStore, safeCommand, participants);
         }
@@ -281,23 +297,31 @@ public class Propagate implements PreLoadContext, 
MapReduceConsume<SafeCommandSt
                 throw illegalState("Invalid states to propagate: " + 
propagate);
 
             case Invalidated:
+                if (trace != null)
+                    trace.trace(safeStore.commandStore(), "Invalidating");
                 Commands.commitInvalidate(safeStore, safeCommand, route);
                 break;
 
             case Applied:
             case PreApplied:
+                if (trace != null)
+                    trace.trace(safeStore.commandStore(), "Applying");
                 Invariants.require(committedExecuteAt != null);
                 // we must use the remote executeAt, as it might have a 
uniqueHlc we aren't aware of at commit
                 confirm(Commands.apply(safeStore, safeCommand, participants, 
Ballot.ZERO, txnId, route, committedExecuteAt, stableDeps, partialTxn, writes, 
result));
                 break;
 
             case Stable:
+                if (trace != null)
+                    trace.trace(safeStore.commandStore(), "Committing as 
stable");
                 confirm(Commands.commit(safeStore, safeCommand, participants, 
Stable, acceptedOrCommitted, txnId, route, partialTxn, executeAtIfKnown, 
stableDeps, null));
                 break;
 
             case Committed:
                 // TODO (expected): we can propagate Committed as Stable if we 
have any other Stable result AND a quorum of committedDeps
             case PreCommitted:
+                if (trace != null)
+                    trace.trace(safeStore.commandStore(), "Pre-committing");
                 confirm(Commands.precommit(safeStore, safeCommand, 
participants, txnId, executeAtIfKnown, promised));
                 // TODO (desired): would it be clearer to yield a SaveStatus 
so we can have PreCommittedWithDefinition
                 if (!found.definition().isKnown())
@@ -306,11 +330,19 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
             case PreAccepted:
                 // only preaccept if we coordinate the transaction
                 if (safeStore.ranges().coordinates(txnId).intersects(route) && 
Route.isFullRoute(route))
+                {
+                    if (trace != null)
+                        trace.trace(safeStore.commandStore(), "Pre-accepting");
                     Commands.preaccept(safeStore, safeCommand, participants, 
txnId, partialTxn, null, false);
+                }
 
             case NotDefined:
                 if (invalidIf == IfUncommitted)
+                {
+                    if (trace != null)
+                        trace.trace(safeStore.commandStore(), "Marking 
invalidIfUncommitted");
                     safeStore.progressLog().invalidIfUncommitted(txnId);
+                }
                 break;
         }
 
@@ -354,12 +386,20 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
 
         Participants<?> stillTouches = participants.stillTouches();
         if (stillTouches.isEmpty())
+        {
+            if (trace != null)
+                trace.trace(safeStore.commandStore(), "No longer participating 
(stillTouches is empty); using knownForAny: %s", known.knownForAny());
             return known.knownForAny();
+        }
 
         RedundantStatus status = safeStore.redundantBefore().status(txnId, 
null, stillTouches);
         // try to see if we can safely purge the full command
         if (tryPurge(safeStore, safeCommand, status))
+        {
+            if (trace != null)
+                trace.trace(safeStore.commandStore(), "Redundant with status 
%s; purged", status);
             return null;
+        }
 
         // if the command has been truncated globally, then we should expect to apply it
         // if we cannot obtain enough information from a majority to do so then we have been left behind
@@ -379,11 +419,15 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
             {
                 Invariants.require(notStaleTouches.containsAll(stillTouches));
                 
Invariants.require(notStaleOwnsOrMayExecute.containsAll(stillOwnsOrMayExecute));
+                if (trace != null)
+                    trace.trace(safeStore.commandStore(), "No longer touches 
any keys that were found truncated");
                 return required;
             }
 
             if (stillOwnsOrMayExecute.isEmpty() && (!found.is(Outcome.Apply) 
|| known.hasFullyTruncated(staleTouches)))
             {
+                if (trace != null)
+                    trace.trace(safeStore.commandStore(), "No longer owns or 
executes any keys that were found truncated; marking vestigial");
                 Commands.setTruncatedOrVestigial(safeStore, safeCommand, 
participants);
                 return null;
             }
@@ -391,23 +435,42 @@ public class Propagate implements PreLoadContext, MapReduceConsume<SafeCommandSt
         }
 
         Participants<?> stale = staleTouches.with((Participants) 
staleOwnsOrMayExecute);
+        // TODO (expected): could be that two replicas are stale but cannot catch up;
+        //  I think this condition is to ensure a full quorum has truncated (so we haven't raced with truncation)
+        //  should solve another way
         if (!known.hasFullyTruncated(stale))
+        {
+            if (trace != null)
+                trace.trace(safeStore.commandStore(), "Has participants %s 
that could not be fetched. Some responses were not truncated, so we may have 
raced with completion. Aborting.", stale);
             return null;
+        }
 
         // TODO (expected): trigger a refresh of redundantBefore; should be available on a peer
         // wait until we know the shard is ahead and we are behind
         if (!safeStore.redundantBefore().isShardOnlyApplied(txnId, stale))
+        {
+            if (trace != null)
+                trace.trace(safeStore.commandStore(), "Has participants %s 
that could not be fetched, but the shard(s) have not been marked universally 
durable so we will not mark ourselves stale. Aborting.", stale);
             return null;
+        }
 
         Participants<?> staleOnlyTouches = 
staleTouches.without(staleOwnsOrMayExecute);
         Invariants.expect(txnId.awaitsPreviouslyOwned() || 
staleOnlyTouches.isEmpty(), "%s is SHARD_ONLY_APPLIED, so we expect it to have 
been filtered from StoreParticipants", staleOnlyTouches);
         // TODO (expected): if the above last ditch doesn't work, see if only 
the stale ranges can't apply and do some shenanigans to apply partially and 
move on
         if (ProtocolModifiers.Toggles.markStaleIfCannotExecute(txnId))
         {
+            if (trace != null)
+                trace.trace(safeStore.commandStore(), "Has participants %s 
that could not be fetched and the shard(s) have been marked universally 
durable. We have marked ourselves stale, and will apply the remaining ranges.", 
stale);
+
             safeStore.commandStore().markShardStale(safeStore, 
executeAtIfKnown == null ? txnId : executeAtIfKnown, stale.toRanges(), true);
             if (!stale.containsAll(stillTouches) || 
!stale.containsAll(stillOwnsOrMayExecute))
                 return required;
         }
+        else
+        {
+            if (trace != null)
+                trace.trace(safeStore.commandStore(), "Has participants %s 
that could not be fetched and the shard(s) have been marked universally 
durable. This transaction type is configured not to induce staleness, so 
erasing.", stale);
+        }
 
         // TODO (expected): we might prefer to adopt Redundant status, and 
permit ourselves to later accept the result of the execution and/or definition
         Commands.setTruncatedOrVestigial(safeStore, safeCommand, participants);
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 50beaf96..0d4f14d8 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -137,9 +137,9 @@ public abstract class ReadData implements PreLoadContext, Request, MapReduceCons
     public final Participants<?> scope;
     public final long executeAtEpoch;
     public final ExecuteFlags flags;
+    final boolean requiresListenersDuringExecution;
     protected @Nullable PartialTxn partialTxn;
     protected @Nullable Timestamp executeAt;
-    private boolean fastExec;
 
     private transient State state = State.PENDING; // TODO (low priority, 
semantics): respond with the Executed result we have stored?
 
@@ -171,6 +171,7 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
         this.partialTxn = txn == null ? null : txn.intersecting(scope, true);
         this.executeAt = executeAt;
         this.executeAtEpoch = executeAtEpoch;
+        this.requiresListenersDuringExecution = 
requiresListenersDuringExecution(txnId, flags);
     }
 
     protected ReadData(TxnId txnId, Participants<?> scope, @Nullable 
PartialTxn partialTxn, @Nullable Timestamp executeAt, long executeAtEpoch)
@@ -186,6 +187,12 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
         this.executeAt = executeAt;
         this.executeAtEpoch = executeAtEpoch;
         this.flags = flags;
+        this.requiresListenersDuringExecution = 
requiresListenersDuringExecution(txnId, flags);
+    }
+
+    private static boolean requiresListenersDuringExecution(TxnId txnId, 
ExecuteFlags flags)
+    {
+        return !txnId.is(EphemeralRead) && (!dataStoreDetectsFutureReads() || 
!flags.contains(HAS_UNIQUE_HLC));
     }
 
     protected abstract ExecuteOn executeOn();
@@ -259,10 +266,7 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
         stamp = node.currentStamp();
 
         if (flags.contains(READY_TO_EXECUTE) && 
fastReadsMayBypassSafeStore(txnId) && partialTxn != null && executeAt != null 
&& (txnId.is(EphemeralRead) || flags.contains(HAS_UNIQUE_HLC)))
-        {
-            fastExec = true;
             return node.commandStores().mapReduceConsume(scope, minEpoch(), 
executeAtEpoch, this::applyFastRead, this, this);
-        }
 
         return node.mapReduceConsumeLocal(this, scope, minEpoch(), 
executeAtEpoch, this);
     }
@@ -315,7 +319,7 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
                     return Redundant;
 
                 case EXECUTE:
-                    if (!dataStoreDetectsFutureReads())
+                    if (requiresListenersDuringExecution)
                         listeners.put(storeId, safeStore.register(txnId, 
this));
                     waitingOn.add(storeId);
                     ++waitingOnCount;
@@ -400,7 +404,7 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
                     if (waitingOn.add(storeId))
                         ++waitingOnCount;
 
-                    if (dataStoreDetectsFutureReads())
+                    if (!requiresListenersDuringExecution)
                         listeners.remove(storeId);
 
                     execute = true;
@@ -411,7 +415,7 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
         {
             logger.trace("{}: executing read", command.txnId());
             read(safeStore, command);
-            return !dataStoreDetectsFutureReads();
+            return requiresListenersDuringExecution;
         }
         else
         {
@@ -709,7 +713,7 @@ public abstract class ReadData implements PreLoadContext, 
Request, MapReduceCons
 
     protected void reply(Ranges unavailable, Data data, long uniqueHlc)
     {
-        if (data != null && !data.validateReply(txnId, executeAt, fastExec)) 
reply(Redundant, null);
+        if (data != null && !data.validateReply(txnId, executeAt, 
!requiresListenersDuringExecution)) reply(Redundant, null);
         else reply(new ReadOk(unavailable, data, uniqueHlc), null);
     }
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 49ff44c8..a67cffa1 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -28,9 +28,13 @@ import java.util.function.Consumer;
 import java.util.function.IntSupplier;
 import java.util.function.LongSupplier;
 
+import javax.annotation.Nullable;
+
 import accord.api.Agent;
 import accord.api.ProgressLog;
 import accord.api.Result;
+import accord.api.TraceEventType;
+import accord.api.Tracing;
 import accord.coordinate.CoordinationFailed;
 import accord.coordinate.ExecuteSyncPoint;
 import accord.impl.basic.NodeSink;
@@ -108,6 +112,15 @@ public class ListAgent implements Agent
         }
     }
 
+    @Nullable
+    @Override
+    public Tracing trace(TxnId txnId, TraceEventType eventType)
+    {
+        if (rnd.nextFloat() < 0.01f)
+            return (i1, i2) -> {};
+        return null;
+    }
+
     @Override
     public void onInconsistentTimestamp(Command command, Timestamp prev, 
Timestamp next)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to