This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cf101690 CEP-15: (Accord) Bootstrap's LocalOnly txn can not be 
recreated from SerializerSupport (#93)
cf101690 is described below

commit cf10169067a8cd40fb876789a62439cc03fd2e9b
Author: dcapwell <[email protected]>
AuthorDate: Mon Jun 17 12:59:35 2024 -0700

    CEP-15: (Accord) Bootstrap's LocalOnly txn can not be recreated from 
SerializerSupport (#93)
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for 
CASSANDRA-19674
---
 .../src/main/java/accord/local/Bootstrap.java      |   1 +
 .../src/main/java/accord/local/Cleanup.java        |   2 +-
 accord-core/src/main/java/accord/local/Node.java   |  18 ++-
 .../main/java/accord/local/SerializerSupport.java  |  60 ++++++-
 .../main/java/accord/messages/LocalRequest.java    |  25 +--
 .../src/main/java/accord/messages/Propagate.java   |  45 ++----
 .../src/main/java/accord/primitives/SyncPoint.java |  26 +++
 accord-core/src/test/java/accord/Utils.java        |   2 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |  82 ++++++++++
 .../accord/impl/basic/DelayedCommandStores.java    |  67 ++++++--
 .../src/test/java/accord/impl/basic/Journal.java   |  15 +-
 .../src/test/java/accord/impl/list/ListQuery.java  |  16 ++
 .../test/java/accord/impl/mock/MockCluster.java    |   2 +-
 .../java/accord/local/BootstrapLocalTxnTest.java   | 133 ++++++++++++++++
 .../src/test/java/accord/local/CommandsTest.java   | 103 +-----------
 .../src/test/java/accord/utils/AccordGens.java     |  23 +++
 .../java/accord/utils/LazyToString.java}           |  27 ++--
 .../test/java/accord/utils/ReflectionUtils.java    | 177 +++++++++++++++++++++
 .../src/main/java/accord/maelstrom/Cluster.java    |   2 +-
 .../src/main/java/accord/maelstrom/Main.java       |   2 +-
 20 files changed, 635 insertions(+), 193 deletions(-)

diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java 
b/accord-core/src/main/java/accord/local/Bootstrap.java
index e478eb21..87fd16ff 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -120,6 +120,7 @@ class Bootstrap
         {
             globalSyncId = node.nextTxnId(ExclusiveSyncPoint, 
Routable.Domain.Range);
             localSyncId = globalSyncId.as(LocalOnly).withEpoch(epoch);
+            Invariants.checkArgument(epoch <= globalSyncId.epoch(), 
"Attempting to use local epoch %d which is larger than global epoch %d", epoch, 
globalSyncId.epoch());
 
             if (!node.topology().hasEpoch(globalSyncId.epoch()))
             {
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java 
b/accord-core/src/main/java/accord/local/Cleanup.java
index c38de8c2..bb6b1d2e 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -105,7 +105,7 @@ public enum Cleanup
         if (txnId.kind() == EphemeralRead)
             return Cleanup.NO; // TODO (required): clean-up based on timeout
 
-        if (durableBefore.min(txnId) == Universal)
+        if (durableBefore.min(txnId) == UniversalOrInvalidated)
         {
             if (status.hasBeen(PreCommitted) && !status.hasBeen(Applied)) // 
TODO (expected): may be stale
                 illegalState("Loading universally-durable command that has 
been PreCommitted but not Applied");
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index ced53157..26ffd317 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -89,6 +89,7 @@ import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncExecutor;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
@@ -532,9 +533,21 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
         messageSink.send(to, send);
     }
 
-    public void localRequest(LocalRequest message)
+    public <R> void localRequest(LocalRequest<R> message, BiConsumer<? super 
R, Throwable> callback)
     {
-        localRequestHandler.handle(message, this);
+        localRequestHandler.handle(message, callback, this);
+    }
+
+    public <R> AsyncChain<R> localRequest(LocalRequest<R> message)
+    {
+        return new AsyncChains.Head<R>()
+        {
+            @Override
+            protected void start(BiConsumer<? super R, Throwable> callback)
+            {
+                localRequest(message, callback);
+            }
+        };
     }
 
     public void reply(Id replyingToNode, ReplyContext replyContext, Reply 
send, Throwable failure)
@@ -602,6 +615,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
 
     public FullRoute<?> computeRoute(long epoch, Seekables<?, ?> keysOrRanges)
     {
+        Invariants.checkArgument(!keysOrRanges.isEmpty(), "Attempted to 
compute a route from empty keys or ranges");
         RoutingKey homeKey = trySelectHomeKey(epoch, keysOrRanges);
         if (homeKey == null)
             homeKey = selectRandomHomeKey(epoch);
diff --git a/accord-core/src/main/java/accord/local/SerializerSupport.java 
b/accord-core/src/main/java/accord/local/SerializerSupport.java
index 5384a7e3..0d9ec3b5 100644
--- a/accord-core/src/main/java/accord/local/SerializerSupport.java
+++ b/accord-core/src/main/java/accord/local/SerializerSupport.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableSet;
 
+import accord.api.Agent;
 import accord.api.Result;
 import accord.api.VisibleForImplementation;
 import accord.local.Command.WaitingOn;
@@ -37,10 +38,13 @@ import accord.messages.MessageType;
 import accord.messages.PreAccept;
 import accord.messages.Propagate;
 import accord.primitives.Ballot;
+import accord.primitives.Deps;
+import accord.primitives.FullRangeRoute;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
+import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
@@ -90,11 +94,63 @@ public class SerializerSupport
                 .addAll(APPLY_TYPES)
                 .build();
 
+    private static Command localOnly(Agent agent, RangesForEpoch 
rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, 
@Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted, 
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
+    {
+        //TODO (expected): LocalOnly doesn't reflect normal command flow so 
relying on this special casing helps reconstruct work, but is a bit brittle; 
should find a more maintainable way.
+        TxnId txnId = attrs.txnId();
+        FullRangeRoute route = (FullRangeRoute) attrs.route();
+        //TODO (correctness): not 100% correct as the ranges were "valid" which 
can be mutated "after" the sync point... so might actually have less
+        Ranges participantRanges = route.participants().toRanges();
+
+        Txn emptyTxn = agent.emptyTxn(txnId.kind(), participantRanges);
+        Writes writes = emptyTxn.execute(txnId, txnId, null);
+        Result results = emptyTxn.result(txnId, txnId, null);
+        Ranges coordinateRanges = rangesForEpoch.coordinates(txnId);
+        attrs.partialTxn(emptyTxn.slice(coordinateRanges, true))
+             .partialDeps(Deps.NONE.slice(coordinateRanges));
+
+        switch (status.status)
+        {
+            case NotDefined:
+                return status == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(txnId)
+                                                          : 
Command.NotDefined.notDefined(attrs, promised);
+            case PreAccepted:
+                return Command.PreAccepted.preAccepted(attrs, executeAt, 
promised);
+            case AcceptedInvalidate:
+            case Accepted:
+            case PreCommitted:
+                return Command.Accepted.accepted(attrs, status, executeAt, 
promised, accepted);
+            case Committed:
+            case Stable:
+                return Command.Committed.committed(attrs, status, executeAt, 
promised, accepted, waitingOnProvider.provide(attrs.partialDeps()));
+            case PreApplied:
+            case Applied:
+                return Command.Executed.executed(attrs, status, executeAt, 
promised, accepted, waitingOnProvider.provide(attrs.partialDeps()), writes, 
results);
+            case Truncated:
+            case Invalidated:
+                switch (status)
+                {
+                    case Erased:
+                        return Command.Truncated.erased(txnId, 
attrs.durability(), attrs.route());
+                    case TruncatedApplyWithOutcome:
+                        return Command.Truncated.truncatedApply(attrs, status, 
executeAt, writes, results, executesAtLeast);
+                    case TruncatedApply:
+                        return Command.Truncated.truncatedApply(attrs, status, 
executeAt, null, null, executesAtLeast);
+                    default:
+                        throw new AssertionError("Unexpected truncate status: 
" + status);
+                }
+            default:
+                throw new IllegalStateException();
+        }
+    }
+
     /**
      * Reconstructs Command from register values and protocol messages.
      */
-    public static Command reconstruct(RangesForEpoch rangesForEpoch, Mutable 
attrs, SaveStatus status, Timestamp executeAt, @Nullable Timestamp 
executesAtLeast, Ballot promised, Ballot accepted, WaitingOnProvider 
waitingOnProvider, MessageProvider messageProvider)
+    public static Command reconstruct(Agent agent, RangesForEpoch 
rangesForEpoch, Mutable attrs, SaveStatus status, Timestamp executeAt, 
@Nullable Timestamp executesAtLeast, Ballot promised, Ballot accepted, 
WaitingOnProvider waitingOnProvider, MessageProvider messageProvider)
     {
+        if (attrs.txnId().kind() == Txn.Kind.LocalOnly)
+            return localOnly(agent, rangesForEpoch, attrs, status, executeAt, 
executesAtLeast, promised, accepted, waitingOnProvider, messageProvider);
         switch (status.status)
         {
             case NotDefined:
@@ -308,7 +364,7 @@ public class SerializerSupport
                 }
                 else
                 {
-                    checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), 
"Unable to find STABLE_SLOW_PATH_REQ; witnessed %s", new 
LoggedMessageProvider(messageProvider));
+                    checkState(witnessed.contains(STABLE_SLOW_PATH_REQ), 
"Unable to find STABLE_SLOW_PATH_REQ; txn_id=%s, witnessed %s", 
messageProvider.txnId(), new LoggedMessageProvider(messageProvider));
                     if (witnessed.contains(COMMIT_MAXIMAL_REQ))
                     {
                         commit = messageProvider.commitMaximal();
diff --git a/accord-core/src/main/java/accord/messages/LocalRequest.java 
b/accord-core/src/main/java/accord/messages/LocalRequest.java
index d12e35a9..2775e9ac 100644
--- a/accord-core/src/main/java/accord/messages/LocalRequest.java
+++ b/accord-core/src/main/java/accord/messages/LocalRequest.java
@@ -18,25 +18,26 @@
 package accord.messages;
 
 import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommandStore;
-import accord.utils.MapReduceConsume;
+import accord.primitives.TxnId;
 
 import java.util.function.BiConsumer;
+import javax.annotation.Nullable;
 
-public interface LocalRequest<R> extends Request, PreLoadContext, 
MapReduceConsume<SafeCommandStore, Void>
+public interface LocalRequest<R> extends Message
 {
-    /**
-     * Process the request without executing the callback
-     */
-    void process(Node on);
+    default long waitForEpoch() { return 0; }
+    @Nullable
+    TxnId primaryTxnId();
 
-    void process(Node on, BiConsumer<R, Throwable> callback);
-
-    BiConsumer<R, Throwable> callback();
+    void process(Node on, BiConsumer<? super R, Throwable> callback);
 
     interface Handler
     {
-        void handle(LocalRequest<?> message, Node node);
+        <R> void handle(LocalRequest<R> message, BiConsumer<? super R, 
Throwable> callback, Node node);
+    }
+
+    static <R> void simpleHandler(LocalRequest<R> message, BiConsumer<? super 
R, Throwable> callback, Node node)
+    {
+        message.process(node, callback);
     }
 }
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java 
b/accord-core/src/main/java/accord/messages/Propagate.java
index c67d5fa2..a6d209e0 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -24,6 +24,7 @@ import accord.local.Command;
 import accord.local.Commands;
 import accord.local.KeyHistory;
 import accord.local.Node;
+import accord.local.PreLoadContext;
 import accord.local.SafeCommand;
 import accord.local.SafeCommandStore;
 import accord.local.SaveStatus;
@@ -43,6 +44,7 @@ import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
+import accord.utils.MapReduceConsume;
 
 import javax.annotation.Nullable;
 import java.util.function.BiConsumer;
@@ -59,13 +61,13 @@ import static accord.primitives.Routables.Slice.Minimal;
 import static accord.utils.Invariants.illegalState;
 
 // TODO (required): detect propagate loops where we don't manage to update 
anything but should
-public class Propagate implements EpochSupplier, LocalRequest<Status.Known>
+public class Propagate implements EpochSupplier, LocalRequest<Status.Known>, 
PreLoadContext, MapReduceConsume<SafeCommandStore, Void>
 {
     public static class SerializerSupport
     {
         public static Propagate create(TxnId txnId, Route<?> route, SaveStatus 
maxKnowledgeSaveStatus, SaveStatus maxSaveStatus, Ballot ballot, 
Status.Durability durability, RoutingKey homeKey, RoutingKey progressKey, 
Status.Known achieved, FoundKnownMap known, boolean isTruncated, PartialTxn 
partialTxn, PartialDeps committedDeps, long toEpoch, Timestamp executeAt, 
Writes writes, Result result)
         {
-            return new Propagate(txnId, route, maxKnowledgeSaveStatus, 
maxSaveStatus, ballot, durability, homeKey, progressKey, achieved, known, 
isTruncated, partialTxn, committedDeps, toEpoch, executeAt, writes, result, 
null);
+            return new Propagate(txnId, route, maxKnowledgeSaveStatus, 
maxSaveStatus, ballot, durability, homeKey, progressKey, achieved, known, 
isTruncated, partialTxn, committedDeps, toEpoch, executeAt, writes, result);
         }
     }
 
@@ -89,14 +91,7 @@ public class Propagate implements EpochSupplier, 
LocalRequest<Status.Known>
     @Nullable public final Timestamp committedExecuteAt;
     @Nullable public final Writes writes;
     @Nullable public final Result result;
-
-    protected transient BiConsumer<Status.Known, Throwable> callback;
-
-    @Override
-    public BiConsumer<Status.Known, Throwable> callback()
-    {
-        return callback;
-    }
+    protected transient BiConsumer<? super Status.Known, Throwable> callback;
 
     Propagate(
         TxnId txnId,
@@ -115,8 +110,7 @@ public class Propagate implements EpochSupplier, 
LocalRequest<Status.Known>
         long toEpoch,
         @Nullable Timestamp committedExecuteAt,
         @Nullable Writes writes,
-        @Nullable Result result,
-        BiConsumer<Status.Known, Throwable> callback)
+        @Nullable Result result)
     {
         this.txnId = txnId;
         this.route = route;
@@ -135,14 +129,13 @@ public class Propagate implements EpochSupplier, 
LocalRequest<Status.Known>
         this.committedExecuteAt = committedExecuteAt;
         this.writes = writes;
         this.result = result;
-        this.callback = callback;
     }
 
     @Override
-    public void process(Node on, BiConsumer<Status.Known, Throwable> callback)
+    public void process(Node on, BiConsumer<? super Status.Known, Throwable> 
callback)
     {
         this.callback = callback;
-        process(on);
+        on.mapReduceConsumeLocal(this, route, txnId.epoch(), toEpoch, this);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -213,9 +206,9 @@ public class Propagate implements EpochSupplier, 
LocalRequest<Status.Known>
             stableDeps = 
full.stableDeps.slice(sliceRanges).reconstitutePartial(covering);
 
         Propagate propagate =
-            new Propagate(txnId, route, full.maxKnowledgeSaveStatus, 
full.maxSaveStatus, full.acceptedOrCommitted, full.durability, full.homeKey, 
progressKey, achieved, full.map, isShardTruncated, partialTxn, stableDeps, 
toEpoch, full.executeAtIfKnown(), full.writes, full.result, callback);
+            new Propagate(txnId, route, full.maxKnowledgeSaveStatus, 
full.maxSaveStatus, full.acceptedOrCommitted, full.durability, full.homeKey, 
progressKey, achieved, full.map, isShardTruncated, partialTxn, stableDeps, 
toEpoch, full.executeAtIfKnown(), full.writes, full.result);
 
-        node.localRequest(propagate);
+        node.localRequest(propagate, callback);
     }
 
     @Override
@@ -243,24 +236,6 @@ public class Propagate implements EpochSupplier, 
LocalRequest<Status.Known>
         return KeyHistory.COMMANDS;
     }
 
-    @Override
-    public void preProcess(Node on, Node.Id from, ReplyContext replyContext)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void process(Node on, Node.Id from, ReplyContext replyContext)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void process(Node node)
-    {
-        node.mapReduceConsumeLocal(this, route, txnId.epoch(), toEpoch, this);
-    }
-
     @Override
     public Void apply(SafeCommandStore safeStore)
     {
diff --git a/accord-core/src/main/java/accord/primitives/SyncPoint.java 
b/accord-core/src/main/java/accord/primitives/SyncPoint.java
index d22f3756..afc44165 100644
--- a/accord-core/src/main/java/accord/primitives/SyncPoint.java
+++ b/accord-core/src/main/java/accord/primitives/SyncPoint.java
@@ -72,4 +72,30 @@ public class SyncPoint<S extends Seekables<?, ?>>
         TxnId maxDep = waitFor.maxTxnId();
         return TxnId.nonNullOrMax(maxDep, syncId).epoch();
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SyncPoint<?> syncPoint = (SyncPoint<?>) o;
+        return syncId.equals(syncPoint.syncId) && 
waitFor.equals(syncPoint.waitFor) && 
keysOrRanges.equals(syncPoint.keysOrRanges) && 
homeKey.equals(syncPoint.homeKey);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SyncPoint{" +
+               "syncId=" + syncId +
+               ", waitFor=" + waitFor +
+               ", keysOrRanges=" + keysOrRanges +
+               ", homeKey=" + homeKey +
+               '}';
+    }
 }
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index 78b12988..680b47d1 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -163,7 +163,7 @@ public class Utils
         LocalConfig localConfig = new MutableLocalConfig();
         Node node = new Node(nodeId,
                              messageSink,
-                             LocalRequest::process,
+                             LocalRequest::simpleHandler,
                              new MockConfigurationService(messageSink, 
EpochFunction.noop(), topology),
                              clock,
                              
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, clock),
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index da5851f8..0ac20cb3 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -22,12 +22,15 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.BooleanSupplier;
@@ -61,6 +64,7 @@ import accord.impl.PrefixedIntHashKey;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.TopologyFactory;
+import accord.impl.list.ListAgent;
 import accord.impl.list.ListStore;
 import accord.local.AgentExecutor;
 import accord.local.Node.Id;
@@ -274,6 +278,84 @@ public class Cluster implements Scheduler
         run.run();
     }
 
+    public static Map<MessageType, Cluster.Stats> run(Supplier<RandomSource> 
randomSupplier,
+                                                      List<Node.Id> nodes,
+                                                      Topology initialTopology,
+                                                      Function<Map<Node.Id, 
Node>, Request> init)
+    {
+        List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
+        PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, 
new RandomDelayQueue(randomSupplier.get()));
+        RandomSource retryRandom = randomSupplier.get();
+        Consumer<Runnable> retryBootstrap = retry -> {
+            long delay = retryRandom.nextInt(1, 15);
+            queue.add((PendingRunnable) retry::run, delay, TimeUnit.SECONDS);
+        };
+        Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier = 
onStale -> new ListAgent(1000L, failures::add, retryBootstrap, onStale);
+        RandomSource nowRandom = randomSupplier.get();
+        Supplier<LongSupplier> nowSupplier = () -> {
+            RandomSource forked = nowRandom.fork();
+            // TODO (now): meta-randomise scale of clock drift
+            return FrequentLargeRange.builder(forked)
+                                     .ratio(1, 5)
+                                     .small(50, 5000, TimeUnit.MICROSECONDS)
+                                     .large(1, 10, TimeUnit.MILLISECONDS)
+                                     .build()
+                                     .mapAsLong(j -> Math.max(0, 
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
+                                     .asLongSupplier(forked);
+        };
+        SimulatedDelayedExecutorService globalExecutor = new 
SimulatedDelayedExecutorService(queue, new ListAgent(1000L, failures::add, 
retryBootstrap, (i1, i2) -> {
+            throw new IllegalAccessError("Global executor should enver get a 
stale event");
+        }));
+        TopologyFactory topologyFactory = new 
TopologyFactory(initialTopology.maxRf(), 
initialTopology.ranges().stream().toArray(Range[]::new))
+        {
+            @Override
+            public Topology toTopology(Node.Id[] cluster)
+            {
+                return initialTopology;
+            }
+        };
+        AtomicInteger counter = new AtomicInteger();
+        AtomicReference<Map<Id, Node>> nodeMap = new AtomicReference<>();
+        Map<MessageType, Cluster.Stats> stats = 
Cluster.run(nodes.toArray(Node.Id[]::new),
+                                                            
MessageListener.get(),
+                                                            () -> queue,
+                                                            (id, onStale) -> 
globalExecutor.withAgent(agentSupplier.apply(onStale)),
+                                                            
queue::checkFailures,
+                                                            ignore -> {
+                                                            },
+                                                            randomSupplier,
+                                                            nowSupplier,
+                                                            topologyFactory,
+                                                            new Supplier<>()
+                                                            {
+                                                                private 
Iterator<Request> requestIterator = null;
+                                                                private final 
RandomSource rs = randomSupplier.get();
+                                                                @Override
+                                                                public Packet 
get()
+                                                                {
+                                                                    // 
((Cluster) node.scheduler()).onDone(() -> checkOnResult(homeKey, txnId, 0, 
null));
+                                                                    if 
(requestIterator == null)
+                                                                    {
+                                                                        
Map<Node.Id, Node> nodes = nodeMap.get();
+                                                                        
requestIterator = Collections.singleton(init.apply(nodes)).iterator();
+                                                                    }
+                                                                    if 
(!requestIterator.hasNext())
+                                                                        return 
null;
+                                                                    Node.Id id 
= rs.pick(nodes);
+                                                                    return new 
Packet(id, id, counter.incrementAndGet(), requestIterator.next());
+                                                                }
+                                                            },
+                                                            Runnable::run,
+                                                            nodeMap::set);
+        if (!failures.isEmpty())
+        {
+            AssertionError error = new AssertionError("Unexpected errors 
detected");
+            failures.forEach(error::addSuppressed);
+            throw error;
+        }
+        return stats;
+    }
+
     public static Map<MessageType, Stats> run(Id[] nodes, MessageListener 
messageListener, Supplier<PendingQueue> queueSupplier,
                                               BiFunction<Id, 
BiConsumer<Timestamp, Ranges>, AgentExecutor> nodeExecutorSupplier,
                                               Runnable checkFailures, 
Consumer<Packet> responseSink,
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 7998b6ba..5ba32268 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -19,14 +19,17 @@
 package accord.impl.basic;
 
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.function.BiConsumer;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import accord.api.Agent;
 import accord.api.DataStore;
@@ -54,12 +57,53 @@ import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.Topology;
 import accord.utils.Invariants;
+import accord.utils.LazyToString;
 import accord.utils.RandomSource;
+import accord.utils.ReflectionUtils;
+import accord.utils.ReflectionUtils.Difference;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 
 public class DelayedCommandStores extends InMemoryCommandStores.SingleThread
 {
+    //TODO (correctness): remove once we have a Journal integration that does 
not change the values of a command...
+    // There are known issues due to Journal, so exclude known problematic 
fields
+    private static class ValidationHack
+    {
+        private static Pattern field(String path)
+        {
+            path = path.replace(".", "\\.");
+            path += ".*";
+            return Pattern.compile(path);
+        }
+
+        private static final List<Pattern> KNOWN_ISSUES = 
List.of(field(".partialTxn.keys."),
+                                                                  
field(".partialTxn.read.keys."),
+                                                                  
field(".partialTxn.read.userReadKeys."),
+                                                                  
field(".partialTxn.covering."),
+                                                                  
field(".partialDeps.keyDeps."),
+                                                                  
field(".partialDeps.rangeDeps."),
+                                                                  
field(".partialDeps.covering."),
+                                                                  
field(".additionalKeysOrRanges."),
+                                                                  // when a 
new epoch is detected and the execute ranges have more than the coordinating 
ranges,
+                                                                  // and the 
coordinating ranges doesn't include the home key... we drop the query...
+                                                                  // The logic 
to stitch messages together is not able to handle this as it doesn't know the 
original Topologies
+                                                                  
field(".partialTxn.query."),
+                                                                  // 
cmd.mutable().build() != cmd.  This is due to Command.durability changing 
NotDurable to Local depending on the status
+                                                                  
field(".durability."));
+    }
+
+    private static boolean hasKnownIssue(String path)
+    {
+        for (Pattern p : ValidationHack.KNOWN_ISSUES)
+        {
+            Matcher m = p.matcher(path);
+            if (m.find())
+                return true;
+        }
+        return false;
+    }
+
     private DelayedCommandStores(NodeTimeService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, SimulatedDelayedExecutorService 
executorService, BooleanSupplier isLoadedCheck, Journal journal)
     {
         super(time, agent, store, random, shardDistributor, 
progressLogFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, 
journal));
@@ -123,14 +167,11 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         }
 
         @Override
-        protected void validateRead(Command current)
+        public void validateRead(Command current)
         {
             // "loading" the command doesn't make sense as we don't "store" 
the command...
             if (current.txnId().kind() == Txn.Kind.EphemeralRead)
                 return;
-            //TODO (correctness): these type of txn must be durable but 
currently they are not... should make sure this is plugged into the C* journal 
properly for reply
-            if (current.txnId().kind() == Txn.Kind.LocalOnly)
-                return;
             Command.WaitingOn waitingOn = null;
             if (current.isStable() && !current.isTruncated())
                 waitingOn = current.asCommitted().waitingOn;
@@ -138,10 +179,11 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             Command.WaitingOn finalWaitingOn = waitingOn;
             CommonAttributes.Mutable mutable = current.mutable();
             mutable.partialDeps(null).removePartialTxn();
+
             Command reconstructed;
             try
             {
-                reconstructed = 
SerializerSupport.reconstruct(unsafeRangesForEpoch(), mutable, 
current.saveStatus(), current.executeAt(), 
current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, 
current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, 
messages);
+                reconstructed = SerializerSupport.reconstruct(agent(), 
unsafeRangesForEpoch(), mutable, current.saveStatus(), current.executeAt(), 
current.txnId().kind().awaitsOnlyDeps() ? current.executesAtLeast() : null, 
current.promised(), current.acceptedOrCommitted(), ignore -> finalWaitingOn, 
messages);
             }
             catch (IllegalStateException t)
             {
@@ -152,16 +194,9 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
                     return;
                 throw t;
             }
-            //TODO (correctness): journal doesn't guarantee we pick the same 
records we used to state transition
-            if (current.partialDeps() != null && 
!current.partialDeps().rangeDeps.equals(reconstructed.partialDeps().rangeDeps))
-                return;
-            // for some reasons scope doesn't alaways match, this might be due 
to journal... what sucks is that this can also be a bug in the extract, so its
-            // hard to figure out what happened.
-            if (current.partialDeps() != null && 
!current.partialDeps().equals(reconstructed.partialDeps()))
-                return;
-            if (current.isCommitted() && !current.isTruncated() && 
!Objects.equals(current.asCommitted().waitingOn(), 
reconstructed.asCommitted().waitingOn()))
-                return;
-//            Invariants.checkState(current.equals(reconstructed), "Commands 
did not match: expected %s, given %s", current, reconstructed);
+            List<Difference<?>> diff = 
ReflectionUtils.recursiveEquals(current, reconstructed);
+            List<String> filteredDiff = diff.stream().filter(d -> 
!DelayedCommandStores.hasKnownIssue(d.path)).map(Object::toString).collect(Collectors.toList());
+            Invariants.checkState(filteredDiff.isEmpty(), "Commands did not 
match: expected %s, given %s, node %s, store %d, diff %s", current, 
reconstructed, time, id(), new LazyToString(() -> String.join("\n", 
filteredDiff)));
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java 
b/accord-core/src/test/java/accord/impl/basic/Journal.java
index 7c0217b5..4c5f63e5 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/Journal.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -132,18 +133,18 @@ public class Journal implements LocalRequest.Handler, 
Runnable
     {
         this.node = null;
     }
-    
+
     @Override
-    public void handle(LocalRequest<?> message, Node node)
+    public <R> void handle(LocalRequest<R> message, BiConsumer<? super R, 
Throwable> callback, Node node)
     {
         messageListener.onMessage(NodeSink.Action.DELIVER, node.id(), 
node.id(), -1, message);
         if (message.type().hasSideEffects())
         {
             // enqueue
-            unframedRequests.add(new RequestContext(message, () -> 
node.scheduler().now(() -> message.process(node))));
+            unframedRequests.add(new RequestContext(message, 
message.waitForEpoch(), () -> node.scheduler().now(() -> message.process(node, 
callback))));
             return;
         }
-        message.process(node);
+        message.process(node, callback);
     }
 
     public void handle(Request request, Node.Id from, ReplyContext 
replyContext)
@@ -151,7 +152,7 @@ public class Journal implements LocalRequest.Handler, 
Runnable
         if (request.type() != null && request.type().hasSideEffects())
         {
             // enqueue
-            unframedRequests.add(new RequestContext(request, () -> 
node.receive(request, from, replyContext)));
+            unframedRequests.add(new RequestContext(request, 
request.waitForEpoch(), () -> node.receive(request, from, replyContext)));
             return;
         }
         node.receive(request, from, replyContext);
@@ -286,9 +287,9 @@ public class Journal implements LocalRequest.Handler, 
Runnable
         final Message message;
         final Runnable fn;
 
-        protected RequestContext(Request request, Runnable fn)
+        protected RequestContext(Message request, long waitForEpoch, Runnable 
fn)
         {
-            this.waitForEpoch = request.waitForEpoch();
+            this.waitForEpoch = waitForEpoch;
             this.message = request;
             this.fn = fn;
         }
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java 
b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index db2c7d4c..24488101 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -19,6 +19,7 @@
 package accord.impl.list;
 
 import java.util.Map;
+import java.util.Objects;
 
 import accord.api.Data;
 import accord.api.Key;
@@ -62,4 +63,19 @@ public class ListQuery implements Query
         }
         return new ListResult(ListResult.Status.Applied, client, requestId, 
txnId, read.userReadKeys, responseKeys, values, (ListUpdate) update);
     }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ListQuery listQuery = (ListQuery) o;
+        return requestId == listQuery.requestId && Objects.equals(client, 
listQuery.client);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java 
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index f3b80c53..de8c78f0 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -125,7 +125,7 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
         LocalConfig localConfig = new MutableLocalConfig();
         Node node = new Node(id,
                              messageSink,
-                             LocalRequest::process,
+                             LocalRequest::simpleHandler,
                              configurationService,
                              nowSupplier,
                              
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, nowSupplier),
diff --git a/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java 
b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
new file mode 100644
index 00000000..ab38eb7f
--- /dev/null
+++ b/accord-core/src/test/java/accord/local/BootstrapLocalTxnTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.local;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Test;
+
+import accord.impl.PrefixedIntHashKey;
+import accord.impl.basic.Cluster;
+import accord.impl.basic.DelayedCommandStores.DelayedCommandStore;
+import accord.messages.MessageType;
+import accord.messages.ReplyContext;
+import accord.messages.Request;
+import accord.primitives.Deps;
+import accord.primitives.FullRoute;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.SyncPoint;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.topology.Topology;
+import accord.topology.TopologyUtils;
+import accord.utils.AccordGens;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import org.assertj.core.api.Assertions;
+
+import static accord.local.PreLoadContext.contextFor;
+import static accord.utils.Property.qt;
+
+class BootstrapLocalTxnTest
+{
+    private static final Gen<Gen<Cleanup>> CLEANUP_DISTRIBUTION = 
Gens.enums().allMixedDistribution(Cleanup.class);
+
+    @Test
+    public void localOnlyTxnLifeCycle()
+    {
+        Ranges ranges = 
Ranges.ofSortedAndDeoverlapped(PrefixedIntHashKey.ranges(0, 1));
+        List<Node.Id> nodes = Collections.singletonList(new Node.Id(42));
+        Topology t = TopologyUtils.topology(1, nodes, ranges, 2);
+        qt().check(rs -> Cluster.run(rs::fork, nodes, t, nodeMap -> new 
Request()
+        {
+            @Override
+            public void preProcess(Node on, Node.Id from, ReplyContext 
replyContext)
+            {
+                // no-op
+            }
+
+            @Override
+            public void process(Node on, Node.Id from, ReplyContext 
replyContext)
+            {
+                Gen<Cleanup> cleanupGen = CLEANUP_DISTRIBUTION.next(rs);
+                for (int storeId : on.commandStores().ids())
+                {
+                    DelayedCommandStore store = (DelayedCommandStore) 
on.commandStores().forId(storeId);
+                    // this is a bit redundant but here to make the test easier 
to maintain.  Pre/Post execute we validate each command to make sure everything 
is fine
+                    // but that logic could be changed and this test has a 
dependency on validating the command... so to make this dependency explicit
+                    // the test will call the validation logic within the test 
even though it will be called again in the background...
+                    Consumer<Command> validate = store::validateRead;
+                    TxnId globalSyncId = 
on.nextTxnId(Txn.Kind.ExclusiveSyncPoint, Routable.Domain.Range);
+                    TxnId localSyncId = globalSyncId.as(Txn.Kind.LocalOnly);
+                    TxnId nextGlobalSyncId = 
on.nextTxnId(Txn.Kind.ExclusiveSyncPoint, 
Routable.Domain.Range).withEpoch(globalSyncId.epoch() + 1);
+                    Ranges ranges = 
AccordGens.rangesInsideRanges(store.updateRangesForEpoch().currentRanges(), 
(rs2, r) -> rs2.nextInt(1, 4)).next(rs);
+
+                    FullRoute<?> route = on.computeRoute(globalSyncId, ranges);
+                    SyncPoint<Ranges> syncPoint = new 
SyncPoint<>(globalSyncId, Deps.NONE, ranges, route);
+                    Ranges valid = AccordGens.rangesInsideRanges(ranges, (rs2, 
r) -> rs2.nextInt(1, 4)).next(rs);
+                    
Invariants.checkArgument(syncPoint.keysOrRanges.containsAll(valid));
+                    store.execute(contextFor(localSyncId, 
syncPoint.waitFor.keyDeps.keys(), KeyHistory.COMMANDS), safe -> 
Commands.createBootstrapCompleteMarkerTransaction(safe, localSyncId, syncPoint, 
valid))
+                         .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
validate.accept(safe.get(localSyncId, route.homeKey()).current())))
+                         .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
Commands.markBootstrapComplete(safe, localSyncId, ranges)))
+                         .flatMap(ignore -> 
store.execute(contextFor(localSyncId), safe -> 
validate.accept(safe.get(localSyncId, route.homeKey()).current())))
+                         // cleanup txn
+                         .flatMap(ignore -> 
store.submit(PreLoadContext.empty(), safe -> {
+                             Cleanup target = cleanupGen.next(rs);
+                             if (target == Cleanup.NO)
+                                 return Cleanup.NO;
+                             
safe.commandStore().setRedundantBefore(RedundantBefore.create(ranges, 
Long.MIN_VALUE, Long.MAX_VALUE, nextGlobalSyncId, nextGlobalSyncId, 
TxnId.NONE));
+                             switch (target)
+                             {
+                                 case ERASE:
+                                     
safe.commandStore().setDurableBefore(DurableBefore.create(ranges, 
nextGlobalSyncId, nextGlobalSyncId));
+                                     break;
+                                 case TRUNCATE:
+                                     
safe.commandStore().setDurableBefore(DurableBefore.create(ranges, 
nextGlobalSyncId, globalSyncId));
+                                     break;
+                                 case TRUNCATE_WITH_OUTCOME:
+                                     // no update to DurableBefore = 
TRUNCATE_WITH_OUTCOME
+                                     break;
+                                 default:
+                                     throw new 
UnsupportedOperationException(target.name());
+                             }
+                             return target;
+                         }))
+                         // validate cmd
+                         .flatMap(target -> 
store.execute(contextFor(localSyncId), safe -> {
+                             SafeCommand cmd = safe.get(localSyncId, 
route.homeKey());
+                             Command current = cmd.current();
+                             validate.accept(current);
+                             
Assertions.assertThat(current.saveStatus()).isEqualTo(target == Cleanup.NO ? 
SaveStatus.Applied : target.appliesIfNot);
+                         }))
+                         .begin(on.agent());
+                }
+            }
+
+            @Override
+            public MessageType type()
+            {
+                return null;
+            }
+        }));
+    }
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/local/CommandsTest.java 
b/accord-core/src/test/java/accord/local/CommandsTest.java
index 6731a742..d8a00731 100644
--- a/accord-core/src/test/java/accord/local/CommandsTest.java
+++ b/accord-core/src/test/java/accord/local/CommandsTest.java
@@ -21,25 +21,15 @@ package accord.local;
 import accord.api.Key;
 import accord.api.RoutingKey;
 import accord.api.TestableConfigurationService;
-import accord.burn.random.FrequentLargeRange;
 import accord.coordinate.Preempted;
 import accord.coordinate.Timeout;
 import accord.coordinate.TopologyMismatch;
 import accord.impl.InMemoryCommandStore;
-import accord.impl.MessageListener;
 import accord.impl.PrefixedIntHashKey;
-import accord.impl.TopologyFactory;
 import accord.impl.basic.Cluster;
-import accord.impl.basic.Packet;
-import accord.impl.basic.PendingRunnable;
-import accord.impl.basic.PropagatingPendingQueue;
-import accord.impl.basic.RandomDelayQueue;
-import accord.impl.basic.SimulatedDelayedExecutorService;
-import accord.impl.list.ListAgent;
 import accord.messages.MessageType;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
-import accord.primitives.Timestamp;
 import accord.topology.TopologyUtils;
 import accord.primitives.Keys;
 import accord.primitives.Range;
@@ -51,24 +41,13 @@ import accord.topology.Topology;
 import accord.utils.AccordGens;
 import accord.utils.Gen;
 import accord.utils.Gens;
-import accord.utils.RandomSource;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -99,8 +78,12 @@ class CommandsTest
             Topology initialTopology = TopologyUtils.topology(1, nodes, 
Ranges.of(allRanges), rf);
             Topology updatedTopology = TopologyUtils.topology(2, nodes, 
Ranges.of(prefix0), rf); // drop prefix1
 
-            cluster(rs::fork, nodes, initialTopology, nodeMap -> new Request()
+            Cluster.run(rs::fork, nodes, initialTopology, nodeMap -> new 
Request()
             {
+                {
+                    // the node selected does not matter and should not impact 
determinism, they all share the same scheduler
+                    ((Cluster) 
nodeMap.values().stream().findFirst().get().scheduler()).onDone(() -> 
checkState(0, nodeMap.values()));
+                }
                 @Override
                 public void preProcess(Node on, Node.Id from, ReplyContext 
replyContext)
                 {
@@ -149,82 +132,6 @@ class CommandsTest
         });
     }
 
-    static void cluster(Supplier<RandomSource> randomSupplier, List<Node.Id> 
nodes, Topology initialTopology, Function<Map<Node.Id, Node>, Request> init)
-    {
-        List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
-        PropagatingPendingQueue queue = new PropagatingPendingQueue(failures, 
new RandomDelayQueue(randomSupplier.get()));
-        RandomSource retryRandom = randomSupplier.get();
-        Consumer<Runnable> retryBootstrap = retry -> {
-            long delay = retryRandom.nextInt(1, 15);
-            queue.add((PendingRunnable) retry::run, delay, TimeUnit.SECONDS);
-        };
-        Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier = 
onStale -> new ListAgent(1000L, failures::add, retryBootstrap, onStale);
-        RandomSource nowRandom = randomSupplier.get();
-        Supplier<LongSupplier> nowSupplier = () -> {
-            RandomSource forked = nowRandom.fork();
-            // TODO (now): meta-randomise scale of clock drift
-            return FrequentLargeRange.builder(forked)
-                                     .ratio(1, 5)
-                                     .small(50, 5000, TimeUnit.MICROSECONDS)
-                                     .large(1, 10, TimeUnit.MILLISECONDS)
-                                     .build()
-                                     .mapAsLong(j -> Math.max(0, 
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
-                                     .asLongSupplier(forked);
-        };
-        SimulatedDelayedExecutorService globalExecutor = new 
SimulatedDelayedExecutorService(queue, new ListAgent(1000L, failures::add, 
retryBootstrap, (i1, i2) -> {
-            throw new IllegalAccessError("Global executor should enver get a 
stale event");
-        }));
-        TopologyFactory topologyFactory = new 
TopologyFactory(initialTopology.maxRf(), 
initialTopology.ranges().stream().toArray(Range[]::new))
-        {
-            @Override
-            public Topology toTopology(Node.Id[] cluster)
-            {
-                return initialTopology;
-            }
-        };
-        AtomicInteger counter = new AtomicInteger();
-        AtomicReference<Map<Node.Id, Node>> nodeMap = new AtomicReference<>();
-        Cluster.run(nodes.toArray(Node.Id[]::new),
-                    MessageListener.get(),
-                    () -> queue,
-                    (id, onStale) -> 
globalExecutor.withAgent(agentSupplier.apply(onStale)),
-                    queue::checkFailures,
-                    ignore -> {
-                    },
-                    randomSupplier,
-                    nowSupplier,
-                    topologyFactory,
-                    new Supplier<>()
-                    {
-                        private Iterator<Request> requestIterator = null;
-                        private final RandomSource rs = randomSupplier.get();
-                        @Override
-                        public Packet get()
-                        {
-                            // ((Cluster) node.scheduler()).onDone(() -> 
checkOnResult(homeKey, txnId, 0, null));
-                            if (requestIterator == null)
-                            {
-                                Map<Node.Id, Node> nodes = nodeMap.get();
-                                requestIterator = 
Collections.singleton(init.apply(nodes)).iterator();
-                                // the node selected does not matter and 
should not impact determinism, they all share the same scheduler
-                                ((Cluster) 
nodes.values().stream().findFirst().get().scheduler()).onDone(() -> 
checkState(0, nodes.values()));
-                            }
-                            if (!requestIterator.hasNext())
-                                return null;
-                            Node.Id id = rs.pick(nodes);
-                            return new Packet(id, id, 
counter.incrementAndGet(), requestIterator.next());
-                        }
-                    },
-                    Runnable::run,
-                    nodeMap::set);
-        if (!failures.isEmpty())
-        {
-            AssertionError error = new AssertionError("Unexpected errors 
detected");
-            failures.forEach(error::addSuppressed);
-            throw error;
-        }
-    }
-
     private static void checkState(int attempt, Collection<Node> values)
     {
         List<String> faults = new ArrayList<>();
diff --git a/accord-core/src/test/java/accord/utils/AccordGens.java 
b/accord-core/src/test/java/accord/utils/AccordGens.java
index 3247ebb1..ba326c3a 100644
--- a/accord-core/src/test/java/accord/utils/AccordGens.java
+++ b/accord-core/src/test/java/accord/utils/AccordGens.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.ToIntBiFunction;
 
 import javax.annotation.Nullable;
 
@@ -392,6 +393,28 @@ public class AccordGens
         throw new IllegalArgumentException("Unsupported type: " + 
range.start().getClass());
     }
 
+    public static Gen<Ranges> rangesInsideRanges(Ranges ranges, 
ToIntBiFunction<RandomSource, Range> numSplits)
+    {
+        if (ranges.isEmpty())
+            return ignore -> ranges;
+        List<Gen<Range>> subsets = new ArrayList<>(ranges.size());
+        ranges.forEach(r -> subsets.add(rangeInsideRange(r)));
+        return rs -> {
+            List<Range> result = new ArrayList<>(subsets.size());
+            for (int i = 0; i < subsets.size(); i++)
+            {
+                Range range = ranges.get(i);
+                int splits = numSplits.applyAsInt(rs, range);
+                if (splits < 0) throw new IllegalArgumentException("numSplits 
is less than 0: given " + splits);
+                if (splits == 0) continue;
+                Gen<Range> gen = subsets.get(i);
+                for (int s = 0; s < splits; s++)
+                    result.add(gen.next(rs));
+            }
+            return Ranges.of(result.toArray(Range[]::new));
+        };
+    }
+
     public static Gen<Range> prefixedIntHashKeyRangeInsideRange(Range range)
     {
         if (!(range.end() instanceof PrefixedIntHashKey))
diff --git a/accord-core/src/main/java/accord/messages/LocalRequest.java 
b/accord-core/src/test/java/accord/utils/LazyToString.java
similarity index 58%
copy from accord-core/src/main/java/accord/messages/LocalRequest.java
copy to accord-core/src/test/java/accord/utils/LazyToString.java
index d12e35a9..3496184a 100644
--- a/accord-core/src/main/java/accord/messages/LocalRequest.java
+++ b/accord-core/src/test/java/accord/utils/LazyToString.java
@@ -15,28 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package accord.messages;
 
-import accord.local.Node;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommandStore;
-import accord.utils.MapReduceConsume;
+package accord.utils;
 
-import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 
-public interface LocalRequest<R> extends Request, PreLoadContext, 
MapReduceConsume<SafeCommandStore, Void>
+public class LazyToString
 {
-    /**
-     * Process the request without executing the callback
-     */
-    void process(Node on);
+    private final Supplier<String> toString;
 
-    void process(Node on, BiConsumer<R, Throwable> callback);
-
-    BiConsumer<R, Throwable> callback();
+    public LazyToString(Supplier<String> toString)
+    {
+        this.toString = toString;
+    }
 
-    interface Handler
+    @Override
+    public String toString()
     {
-        void handle(LocalRequest<?> message, Node node);
+        return this.toString.get();
     }
 }
diff --git a/accord-core/src/test/java/accord/utils/ReflectionUtils.java 
b/accord-core/src/test/java/accord/utils/ReflectionUtils.java
new file mode 100644
index 00000000..701c911e
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/ReflectionUtils.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.ImmutableSet;
+
+import accord.local.Node;
+import accord.primitives.Timestamp;
+
+public class ReflectionUtils
+{
+    /**
+     * Types that should not walk the fields in the type.
+     */
+    private static final Set<Class<?>> NO_RECURSIVE = 
ImmutableSet.of(String.class,
+                                                                      // 
numbers store primitives but reflection converts to a boxed value; this will 
cause a recursive check over the same thing over and over again...
+                                                                      
Byte.class,
+                                                                      
Character.class,
+                                                                      
Short.class,
+                                                                      
Integer.class,
+                                                                      
Long.class,
+                                                                      
Float.class,
+                                                                      
Double.class,
+                                                                      // 
accord primitives
+                                                                      
Timestamp.class,
+                                                                      
Node.Id.class);
+
+    public static String toString(Object o)
+    {
+        if (o == null) return "null";
+        if (!o.getClass().isArray())
+            return o.toString();
+        if (o instanceof Object[])
+            return Stream.of((Object[]) 
o).map(ReflectionUtils::toString).collect(Collectors.toList()).toString();
+        if (o instanceof byte[])
+            return Arrays.toString((byte[]) o);
+        if (o instanceof short[])
+            return Arrays.toString((short[]) o);
+        if (o instanceof int[])
+            return Arrays.toString((int[]) o);
+        if (o instanceof long[])
+            return Arrays.toString((long[]) o);
+        if (o instanceof float[])
+            return Arrays.toString((float[]) o);
+        if (o instanceof double[])
+            return Arrays.toString((double[]) o);
+        if (o instanceof char[])
+            return Arrays.toString((char[]) o);
+        if (o instanceof boolean[])
+            return Arrays.toString((boolean[]) o);
+        throw new UnsupportedOperationException("Unknown array type: " + 
o.getClass());
+    }
+
+    public static List<Difference<?>> recursiveEquals(Object lhs, Object rhs)
+    {
+        if (Objects.equals(lhs, rhs)) return Collections.emptyList();
+        // there is a difference... find it...
+        List<Difference<?>> accum = new ArrayList<>();
+        DeterministicIdentitySet<Object> seenLhs = new 
DeterministicIdentitySet<>();
+        DeterministicIdentitySet<Object> seenRhs = new 
DeterministicIdentitySet<>();
+        recursiveEquals(".", seenLhs, lhs, seenRhs, rhs, accum);
+        return accum;
+    }
+
+    private static void recursiveEquals(String path, 
DeterministicIdentitySet<Object> seenLhs, Object lhs, 
DeterministicIdentitySet<Object> seenRhs, Object rhs, List<Difference<?>> accum)
+    {
+        if (Objects.equals(lhs, rhs)) return;
+        if (lhs == null || rhs == null)
+        {
+            // one side is null, so doesn't make sense to look further
+            accum.add(new Difference<>(path, lhs, rhs));
+            return;
+        }
+        if (!(seenLhs.add(lhs) && seenRhs.add(rhs)))
+        {
+            // seen pointer before, unsafe to keep walking
+            accum.add(new Difference<>(path, lhs, rhs));
+            return;
+        }
+        List<Field> fields = getFields(lhs.getClass());
+        if (fields.isEmpty())
+        {
+            accum.add(new Difference<>(path, lhs, rhs));
+            return;
+        }
+        int startSize = accum.size();
+        for (Field f : fields)
+        {
+            f.setAccessible(true);
+            try
+            {
+                recursiveEquals(path + f.getName() + '.', seenLhs, f.get(lhs), 
seenRhs, f.get(rhs), accum);
+            }
+            catch (IllegalAccessException e)
+            {
+                throw new AssertionError(e);
+            }
+        }
+        // no field difference found, but the type defined equals as 
different, so add this level
+        if (startSize == accum.size())
+            accum.add(new Difference<>(path, lhs, rhs));
+    }
+    private static boolean checkFields(Class<?> klass)
+    {
+        return !(klass.isEnum() || NO_RECURSIVE.contains(klass));
+    }
+
+    private static List<Field> getFields(Class<?> aClass)
+    {
+        if (!checkFields(aClass))
+            return Collections.emptyList();
+        List<Field> fields = new ArrayList<>();
+        for (Class<?> klass = aClass; klass != null; klass = 
klass.getSuperclass())
+        {
+            Field[] fs = klass.getDeclaredFields();
+            if (fs != null && fs.length > 0)
+            {
+                for (Field f : fs)
+                {
+                    if (Modifier.isStatic(f.getModifiers()) || 
Modifier.isTransient(f.getModifiers())) continue;
+                    fields.add(f);
+                }
+            }
+        }
+        return fields;
+    }
+
+    public static class Difference<T>
+    {
+        public final String path;
+        public final T lhs, rhs;
+
+        public Difference(String path, T lhs, T rhs)
+        {
+            this.path = path;
+            this.lhs = lhs;
+            this.rhs = rhs;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Diff{" +
+                   "path='" + path + '\'' +
+                   ", lhs=" + ReflectionUtils.toString(lhs) +
+                   ", rhs=" + ReflectionUtils.toString(rhs) +
+                   '}';
+        }
+    }
+}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 1d9abff9..c4c34d78 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -321,7 +321,7 @@ public class Cluster implements Scheduler
                 MessageSink messageSink = sinks.create(node, 
randomSupplier.get());
                 LongSupplier nowSupplier = nowSupplierSupplier.get();
                 LocalConfig localConfig = new MutableLocalConfig();
-                lookup.put(node, new Node(node, messageSink, 
LocalRequest::process, new SimpleConfigService(topology),
+                lookup.put(node, new Node(node, messageSink, 
LocalRequest::simpleHandler, new SimpleConfigService(topology),
                                           nowSupplier, 
NodeTimeService.unixWrapper(TimeUnit.MICROSECONDS, nowSupplier),
                                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                                           MaelstromAgent.INSTANCE,
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index a7ac2ced..16a003a8 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -177,7 +177,7 @@ public class Main
             topology = topologyFactory.toTopology(init.cluster);
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, 
init.self, out, err);
             LocalConfig localConfig = new MutableLocalConfig();
-            on = new Node(init.self, sink, LocalRequest::process, new 
SimpleConfigService(topology),
+            on = new Node(init.self, sink, LocalRequest::simpleHandler, new 
SimpleConfigService(topology),
                           System::currentTimeMillis, 
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, System::currentTimeMillis),
                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                           MaelstromAgent.INSTANCE, new DefaultRandom(), 
scheduler, SizeOfIntersectionSorter.SUPPLIER,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to