This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 291cbe70 Follow-up to: Do not contact faulty replicas, and support 
reporting slow replies for preaccept/read. Do not wait for stale or left nodes 
for durability.
291cbe70 is described below

commit 291cbe70ad82b0d5f875101f541d9f841912802f
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Oct 9 12:50:32 2024 +0100

    Follow-up to: Do not contact faulty replicas, and support reporting slow 
replies for preaccept/read. Do not wait for stale or left nodes for durability.
---
 .../accord/coordinate/CoordinateTransaction.java   |   3 -
 .../src/main/java/accord/coordinate/Stabilise.java |   8 +-
 .../coordinate/tracking/AbstractTracker.java       |   4 +-
 .../accord/coordinate/tracking/ReadTracker.java    |  13 +-
 .../java/accord/impl/AbstractFetchCoordinator.java |   6 +-
 .../java/accord/impl/AbstractRequestTimeouts.java  | 128 +++++-----
 .../java/accord/impl/DefaultLocalListeners.java    |   3 +-
 .../main/java/accord/impl/RequestCallbacks.java    |  43 ++--
 .../src/main/java/accord/local/Commands.java       |   1 +
 accord-core/src/main/java/accord/local/Node.java   |   2 +-
 .../accord/messages/ApplyThenWaitUntilApplied.java |  10 -
 .../src/main/java/accord/messages/Await.java       |  88 +++----
 .../src/main/java/accord/messages/Commit.java      |  50 ++--
 .../src/main/java/accord/messages/ReadData.java    | 261 +++++++++++++--------
 .../java/accord/messages/WaitUntilApplied.java     |   2 +-
 .../main/java/accord/topology/TopologyManager.java |   5 +
 16 files changed, 348 insertions(+), 279 deletions(-)

diff --git 
a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java 
b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
index 5cc380c1..8f8cb2b1 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateTransaction.java
@@ -47,12 +47,9 @@ import static 
accord.coordinate.Propose.Invalidate.proposeAndCommitInvalidate;
  */
 public class CoordinateTransaction extends CoordinatePreAccept<Result>
 {
-    final Txn txn;
-
     private CoordinateTransaction(Node node, TxnId txnId, Txn txn, 
FullRoute<?> route)
     {
         super(node, txnId, txn, route);
-        this.txn = txn;
     }
 
     public static AsyncResult<Result> coordinate(Node node, FullRoute<?> 
route, TxnId txnId, Txn txn)
diff --git a/accord-core/src/main/java/accord/coordinate/Stabilise.java 
b/accord-core/src/main/java/accord/coordinate/Stabilise.java
index b6145100..05993692 100644
--- a/accord-core/src/main/java/accord/coordinate/Stabilise.java
+++ b/accord-core/src/main/java/accord/coordinate/Stabilise.java
@@ -36,6 +36,7 @@ import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.Topologies;
+import accord.utils.SortedArrays.SortedArrayList;
 import accord.utils.SortedListMap;
 
 import static accord.coordinate.ExecutePath.SLOW;
@@ -77,7 +78,12 @@ public abstract class Stabilise<R> implements 
Callback<ReadReply>
 
     void start()
     {
-        Commit.commitMinimal(node, stableTracker.topologies(), ballot, txnId, 
txn, route, executeAt, stabiliseDeps, this);
+        SortedArrayList<Node.Id> contact = 
stableTracker.filterAndRecordFaulty();
+        if (allTopologies.size() > 1)
+            contact = 
contact.with(allTopologies.nodes().without(stableTracker.nodes()).without(allTopologies::isFaulty));
+
+        if (contact == null) callback.accept(null, new Exhausted(txnId, 
route.homeKey(), null));
+        else Commit.commitMinimal(contact, node, stableTracker.topologies(), 
allTopologies, ballot, txnId, txn, route, executeAt, stabiliseDeps, this);
     }
 
     @Override
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
index d156e286..846621b5 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractTracker.java
@@ -120,6 +120,8 @@ public abstract class AbstractTracker<ST extends 
ShardTracker>
     }
 
     public abstract RequestStatus recordFailure(Id from);
+    // record failure before starting up; can skip any in-flight validation, 
and perhaps be cheaper
+    public RequestStatus prerecordFailure(Id from) { return 
recordFailure(from); }
 
     protected int topologyOffset(int topologyIdx)
     {
@@ -230,7 +232,7 @@ public abstract class AbstractTracker<ST extends 
ShardTracker>
             Id node = nodes.get(i);
             if (sorter.isFaulty(node))
             {
-                if (Failed == reportTo.recordFailure(node))
+                if (Failed == reportTo.prerecordFailure(node))
                     return null;
 
                 if (buffer == null)
diff --git 
a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java 
b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
index 5e6c55ec..3dee3651 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java
@@ -150,6 +150,12 @@ public class ReadTracker extends 
AbstractTracker<ReadTracker.ReadShardTracker>
             return ensureProgressOrFail();
         }
 
+        public ShardOutcomes prerecordReadFailure(Object ignore)
+        {
+            ++contacted;
+            return ensureProgressOrFail();
+        }
+
         private ShardOutcomes ensureProgressOrFail()
         {
             if (!shouldRead())
@@ -236,7 +242,7 @@ public class ReadTracker extends 
AbstractTracker<ReadTracker.ReadShardTracker>
 
     private boolean receiveResponseIsSlow(Id node)
     {
-        if (!inflight.remove(node.id))
+        if (!inflight.isEmpty() && !inflight.remove(node.id))
             throw illegalState("Nothing in flight for " + node);
 
         return slow != null && slow.remove(node.id);
@@ -292,6 +298,11 @@ public class ReadTracker extends 
AbstractTracker<ReadTracker.ReadShardTracker>
         return recordResponse(from, ReadShardTracker::recordReadFailure);
     }
 
+    public RequestStatus prerecordFailure(Id from)
+    {
+        return recordResponse(this, from, 
ReadShardTracker::prerecordReadFailure, null);
+    }
+
     protected RequestStatus recordResponse(Id from, BiFunction<? super 
ReadShardTracker, Boolean, ? extends ShardOutcome<? super ReadTracker>> 
function)
     {
         boolean isSlow = receiveResponseIsSlow(from);
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java 
b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
index 2380a7f0..7bc1c4fa 100644
--- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
+++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java
@@ -276,11 +276,9 @@ public abstract class AbstractFetchCoordinator extends 
FetchCoordinator
         }
 
         @Override
-        protected void onAllSuccess(@Nullable Ranges unavailable, @Nullable 
Data data, @Nullable Throwable fail)
+        protected ReadOk constructReadOk(Ranges unavailable, Data data)
         {
-            // TODO (review): If the fetch response actually does some 
streaming, but we send back the error
-            // it is a lot of work and data that might move and be unaccounted 
for at the coordinator
-            node.reply(replyTo, replyContext, fail == null ? new 
FetchResponse(unavailable, data, maxApplied()) : null, fail);
+            return new FetchResponse(unavailable, data, maxApplied());
         }
 
         protected Timestamp maxApplied()
diff --git a/accord-core/src/main/java/accord/impl/AbstractRequestTimeouts.java 
b/accord-core/src/main/java/accord/impl/AbstractRequestTimeouts.java
index e909fe2c..dd3c55e9 100644
--- a/accord-core/src/main/java/accord/impl/AbstractRequestTimeouts.java
+++ b/accord-core/src/main/java/accord/impl/AbstractRequestTimeouts.java
@@ -34,8 +34,8 @@ public class AbstractRequestTimeouts<S extends 
AbstractRequestTimeouts.Stripe> i
 {
     protected interface Expiring
     {
-        Expiring prepareToExpire(Stripe stripe, long now);
-        void onExpire(long now);
+        Expiring prepareToExpire();
+        void onExpire();
     }
 
     protected static class Stripe implements Runnable, 
Function<Stripe.AbstractRegistered, Runnable>
@@ -69,13 +69,13 @@ public class AbstractRequestTimeouts<S extends 
AbstractRequestTimeouts.Stripe> i
 
 
             @Override
-            public void onExpire(long now)
+            public void onExpire()
             {
                 timeout.timeout();
             }
 
             @Override
-            public Expiring prepareToExpire(Stripe stripe, long now)
+            public Expiring prepareToExpire()
             {
                 return this;
             }
@@ -93,57 +93,9 @@ public class AbstractRequestTimeouts<S extends 
AbstractRequestTimeouts.Stripe> i
         @Override
         public void run()
         {
-            long now = 0;
-            try (BufferList<Expiring> collect = new BufferList<>())
-            {
-                int i = 0;
-                try
-                {
-                    lock.lock();
-                    try
-                    {
-                        now = time.elapsed(MICROSECONDS);
-                        timeouts.advance(now, collect, BufferList::add);
-
-                        // prepare expiration while we hold the lock - this is 
to permit expiring objects to
-                        // reschedule themselves while returning some 
immediate expiry work to do
-                        for (int j = 0 ; j < collect.size() ; ++j)
-                        {
-                            Expiring in = collect.get(j);
-                            Expiring out = in.prepareToExpire(this, now);
-                            if (in != out)
-                                collect.set(j, out);
-                        }
-                    }
-                    finally
-                    {
-                        lock.unlock();
-                    }
-
-                    while (i < collect.size())
-                        collect.get(i++).onExpire(now);
-                }
-                catch (Throwable t)
-                {
-                    while (i < collect.size())
-                    {
-                        try
-                        {
-                            collect.get(i++).onExpire(now);
-                        }
-                        catch (Throwable t2)
-                        {
-                            t.addSuppressed(t2);
-                        }
-                    }
-                    throw t;
-                }
-            }
-        }
-
-        protected void register(AbstractRegistered register, long deadline)
-        {
-            timeouts.add(deadline, register);
+            long now = time.elapsed(MICROSECONDS);
+            lock();
+            unlock(now);
         }
 
         @Override
@@ -160,7 +112,7 @@ public class AbstractRequestTimeouts<S extends 
AbstractRequestTimeouts.Stripe> i
             try
             {
                 Registered registered = new Registered(timeout);
-                register(registered, deadline);
+                timeouts.add(deadline, registered);
                 return registered;
             }
             finally
@@ -171,6 +123,7 @@ public class AbstractRequestTimeouts<S extends 
AbstractRequestTimeouts.Stripe> i
 
         protected void lock()
         {
+            //noinspection LockAcquiredButNotSafelyReleased
             lock.lock();
         }
 
@@ -179,29 +132,60 @@ public class AbstractRequestTimeouts<S extends 
AbstractRequestTimeouts.Stripe> i
             return lock.tryLock();
         }
 
-        protected void unlock()
+        protected void unlock(long now)
         {
+            int i = 0;
+            BufferList<Expiring> expire = null;
             try
             {
-                if (timeouts.shouldWake(time.elapsed(MICROSECONDS)))
-                    run();
-            }
-            finally
-            {
-                lock.unlock();
-            }
-        }
+                try
+                {
+                    if (!timeouts.shouldWake(now))
+                        return;
 
-        protected void unlock(long now)
-        {
-            try
+                    expire = new BufferList<>();
+                    timeouts.advance(now, expire, BufferList::add);
+
+                    // prepare expiration while we hold the lock - this is to 
permit expiring objects to
+                    // reschedule themselves while returning some immediate 
expiry work to do
+                    for (int j = 0; j < expire.size(); ++j)
+                    {
+                        Expiring in = expire.get(j);
+                        Expiring out = in.prepareToExpire();
+                        if (in != out)
+                            expire.set(j, out);
+                    }
+                }
+                finally
+                {
+                    lock.unlock();
+                }
+
+                // we want to process these without the lock
+                while (i < expire.size())
+                    expire.get(i++).onExpire();
+            }
+            catch (Throwable t)
             {
-                if (timeouts.shouldWake(now))
-                    run();
+                if (expire != null)
+                {
+                    while (i < expire.size())
+                    {
+                        try
+                        {
+                            expire.get(i++).onExpire();
+                        }
+                        catch (Throwable t2)
+                        {
+                            t.addSuppressed(t2);
+                        }
+                    }
+                }
             }
             finally
             {
-                lock.unlock();
+                if (expire != null)
+                    expire.close();
             }
         }
     }
@@ -228,7 +212,7 @@ public class AbstractRequestTimeouts<S extends 
AbstractRequestTimeouts.Stripe> i
     public RegisteredTimeout register(Timeout timeout, long delay, TimeUnit 
units)
     {
         long now = time.elapsed(MICROSECONDS);
-        long deadline = now + Math.max(1, MICROSECONDS.convert(delay, units));
+        long deadline = now + Math.max(1, units.toMicros(delay));
         int i = timeout.stripe() & (stripes.length - 1);
         while (true)
         {
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java 
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index 071987c6..bfcdb8f5 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -39,7 +39,7 @@ import accord.utils.Invariants;
 import accord.utils.btree.BTree;
 import accord.utils.btree.BTreeRemoval;
 
-// TODO (required): evict to disk
+// TODO (expected): evict to disk
 public class DefaultLocalListeners implements LocalListeners
 {
     public static class Factory implements LocalListeners.Factory
@@ -462,6 +462,7 @@ public class DefaultLocalListeners implements LocalListeners
 
     private void notifyComplexListeners(SafeCommandStore safeStore, 
SafeCommand safeCommand)
     {
+        // TODO (expected): potential for lock inversion on notify calls; 
consider buffering notifies as we do elsewhere
         complexListeners.compute(safeCommand.txnId(), (id, cur) -> {
             if (cur == null)
                 return null;
diff --git a/accord-core/src/main/java/accord/impl/RequestCallbacks.java 
b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
index f6ea93cc..f3644061 100644
--- a/accord-core/src/main/java/accord/impl/RequestCallbacks.java
+++ b/accord-core/src/main/java/accord/impl/RequestCallbacks.java
@@ -21,6 +21,9 @@ package accord.impl;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.local.TimeService;
@@ -32,6 +35,8 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
 
 public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.CallbackStripe>
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(RequestCallbacks.class);
+
     public interface CallbackEntry
     {
         long registeredAt(TimeUnit units);
@@ -73,26 +78,26 @@ public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.C
             }
 
             @Override
-            public Expiring prepareToExpire(Stripe stripe, long now)
+            public Expiring prepareToExpire()
             {
-                
Invariants.checkState(((CallbackStripe)stripe).callbacks.containsKey(callbackId));
-                if (now >= reportFailAt)
+                if (deadline() == reportFailAt)
                 {
-                    ((CallbackStripe)stripe).callbacks.remove(callbackId);
+                    callbacks.remove(callbackId);
                     return this;
                 }
 
-                stripe.register(this, reportFailAt);
+                Invariants.checkState(callbacks.containsKey(callbackId));
+                timeouts.add(reportFailAt, this);
                 return new Expiring()
                 {
                     @Override
-                    public Expiring prepareToExpire(Stripe stripe, long now)
+                    public Expiring prepareToExpire()
                     {
                         return this;
                     }
 
                     @Override
-                    public void onExpire(long now)
+                    public void onExpire()
                     {
                         safeInvoke(RegisteredCallback::unsafeOnSlow, null);
                     }
@@ -100,7 +105,7 @@ public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.C
             }
 
             @Override
-            public void onExpire(long now)
+            public void onExpire()
             {
                 safeInvoke(RegisteredCallback::unsafeOnFailure, new 
accord.coordinate.Timeout(null, null));
             }
@@ -122,6 +127,8 @@ public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.C
 
             <P> void safeInvoke(BiConsumer<RegisteredCallback<T>, P> invoker, 
P param)
             {
+                // TODO (expected): have executor provide inStore() function 
so can invoke immediately
+                //   BUT need to be careful no callers fail if we invok to 
refactor a little as we cannot safely invoke callbacks before we have marked 
them in-flight
                 executor.execute(() -> {
                     try
                     {
@@ -162,8 +169,8 @@ public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.C
             {
                 RegisteredCallback<T> registered = new 
RegisteredCallback<>(executor, callbackId, callback, to, now, reportSlowAt, 
reportFailAt);
                 Object existing = callbacks.putIfAbsent(callbackId, 
registered);
-                timeouts.add(Math.min(reportSlowAt, reportFailAt), registered);
                 Invariants.checkState(existing == null);
+                timeouts.add(Math.min(reportSlowAt, reportFailAt), registered);
                 return registered;
             }
             finally
@@ -184,6 +191,7 @@ public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.C
 
         private <T, P> RegisteredCallback<T> safeInvoke(long callbackId, 
Node.Id from, P param, BiConsumer<RegisteredCallback<T>, P> invoker, boolean 
remove)
         {
+            long now = time.elapsed(MICROSECONDS);
             lock();
             try
             {
@@ -198,7 +206,7 @@ public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.C
             }
             finally
             {
-                unlock();
+                unlock(now);
             }
         }
     }
@@ -213,14 +221,12 @@ public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.C
         super(time, stripeCount, CallbackStripe[]::new, CallbackStripe::new);
     }
 
-    public <T> void register(long callbackId, AgentExecutor executor, 
Callback<T> callback, Node.Id to, long failDelay, TimeUnit units)
+    public <T> void registerWithDelay(long callbackId, AgentExecutor executor, 
Callback<T> callback, Node.Id to, long failDelay, TimeUnit units)
     {
-        long now = time.elapsed(MICROSECONDS);
-        long failDeadline = now + units.toMicros(failDelay);
-        stripes[(int)callbackId & (stripes.length - 1)].register(callbackId, 
executor, callback, to, now, failDeadline);
+        registerWithDelay(callbackId, executor, callback, to, Long.MAX_VALUE, 
failDelay, units);
     }
 
-    public <T> void register(long callbackId, AgentExecutor executor, 
Callback<T> callback, Node.Id to, long slowDelay, long failDelay, TimeUnit 
units)
+    public <T> void registerWithDelay(long callbackId, AgentExecutor executor, 
Callback<T> callback, Node.Id to, long slowDelay, long failDelay, TimeUnit 
units)
     {
         long now = time.elapsed(MICROSECONDS);
         long reportFailAt = now + units.toMicros(failDelay);
@@ -228,7 +234,12 @@ public class RequestCallbacks extends 
AbstractRequestTimeouts<RequestCallbacks.C
         stripes[(int)callbackId & (stripes.length - 1)].register(callbackId, 
executor, callback, to, now, reportSlowAt, reportFailAt);
     }
 
-    public <T> void register(long callbackId, AgentExecutor executor, 
Callback<T> callback, Node.Id to, long now, long reportSlowAt, long 
reportFailAt, TimeUnit units)
+    public <T> void registerAt(long callbackId, AgentExecutor executor, 
Callback<T> callback, Node.Id to, long now, long reportFailAt, TimeUnit units)
+    {
+        registerAt(callbackId, executor, callback, to, now, Long.MAX_VALUE, 
reportFailAt, units);
+    }
+
+    public <T> void registerAt(long callbackId, AgentExecutor executor, 
Callback<T> callback, Node.Id to, long now, long reportSlowAt, long 
reportFailAt, TimeUnit units)
     {
         if (units != MICROSECONDS)
         {
diff --git a/accord-core/src/main/java/accord/local/Commands.java 
b/accord-core/src/main/java/accord/local/Commands.java
index 3b57f409..ab0b4235 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -508,6 +508,7 @@ public class Commands
         CommandStore unsafeStore = safeStore.commandStore();
         // TODO (required, API): do we care about tracking the write 
persistence latency, when this is just a memtable write?
         //  the only reason it will be slow is because Memtable flushes are 
backed-up (which will be reported elsewhere)
+        // TODO (required): this is anyway non-monotonic and milliseconds 
granularity
         long t0 = safeStore.node().now();
         return command.writes().apply(safeStore, applyRanges(safeStore, 
command.executeAt()), command.partialTxn())
                .flatMap(unused -> unsafeStore.submit(context, ss -> {
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index ee48e239..23b4728f 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -375,7 +375,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
 
     public void withEpoch(long epoch, BiConsumer<Void, Throwable> callback)
     {
-        if (topology.hasEpoch(epoch))
+        if (topology.hasAtLeastEpoch(epoch))
         {
             callback.accept(null, null);
         }
diff --git 
a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java 
b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
index 1340c882..e71660c7 100644
--- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java
@@ -18,12 +18,9 @@
 
 package accord.messages;
 
-import javax.annotation.Nullable;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.api.Data;
 import accord.api.Result;
 import accord.local.Node;
 import accord.local.SafeCommandStore;
@@ -34,7 +31,6 @@ import accord.primitives.FullRoute;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Participants;
-import accord.primitives.Ranges;
 import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
@@ -122,12 +118,6 @@ public class ApplyThenWaitUntilApplied extends 
WaitUntilApplied
         return super.apply(safeStore);
     }
 
-    @Override
-    protected void onAllSuccess(@Nullable Ranges unavailable, @Nullable Data 
data, @Nullable Throwable fail)
-    {
-        super.onAllSuccess(unavailable, data, fail);
-    }
-
     @Override
     public MessageType type()
     {
diff --git a/accord-core/src/main/java/accord/messages/Await.java 
b/accord-core/src/main/java/accord/messages/Await.java
index fbece1e6..dfb7ee7e 100644
--- a/accord-core/src/main/java/accord/messages/Await.java
+++ b/accord-core/src/main/java/accord/messages/Await.java
@@ -30,6 +30,7 @@ import accord.api.LocalListeners;
 import accord.api.ProgressLog.BlockedUntil;
 import accord.api.RemoteListeners;
 import accord.api.RequestTimeouts;
+import accord.api.RequestTimeouts.RegisteredTimeout;
 import accord.local.Command;
 import accord.local.Commands;
 import accord.local.Node;
@@ -86,7 +87,7 @@ public class Await implements Request, 
MapReduceConsume<SafeCommandStore, Void>,
 
     private transient volatile Collection<LocalListeners.Registered> 
syncRegistrations;
     private static final AtomicReferenceFieldUpdater<Await, 
Collection<LocalListeners.Registered>> syncRegistrationsUpdater = 
AtomicReferenceFieldUpdater.newUpdater(Await.class, (Class)Collection.class, 
"syncRegistrations");
-    private transient RequestTimeouts.RegisteredTimeout timeout;
+    private transient RegisteredTimeout timeout;
 
     // we use exactly -1 to make serialization easy (can increment by 1 and 
store a non-negative integer)
     private static final int SYNCHRONOUS_CALLBACKID = -1;
@@ -184,22 +185,28 @@ public class Await implements Request, 
MapReduceConsume<SafeCommandStore, Void>,
         }
         else if (callbackId >= 0)
         {
+            RemoteListeners.Registration asyncRegistration = 
this.asyncRegistration;
             AwaitOk reply = asyncRegistration == null || 0 == 
asyncRegistration.done() ? AwaitOk.Ready : AwaitOk.NotReady;
             node.reply(replyTo, replyContext, reply, null);
         }
         else
         {
-            if (synchronouslyWaitingOn > 0)
+            int waitingOn = 
synchronouslyWaitingOnUpdater.decrementAndGet(this);
+            if (waitingOn >= 0)
             {
                 long replyTimeout = node.agent().replyTimeout(replyContext, 
MILLISECONDS);
                 timeout = node.requestTimeouts().register(this, replyTimeout, 
MILLISECONDS);
+                if (-1 == synchronouslyWaitingOn)
+                    timeout.cancel(); // we could leave a dangling timeout in 
this rare race condition
             }
-            if (-1 == synchronouslyWaitingOnUpdater.decrementAndGet(this))
+            else
+            {
                 node.reply(replyTo, replyContext, AwaitOk.Ready, null);
+            }
         }
     }
 
-    synchronized public void timeout()
+    public void timeout()
     {
         timeout = null;
         cancel();
@@ -210,20 +217,17 @@ public class Await implements Request, 
MapReduceConsume<SafeCommandStore, Void>,
         return txnId.hashCode();
     }
 
-    synchronized void cancel()
+    void cancel()
     {
-        if (timeout != null)
-        {
-            timeout.cancel();
-            timeout = null;
-        }
-        if (syncRegistrations != null)
-        {
-            syncRegistrations.forEach(LocalListeners.Registered::cancel);
-            syncRegistrations = null;
-        }
-        if (asyncRegistration != null)
-            asyncRegistration = null;
+        RegisteredTimeout cancelTimeout = timeout;
+        Collection<LocalListeners.Registered> cancelRegistrations = 
syncRegistrations;
+        if (cancelTimeout != null)
+            cancelTimeout.cancel();
+        if (cancelRegistrations != null)
+            cancelRegistrations.forEach(LocalListeners.Registered::cancel);
+        timeout = null;
+        syncRegistrations = null;
+        asyncRegistration = null;
     }
 
     @Override
@@ -249,6 +253,30 @@ public class Await implements Request, 
MapReduceConsume<SafeCommandStore, Void>,
         }
     }
 
+    @Override
+    public long waitForEpoch()
+    {
+        return txnId.epoch();
+    }
+
+    @Override
+    public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand)
+    {
+        Command command = safeCommand.current();
+        if (command.saveStatus().compareTo(blockedUntil.minSaveStatus) >= 0)
+            return true;
+
+        if (-1 == synchronouslyWaitingOnUpdater.decrementAndGet(this))
+        {
+            node.reply(replyTo, replyContext, AwaitOk.Ready, null);
+            if (timeout != null)
+                timeout.cancel();
+            syncRegistrations = null;
+        }
+
+        return false;
+    }
+
     public static class AsyncAwaitComplete implements Request, PreLoadContext, 
Consumer<SafeCommandStore>
     {
         public final TxnId txnId;
@@ -292,32 +320,8 @@ public class Await implements Request, 
MapReduceConsume<SafeCommandStore, Void>,
             if (safeCommand == null || safeCommand.current().saveStatus() == 
SaveStatus.Uninitialised)
                 return;
 
-            Command command = Commands.updateRoute(safeStore, safeCommand, 
route);
+            Commands.updateRoute(safeStore, safeCommand, route);
             safeStore.progressLog().remoteCallback(safeStore, safeCommand, 
newStatus, callbackId, from);
         }
     }
-
-    @Override
-    public long waitForEpoch()
-    {
-        return txnId.epoch();
-    }
-
-    @Override
-    public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand)
-    {
-        Command command = safeCommand.current();
-        if (command.saveStatus().compareTo(blockedUntil.minSaveStatus) >= 0)
-            return true;
-
-        if (-1 == synchronouslyWaitingOnUpdater.decrementAndGet(this))
-        {
-            node.reply(replyTo, replyContext, AwaitOk.Ready, null);
-            if (timeout != null)
-                timeout.cancel();
-            syncRegistrations = null;
-        }
-
-        return false;
-    }
 }
diff --git a/accord-core/src/main/java/accord/messages/Commit.java 
b/accord-core/src/main/java/accord/messages/Commit.java
index 96930eba..f2b29a65 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -46,6 +46,7 @@ import accord.primitives.Unseekables;
 import accord.topology.Topologies;
 import accord.topology.Topology;
 import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
 import accord.utils.TriFunction;
 import org.agrona.collections.IntHashSet;
 
@@ -156,16 +157,13 @@ public class Commit extends 
TxnRequest.WithUnsynced<CommitOrReadNack>
         this.readData = readData;
     }
 
-    public static void commitMinimal(Node node, Topologies 
coordinateEpochOnly, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, 
Timestamp executeAt, Deps unstableDeps, Callback<ReadReply> callback)
+    public static void commitMinimal(SortedArrayList<Id> contact, Node node, 
Topologies stabilise, Topologies all, Ballot ballot, TxnId txnId, Txn txn, 
FullRoute<?> route, Timestamp executeAt, Deps unstableDeps, Callback<ReadReply> 
callback)
     {
-        Invariants.checkArgument(coordinateEpochOnly.size() == 1, "Invalid 
coordinate epochs: %s", coordinateEpochOnly);
+        Invariants.checkArgument(stabilise.size() == 1, "Invalid coordinate 
epochs: %s", stabilise);
         // we want to send to everyone, and we want to include all of the 
relevant data, but we stabilise on the coordination epoch replica responses
-        Topology coordinates = coordinateEpochOnly.forEpoch(txnId.epoch());
-        Topologies all = coordinateEpochOnly;
-        if (txnId.epoch() != executeAt.epoch())
-            all = node.topology().preciseEpochs(route, txnId.epoch(), 
executeAt.epoch());
+        Topology coordinates = stabilise.forEpoch(txnId.epoch());
 
-        send(null, (i1, i2) -> true, null, node, coordinates, coordinates, 
all, Kind.CommitSlowPath, ballot,
+        send(contact, null, (i1, i2) -> true, null, node, coordinates, 
coordinates, all, Kind.CommitSlowPath, ballot,
              txnId, txn, route, executeAt, unstableDeps, callback);
     }
 
@@ -179,35 +177,33 @@ public class Commit extends 
TxnRequest.WithUnsynced<CommitOrReadNack>
         Topology executes = executeEpochOnly.forEpoch(executeAt.epoch());
         Topology coordinates = all.forEpoch(txnId.epoch());
 
-        send(readSet, (set, id) -> set.contains(id.id), readScope, node, 
coordinates, executes, all, kind, Ballot.ZERO,
+        SortedArrayList<Id> contact = all.nodes().without(all::isFaulty);
+        send(contact, readSet, (set, id) -> set.contains(id.id), readScope, 
node, coordinates, executes, all, kind, Ballot.ZERO,
              txnId, txn, route, executeAt, stableDeps, callback);
     }
 
-    private static <P> void send(P param, BiPredicate<P, Id> 
shouldRegisterCallback, @Nullable Participants<?> readScopeIfCallback,
+    private static <P> void send(SortedArrayList<Id> contact, P param, 
BiPredicate<P, Id> shouldRegisterCallback, @Nullable Participants<?> 
readScopeIfCallback,
                                  Node node, Topology coordinates, Topology 
primary, Topologies all, Kind kind, Ballot ballot,
                                  TxnId txnId, @Nullable Txn txn, FullRoute<?> 
route, Timestamp executeAt, @Nullable Deps deps,
                                  Callback<ReadReply> callback)
     {
-        for (Node.Id to : primary.nodes())
-        {
-            boolean registerCallback = shouldRegisterCallback.test(param, to);
-            // if we register a callback, supply the provided readScope (which 
may be null)
-            Participants<?> readScope = registerCallback ? readScopeIfCallback 
: null;
-            Commit send = new Commit(kind, to, coordinates, all, txnId, txn, 
route, ballot, executeAt, deps, readScope);
-            if (registerCallback) node.send(to, send, callback);
-            else node.send(to, send);
-        }
-        if (all.size() > 1)
+        for (Node.Id to : contact)
         {
-            for (Node.Id to : all.nodes())
+            if (all.size() == 1 || primary.contains(to))
+            {
+                boolean registerCallback = shouldRegisterCallback.test(param, 
to);
+                // if we register a callback, supply the provided readScope 
(which may be null)
+                Participants<?> readScope = registerCallback ? 
readScopeIfCallback : null;
+                Commit send = new Commit(kind, to, coordinates, all, txnId, 
txn, route, ballot, executeAt, deps, readScope);
+                if (registerCallback) node.send(to, send, callback);
+                else node.send(to, send);
+            }
+            else
             {
-                if (!primary.contains(to))
-                {
-                    boolean registerCallback = 
shouldRegisterCallback.test(param, to);
-                    Commit send = new Commit(kind, to, coordinates, all, 
txnId, txn, route, ballot, executeAt, deps, (ReadTxnData) null);
-                    if (registerCallback) node.send(to, send, callback);
-                    else node.send(to, send);
-                }
+                boolean registerCallback = shouldRegisterCallback.test(param, 
to);
+                Commit send = new Commit(kind, to, coordinates, all, txnId, 
txn, route, ballot, executeAt, deps, (ReadTxnData) null);
+                if (registerCallback) node.send(to, send, callback);
+                else node.send(to, send);
             }
         }
     }
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java 
b/accord-core/src/main/java/accord/messages/ReadData.java
index 5d75f0af..d403f48e 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -60,7 +60,7 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadData.CommitOrRea
     private static final Logger logger = 
LoggerFactory.getLogger(ReadData.class);
 
     private enum State { PENDING, RETURNED, OBSOLETE }
-    protected enum Action { WAIT, EXECUTE, OBSOLETE }
+    protected enum StoreAction { WAIT, EXECUTE, OBSOLETE }
 
     public enum ReadType
     {
@@ -165,108 +165,120 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadData.CommitOrRea
 
 
     @Override
-    public synchronized CommitOrReadNack apply(SafeCommandStore safeStore)
+    public CommitOrReadNack apply(SafeCommandStore safeStore)
     {
         StoreParticipants participants = StoreParticipants.execute(safeStore, 
readScope, txnId, executeAtEpoch);
         SafeCommand safeCommand = safeStore.get(txnId, participants);
         return apply(safeStore, safeCommand, participants);
     }
 
-    protected synchronized CommitOrReadNack apply(SafeCommandStore safeStore, 
SafeCommand safeCommand, StoreParticipants participants)
+    protected CommitOrReadNack apply(SafeCommandStore safeStore, SafeCommand 
safeCommand, StoreParticipants participants)
     {
-        if (state != State.PENDING)
-            return null;
-
-        Command command = safeCommand.current();
-        SaveStatus status = command.saveStatus();
-        int storeId = safeStore.commandStore().id();
-
-        logger.trace("{}: setting up read with status {} on {}", txnId, 
status, safeStore);
-        switch (actionForStatus(status))
+        synchronized (this)
         {
-            default: throw new AssertionError();
-            case WAIT:
-                registrations.put(storeId, safeStore.register(txnId, this));
-                waitingOn.set(storeId);
-                ++waitingOnCount;
+            if (state != State.PENDING)
+                return null;
 
-                int c = status.compareTo(SaveStatus.Stable);
-                if (c < 0) safeStore.progressLog().waiting(HasStableDeps, 
safeStore, safeCommand, participants.route(), participants.owns());
-                else if (c > 0 && status.compareTo(executeOn().min) >= 0 && 
status.compareTo(SaveStatus.PreApplied) < 0) 
safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null, 
readScope);
-                return status.compareTo(SaveStatus.Stable) >= 0 ? null : 
Insufficient;
+            Command command = safeCommand.current();
+            SaveStatus status = command.saveStatus();
+            int storeId = safeStore.commandStore().id();
 
-            case OBSOLETE:
-                state = State.OBSOLETE;
-                return Redundant;
-
-            case EXECUTE:
-                registrations.put(storeId, safeStore.register(txnId, this));
-                waitingOn.set(storeId);
-                ++waitingOnCount;
-                reading.set(storeId);
-                read(safeStore, safeCommand.current());
-                return null;
+            logger.trace("{}: setting up read with status {} on {}", txnId, 
status, safeStore);
+            switch (actionForStatus(status))
+            {
+                default: throw new AssertionError();
+                case WAIT:
+                    registrations.put(storeId, safeStore.register(txnId, 
this));
+                    waitingOn.set(storeId);
+                    ++waitingOnCount;
+
+                    int c = status.compareTo(SaveStatus.Stable);
+                    if (c < 0) safeStore.progressLog().waiting(HasStableDeps, 
safeStore, safeCommand, participants.route(), participants.owns());
+                    else if (c > 0 && status.compareTo(executeOn().min) >= 0 
&& status.compareTo(SaveStatus.PreApplied) < 0) 
safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null, 
readScope);
+                    return status.compareTo(SaveStatus.Stable) >= 0 ? null : 
Insufficient;
+
+                case OBSOLETE:
+                    state = State.OBSOLETE;
+                    return Redundant;
+
+                case EXECUTE:
+                    registrations.put(storeId, safeStore.register(txnId, 
this));
+                    waitingOn.set(storeId);
+                    ++waitingOnCount;
+                    reading.set(storeId);
+            }
         }
+
+        read(safeStore, safeCommand.current());
+        return null;
     }
 
-    protected final Action actionForStatus(SaveStatus status)
+    protected final StoreAction actionForStatus(SaveStatus status)
     {
         ExecuteOn executeOn = executeOn();
-        if (status.compareTo(executeOn.min) < 0) return Action.WAIT;
-        if (status.compareTo(executeOn.max) > 0) return Action.OBSOLETE;
-        return Action.EXECUTE;
+        if (status.compareTo(executeOn.min) < 0) return StoreAction.WAIT;
+        if (status.compareTo(executeOn.max) > 0) return StoreAction.OBSOLETE;
+        return StoreAction.EXECUTE;
     }
 
     @Override
-    public synchronized boolean notify(SafeCommandStore safeStore, SafeCommand 
safeCommand)
+    public boolean notify(SafeCommandStore safeStore, SafeCommand safeCommand)
     {
         Command command = safeCommand.current();
         logger.trace("{}: updating as listener in response to change on {} 
with status {} ({})",
                      this, command.txnId(), command.status(), command);
 
-        int storeId = safeStore.commandStore().id();
-        if (state != State.PENDING)
-            return false;
-
-        switch (actionForStatus(command.saveStatus()))
+        boolean execute;
+        synchronized (this)
         {
-            default: throw new AssertionError("Unhandled Action: " + 
actionForStatus(command.saveStatus()));
-            case WAIT:
-                return true;
-
-            case OBSOLETE:
-                onFailure(Redundant, null);
+            int storeId = safeStore.commandStore().id();
+            if (state != State.PENDING)
                 return false;
 
-            case EXECUTE:
-                if (!reading.get(storeId))
-                {
+            switch (actionForStatus(command.saveStatus()))
+            {
+                default: throw new AssertionError("Unhandled Action: " + 
actionForStatus(command.saveStatus()));
+                case WAIT:
+                    return true;
+
+                case OBSOLETE:
+                    execute = false;
+                    break;
+
+                case EXECUTE:
+                    if (reading.get(storeId))
+                        return true;
+
                     if (!waitingOn.get(storeId))
                     {
                         waitingOn.set(storeId);
                         ++waitingOnCount;
                     }
                     reading.set(storeId);
-                    logger.trace("{}: executing read", command.txnId());
-                    read(safeStore, command);
-                }
-                return true;
+                    execute = true;
+            }
+        }
+
+        if (execute)
+        {
+            logger.trace("{}: executing read", command.txnId());
+            read(safeStore, command);
+            return true;
+        }
+        else
+        {
+            onFailure(Redundant, null);
+            return false;
         }
     }
 
     @Override
-    public synchronized void accept(CommitOrReadNack reply, Throwable failure)
+    public void accept(CommitOrReadNack reply, Throwable failure)
     {
         // Unless failed always ack to indicate setup has completed otherwise 
the counter never gets to -1
         if ((reply == null || !reply.isFinal()) && failure == null)
         {
-            onOneSuccess(-1, null);
-            if (state == State.PENDING)
-            {
-                // Time out reads to avoid doing extra work for requests that 
will have responses ignored
-                long replyTimeout = node.agent().replyTimeout(replyContext, 
MILLISECONDS);
-                timeout = node.requestTimeouts().register(this, replyTimeout, 
MILLISECONDS);
-            }
+            onOneSuccess(-1, null, true);
             if (reply != null)
                 node.reply(replyTo, replyContext, reply, null);
         }
@@ -307,21 +319,33 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadData.CommitOrRea
         });
     }
 
-    protected synchronized void readComplete(CommandStore commandStore, 
@Nullable Data result, @Nullable Ranges unavailable)
+    protected void readComplete(CommandStore commandStore, @Nullable Data 
result, @Nullable Ranges unavailable)
     {
-        if (state == State.OBSOLETE)
-            return;
+        LocalListeners.Registered cancel;
+        Runnable clear;
+        synchronized(this)
+        {
+            if (state == State.OBSOLETE)
+                return;
 
-        logger.trace("{}: read completed on {}", txnId, commandStore);
-        if (result != null)
-            data = data == null ? result : data.merge(result);
+            logger.trace("{}: read completed on {}", txnId, commandStore);
+            if (result != null)
+                data = data == null ? result : data.merge(result);
 
-        int storeId = commandStore.id();
-        registrations.remove(storeId).cancel();
-        onOneSuccess(storeId, unavailable);
+            int storeId = commandStore.id();
+            cancel = registrations.remove(storeId);
+            clear = onOneSuccessInternal(storeId, unavailable, false);
+        }
+        cancel.cancel();
+        cleanup(clear);
     }
 
-    protected void onOneSuccess(int storeId, @Nullable Ranges newUnavailable)
+    protected void onOneSuccess(int storeId, @Nullable Ranges newUnavailable, 
boolean registration)
+    {
+        cleanup(onOneSuccessInternal(storeId, newUnavailable, registration));
+    }
+
+    protected synchronized Runnable onOneSuccessInternal(int storeId, 
@Nullable Ranges newUnavailable, boolean registration)
     {
         if (storeId >= 0)
         {
@@ -339,56 +363,93 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadData.CommitOrRea
         // wait for -1 to ensure the setup phase has also completed. Setup 
calls ack in its callback
         // and prevents races where we respond before dispatching all the 
required reads (if the reads are
         // completing faster than the reads can be setup on all required 
shards)
-        if (-1 == --waitingOnCount)
-            onAllSuccess(this.unavailable, data, null);
-    }
+        if (--waitingOnCount >= 0)
+        {
+            if (registration)
+            {
+                // Time out reads to avoid doing extra work for requests that 
will have responses ignored
+                long replyTimeout = node.agent().replyTimeout(replyContext, 
MILLISECONDS);
+                timeout = node.requestTimeouts().register(this, replyTimeout, 
MILLISECONDS);
+            }
+            return null;
+        }
 
-    protected void onAllSuccess(@Nullable Ranges unavailable, @Nullable Data 
data, @Nullable Throwable fail)
-    {
         switch (state)
         {
+            default:
+                throw new AssertionError("Unknown state: " + state);
+
             case RETURNED:
                 throw illegalState("ReadOk was sent, yet ack called again");
 
             case OBSOLETE:
-                logger.debug("After the read completed for txn {}, the result 
was marked obsolete", txnId);
-                if (fail != null)
-                    node.agent().onUncaughtException(fail);
-                break;
+                logger.debug("Before the read completed successfully for txn 
{}, the result was marked obsolete", txnId);
+                return null;
 
             case PENDING:
                 state = State.RETURNED;
-                node.reply(replyTo, replyContext, fail == null ? 
constructReadOk(unavailable, data) : null, fail);
-                clear();
-                break;
+                node.reply(replyTo, replyContext, constructReadOk(unavailable, 
data), null);
+                return clearUnsafe();
+        }
+    }
 
-            default:
-                throw new AssertionError("Unknown state: " + state);
+    boolean cancel()
+    {
+        Runnable clear;
+        synchronized (this)
+        {
+            if (state != State.PENDING)
+                return false;
+
+            clear = clearUnsafe();
+            state = State.OBSOLETE;
+        }
+        cleanup(clear);
+        return true;
+    }
+
+    void cleanup()
+    {
+        Runnable clear;
+        synchronized (this)
+        {
+            clear = clearUnsafe();
         }
+        cleanup(clear);
     }
 
-    void cancel()
+    private static void cleanup(@Nullable Runnable clear)
     {
-        state = State.OBSOLETE;
-        clear();
+        if (clear != null)
+            clear.run();
     }
 
-    void clear()
+    @Nullable Runnable clearUnsafe()
     {
-        if (timeout != null)
-            timeout.cancel();
-        registrations.forEach((i, r) -> r.cancel());
+        RegisteredTimeout cancelTimeout = timeout;
+        Int2ObjectHashMap<LocalListeners.Registered> cancelRegistrations = 
registrations;
+        timeout = null;
+        registrations = null;
         waitingOn.clear();
         reading.clear();
         data = null;
         unavailable = null;
-        timeout = null;
+
+        if (cancelRegistrations == null && cancelTimeout == null)
+            return null;
+
+        return () -> {
+            if (cancelTimeout != null)
+                cancelTimeout.cancel();
+            if (cancelRegistrations != null)
+                cancelRegistrations.forEach((i, r) -> r.cancel());
+        };
     }
 
-    synchronized public void timeout()
+    public void timeout()
     {
         timeout = null;
-        cancel();
+        cleanup();
     }
 
     public int stripe()
@@ -396,9 +457,11 @@ public abstract class ReadData extends 
AbstractEpochRequest<ReadData.CommitOrRea
         return txnId.hashCode();
     }
 
-    synchronized void onFailure(CommitOrReadNack failReply, Throwable 
throwable)
+    void onFailure(CommitOrReadNack failReply, Throwable throwable)
     {
-        cancel();
+        if (!cancel())
+            return;
+
         if (throwable != null)
         {
             node.reply(replyTo, replyContext, null, throwable);
diff --git a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java 
b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
index fe4e210d..2d2e1d96 100644
--- a/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
+++ b/accord-core/src/main/java/accord/messages/WaitUntilApplied.java
@@ -74,7 +74,7 @@ public class WaitUntilApplied extends ReadData
         long retryInLaterEpoch = 
ReadEphemeralTxnData.retryInLaterEpoch(executeAtEpoch, safeStore, command);
         if (retryInLaterEpoch > this.retryInLaterEpoch)
             this.retryInLaterEpoch = retryInLaterEpoch;
-        onOneSuccess(safeStore.commandStore().id(), unavailable(safeStore, 
command));
+        onOneSuccess(safeStore.commandStore().id(), unavailable(safeStore, 
command), false);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index af74ba09..da310314 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -872,6 +872,11 @@ public class TopologyManager
         return epochs.get(epoch) != null;
     }
 
+    public boolean hasAtLeastEpoch(long epoch)
+    {
+        return epochs.currentEpoch >= epoch;
+    }
+
     public Topology localForEpoch(long epoch)
     {
         EpochState epochState = epochs.get(epoch);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to