This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd0761c5 Expose Journal-related files to allow external Journal 
implementation integration.
bd0761c5 is described below

commit bd0761c567d153995a3db8da686ffdc940247200
Author: Alex Petrov <[email protected]>
AuthorDate: Mon Nov 11 18:39:02 2024 +0100

    Expose Journal-related files to allow external Journal implementation 
integration.
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for 
CASSANDRA-20112.
---
 accord-core/src/main/java/accord/api/Journal.java  | 100 ++++
 .../src/main/java/accord/impl/AbstractLoader.java  |  74 +++
 .../java/accord/impl/InMemoryCommandStore.java     | 158 +++---
 .../main/java/accord/impl/TimestampsForKeys.java   |   7 +-
 .../src/main/java/accord/local/Command.java        |   8 +-
 .../src/main/java/accord/local/CommandStore.java   |   3 +
 accord-core/src/main/java/accord/local/Node.java   |   4 +-
 .../java/accord/messages/ReadEphemeralTxnData.java |   3 +-
 .../src/main/java/accord/primitives/Timestamp.java |   4 +-
 accord-core/src/test/java/accord/Utils.java        |  26 +-
 .../src/test/java/accord/burn/BurnTest.java        | 615 +--------------------
 .../burn/{BurnTest.java => BurnTestBase.java}      | 131 ++---
 .../src/test/java/accord/impl/IntHashKey.java      |   2 +-
 .../test/java/accord/impl/PrefixedIntHashKey.java  |   2 +-
 .../test/java/accord/impl/RemoteListenersTest.java |   7 +
 .../src/test/java/accord/impl/basic/Cluster.java   |  20 +-
 .../accord/impl/basic/DelayedCommandStores.java    |   7 +-
 .../basic/{Journal.java => InMemoryJournal.java}   | 335 ++++++-----
 .../java/accord/impl/basic/LoggingJournal.java     | 120 ++++
 .../java/accord/impl/basic/VerifyingJournal.java   |  97 ++++
 .../src/test/java/accord/impl/list/ListQuery.java  |   6 +-
 .../src/test/java/accord/impl/list/ListRead.java   |   2 +-
 .../src/test/java/accord/impl/list/ListResult.java |   2 +-
 .../java/accord/local/cfk/CommandsForKeyTest.java  |   6 +
 .../java/accord/topology/TopologyRandomizer.java   |   4 +-
 25 files changed, 759 insertions(+), 984 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
new file mode 100644
index 00000000..6bd18bd2
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.api;
+
+import java.util.NavigableMap;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+/**
+ * Persisted journal for transactional recovery.
+ */
+public interface Journal
+{
+    Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore 
redundantBefore, DurableBefore durableBefore);
+    // TODO (required): use OnDone instead of Runnable
+    void saveCommand(int store, CommandUpdate value, Runnable onFlush);
+
+    void purge(CommandStores commandStores);
+    void replay(CommandStores commandStores);
+
+    RedundantBefore loadRedundantBefore(int commandStoreId);
+    NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId);
+    NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId);
+    CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int 
commandStoreId);
+
+    void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush);
+
+    class CommandUpdate
+    {
+        public final TxnId txnId;
+        public final Command before;
+        public final Command after;
+
+        public CommandUpdate(@Nullable Command before, @Nonnull Command after)
+        {
+            this.txnId = after.txnId();
+            this.before = before;
+            this.after = after;
+        }
+    }
+
+    class FieldUpdates
+    {
+        // TODO (required): use persisted field logic
+        public RedundantBefore newRedundantBefore;
+        public NavigableMap<TxnId, Ranges> newBootstrapBeganAt;
+        public NavigableMap<Timestamp, Ranges> newSafeToRead;
+        public CommandStores.RangesForEpoch.Snapshot newRangesForEpoch;
+
+        public String toString()
+        {
+            return "FieldUpdates{" +
+                   "newRedundantBefore=" + newRedundantBefore +
+                   ", newBootstrapBeganAt=" + newBootstrapBeganAt +
+                   ", newSafeToRead=" + newSafeToRead +
+                   ", newRangesForEpoch=" + newRangesForEpoch +
+                   '}';
+        }
+    }
+
+    /**
+     * Helper for CommandStore to restore Command states.
+     */
+    interface Loader
+    {
+        void load(Command next, OnDone onDone);
+        void apply(Command next, OnDone onDone);
+    }
+
+
+    interface OnDone
+    {
+        void success();
+        void failure(Throwable t);
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java 
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
new file mode 100644
index 00000000..66745a6d
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.function.BiConsumer;
+
+import accord.api.Journal;
+import accord.local.Cleanup;
+import accord.local.Command;
+import accord.local.Commands;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.primitives.TxnId;
+
+import static accord.primitives.SaveStatus.Applying;
+import static accord.primitives.Status.Invalidated;
+import static accord.primitives.Status.PreApplied;
+import static accord.primitives.Status.Stable;
+import static accord.primitives.Status.Truncated;
+
+public abstract class AbstractLoader implements Journal.Loader
+{
+    protected Command loadInternal(Command command, SafeCommandStore safeStore)
+    {
+        TxnId txnId = command.txnId();
+        if (command.status() != Truncated && command.status() != Invalidated)
+        {
+            Cleanup cleanup = Cleanup.shouldCleanup(safeStore, command, 
command.participants());
+            switch (cleanup)
+            {
+                case NO:
+                    break;
+                case INVALIDATE:
+                case TRUNCATE_WITH_OUTCOME:
+                case TRUNCATE:
+                case ERASE:
+                    command = Commands.purge(command, command.participants(), 
cleanup);
+            }
+        }
+
+        return safeStore.unsafeGet(txnId).update(safeStore, command);
+    }
+
+    protected void applyWrites(Command command, SafeCommandStore safeStore, 
BiConsumer<SafeCommand, Command> apply)
+    {
+        TxnId txnId = command.txnId();
+        SafeCommand safeCommand = safeStore.unsafeGet(txnId);
+        Command local = safeCommand.current();
+        if (local.is(Stable) || local.is(PreApplied))
+        {
+            Commands.maybeExecute(safeStore, safeCommand, local, true, true);
+        }
+        else if (local.saveStatus().compareTo(Applying) >= 0 && 
!local.hasBeen(Truncated))
+        {
+            apply.accept(safeCommand, local);
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index cae30a16..9bf05302 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -43,18 +43,16 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import accord.api.LocalListeners;
-import accord.api.RoutingKey;
-import accord.impl.progresslog.DefaultProgressLog;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.Journal;
+import accord.api.LocalListeners;
 import accord.api.ProgressLog;
-import accord.local.Cleanup;
+import accord.api.RoutingKey;
+import accord.impl.progresslog.DefaultProgressLog;
 import accord.local.Command;
 import accord.local.CommandStore;
 import accord.local.CommandStores.RangesForEpoch;
@@ -97,19 +95,17 @@ import static 
accord.local.SafeCommandStore.TestDep.ANY_DEPS;
 import static accord.local.SafeCommandStore.TestDep.WITH_OR_INVALIDATED;
 import static accord.local.SafeCommandStore.TestStartedAt.STARTED_BEFORE;
 import static accord.local.SafeCommandStore.TestStatus.ANY_STATUS;
+import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.SaveStatus.Applying;
 import static accord.primitives.SaveStatus.Erased;
 import static accord.primitives.SaveStatus.ErasedOrVestigial;
 import static accord.primitives.SaveStatus.ReadyToExecute;
 import static accord.primitives.Status.Applied;
 import static accord.primitives.Status.Durability.Local;
-import static accord.primitives.Status.Invalidated;
-import static accord.primitives.Status.PreApplied;
+import static accord.primitives.Status.NotDefined;
 import static accord.primitives.Status.PreCommitted;
 import static accord.primitives.Status.Stable;
 import static accord.primitives.Status.Truncated;
-import static accord.primitives.Status.NotDefined;
-import static accord.primitives.Routables.Slice.Minimal;
 import static accord.utils.Invariants.illegalState;
 import static java.lang.String.format;
 
@@ -130,10 +126,12 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     protected Timestamp maxRedundant = Timestamp.NONE;
 
     private InMemorySafeStore current;
+    private final Journal.Loader loader;
 
     public InMemoryCommandStore(int id, NodeCommandStoreService time, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
     {
         super(id, time, agent, store, progressLogFactory, listenersFactory, 
epochUpdateHolder);
+        this.loader = new CommandLoader(this);
     }
 
     protected boolean canExposeUnloaded()
@@ -1377,99 +1375,89 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         unsafeSetRejectBefore(new RejectBefore());
     }
 
-    public interface Loader
+    public Journal.Loader loader()
     {
-        void load(Command next);
-        void apply(Command next);
+        return loader;
     }
 
-    public Loader loader()
+    private static class CommandLoader extends AbstractLoader
     {
-        return new Loader()
-        {
-            private PreLoadContext context(Command command, KeyHistory 
keyHistory)
-            {
-                TxnId txnId = command.txnId();
-                AbstractUnseekableKeys keys = null;
+        private final InMemoryCommandStore commandStore;
 
-                if (CommandsForKey.manages(txnId))
-                    keys = (AbstractUnseekableKeys) 
command.participants().hasTouched();
-                else if (!CommandsForKey.managesExecution(txnId) && 
command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated))
-                    keys = command.asCommitted().waitingOn.keys;
+        private CommandLoader(InMemoryCommandStore commandStore)
+        {
+            this.commandStore = commandStore;
+        }
 
-                if (keys != null)
-                {
-                    return PreLoadContext.contextFor(txnId, keys, keyHistory);
-                }
+        private PreLoadContext context(Command command, KeyHistory keyHistory)
+        {
+            TxnId txnId = command.txnId();
+            AbstractUnseekableKeys keys = null;
 
-                return PreLoadContext.contextFor(txnId);
-            }
+            if (CommandsForKey.manages(txnId))
+                keys = (AbstractUnseekableKeys) 
command.participants().hasTouched();
+            else if (!CommandsForKey.managesExecution(txnId) && 
command.hasBeen(Status.Stable) && !command.hasBeen(Status.Truncated))
+                keys = command.asCommitted().waitingOn.keys;
 
-            public void load(Command command)
+            if (keys != null)
             {
-                TxnId txnId = command.txnId();
-
-                executeInContext(InMemoryCommandStore.this,
-                                 context(command, ASYNC),
-                                 safeStore -> {
-                                     Command local = command;
-                                     if (local.status() != Truncated && 
local.status() != Invalidated)
-                                     {
-                                         Cleanup cleanup = 
Cleanup.shouldCleanup(safeStore, local, local.participants());
-                                         switch (cleanup)
-                                         {
-                                             case NO:
-                                                 break;
-                                             case INVALIDATE:
-                                             case TRUNCATE_WITH_OUTCOME:
-                                             case TRUNCATE:
-                                             case ERASE:
-                                                 local = Commands.purge(local, 
local.participants(), cleanup);
-                                         }
-                                     }
-
-                                     local = 
safeStore.unsafeGet(txnId).update(safeStore, local);
-                                     if (local.status() == Truncated)
-                                         
safeStore.progressLog().clear(local.txnId());
-                                     return local;
-                                 });
+                return PreLoadContext.contextFor(txnId, keys, keyHistory);
+            }
 
+            return PreLoadContext.contextFor(txnId);
+        }
 
+        @Override
+        public void load(Command command, Journal.OnDone onDone)
+        {
+            try
+            {
+                commandStore.executeInContext(commandStore,
+                                              context(command, ASYNC),
+                                              safeStore -> 
loadInternal(command, safeStore));
             }
-
-            public void apply(Command command)
+            catch (Throwable t)
             {
-                TxnId txnId = command.txnId();
+                onDone.failure(t);
+            }
+
+            onDone.success();
+        }
 
+        @Override
+        public void apply(Command command, Journal.OnDone onDone)
+        {
+            try
+            {
                 PreLoadContext context = context(command, 
KeyHistory.TIMESTAMPS);
-                executeInContext(InMemoryCommandStore.this,
-                                 context,
-                                 safeStore -> {
-                                     SafeCommand safeCommand = 
safeStore.unsafeGet(txnId);
-                                     Command local = safeCommand.current();
-                                     if (local.is(Stable) || 
local.is(PreApplied))
-                                     {
-                                         Commands.maybeExecute(safeStore, 
safeCommand, local, true, true);
-                                     }
-                                     else if 
(local.saveStatus().compareTo(Applying) >= 0 && !local.hasBeen(Truncated))
-                                     {
-                                         unsafeApplyWrites(safeStore, 
safeCommand, local);
-                                     }
-                                     return null;
-                                 });
+                commandStore.executeInContext(commandStore,
+                                              context,
+                                              safeStore -> {
+                                                  applyWrites(command, 
safeStore, (safeCommand, cmd) -> {
+                                                      
unsafeApplyWrites(safeStore, safeCommand, cmd);
+                                                  });
+                                                  return null;
+                                              });
+            }
+            catch (Throwable t)
+            {
+                onDone.failure(t);
+                return;
             }
-        };
-    }
 
-    public static void unsafeApplyWrites(SafeCommandStore safeStore, 
SafeCommand safeCommand, Command command)
-    {
-        Command.Executed executed = command.asExecuted();
-        Participants<?> executes = executed.participants().executes(safeStore, 
command.txnId(), command.executeAt());
-        if (!executes.isEmpty())
+            onDone.success();
+        }
+
+        protected void unsafeApplyWrites(SafeCommandStore safeStore, 
SafeCommand safeCommand, Command command)
         {
-            command.writes().applyUnsafe(safeStore, 
Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn());
-            safeCommand.applied(safeStore);
-            safeStore.notifyListeners(safeCommand, command);
+            Command.Executed executed = command.asExecuted();
+            Participants<?> executes = 
executed.participants().executes(safeStore, command.txnId(), 
command.executeAt());
+            if (!executes.isEmpty())
+            {
+                command.writes().applyUnsafe(safeStore, 
Commands.applyRanges(safeStore, command.executeAt()), command.partialTxn());
+                safeCommand.applied(safeStore);
+                safeStore.notifyListeners(safeCommand, command);
+            }
         }
     }
 
diff --git a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java 
b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
index d54080a6..f954f648 100644
--- a/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
+++ b/accord-core/src/main/java/accord/impl/TimestampsForKeys.java
@@ -21,6 +21,7 @@ package accord.impl;
 import accord.api.RoutingKey;
 import accord.api.VisibleForImplementation;
 import accord.local.SafeCommandStore;
+import accord.local.cfk.CommandsForKey;
 import accord.primitives.RoutingKeys;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
@@ -48,7 +49,8 @@ public class TimestampsForKeys
         {
             if 
(safeStore.redundantBefore().preBootstrapOrStale(TxnId.min(txnId, 
current.lastWriteId()), RoutingKeys.of(tfk.key().toUnseekable())) == FULLY)
                 return current;
-            throw illegalState("%s is less than the most recent write 
timestamp %s", executeAt, lastWrite);
+            if (CommandsForKey.reportLinearizabilityViolations())
+                throw illegalState("%s is less than the most recent write 
timestamp %s", executeAt, lastWrite);
         }
 
         Timestamp lastExecuted = current.lastExecutedTimestamp();
@@ -61,7 +63,8 @@ public class TimestampsForKeys
         {
             if 
(!safeStore.safeToReadAt(executeAt).contains(tfk.key().toUnseekable()))
                 return current;
-            throw illegalState("%s is less than the most recent executed 
timestamp %s", executeAt, lastExecuted);
+            if (CommandsForKey.reportLinearizabilityViolations())
+                throw illegalState("%s is less than the most recent executed 
timestamp %s", executeAt, lastExecuted);
         }
 
         long micros = executeAt.hlc();
diff --git a/accord-core/src/main/java/accord/local/Command.java 
b/accord-core/src/main/java/accord/local/Command.java
index cf64e6cd..a089036a 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -328,8 +328,8 @@ public abstract class Command implements CommonAttributes
                 {
                     default: throw new AssertionError("Unhandled Outcome: " + 
known.outcome);
                     case Apply:
-                        Invariants.checkState(writes != null, "Writes is 
null");
-                        Invariants.checkState(result != null, "Result is 
null");
+                        Invariants.checkState(writes != null, "Writes is null 
%s", validate);
+                        Invariants.checkState(result != null, "Result is null 
%s", validate);
                         break;
                     case Invalidated:
                         
Invariants.checkState(validate.durability().isMaybeInvalidated(), "%s is not 
invalidated", validate.durability());
@@ -337,8 +337,8 @@ public abstract class Command implements CommonAttributes
                         Invariants.checkState(validate.durability() != Local);
                     case Erased:
                     case WasApply:
-                        Invariants.checkState(writes == null, "Writes exist");
-                        Invariants.checkState(result == null, "Results exist");
+                        Invariants.checkState(writes == null, "Writes exist 
for %s", validate);
+                        Invariants.checkState(result == null, "Results exist 
%s", validate);
                         break;
                 }
             }
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 6fb4ae04..f7eb397b 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -18,6 +18,7 @@
 
 package accord.local;
 
+import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.DataStore;
@@ -207,6 +208,8 @@ public abstract class CommandStore implements AgentExecutor
         return id;
     }
 
+    public abstract Journal.Loader loader();
+
     @Override
     public Agent agent()
     {
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index a213c112..38939974 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -671,7 +671,9 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
      */
     public TxnId nextTxnId(Txn.Kind rw, Domain domain)
     {
-        return new TxnId(uniqueNow(), rw, domain);
+        TxnId txnId = new TxnId(uniqueNow(), rw, domain);
+        Invariants.checkState((txnId.lsb & (0xffff & ~TxnId.IDENTITY_FLAGS)) 
== 0);
+        return txnId;
     }
 
     public AsyncResult<Result> coordinate(Txn txn)
diff --git 
a/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java 
b/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
index e86a1805..1ee926df 100644
--- a/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
+++ b/accord-core/src/main/java/accord/messages/ReadEphemeralTxnData.java
@@ -75,7 +75,8 @@ public class ReadEphemeralTxnData extends ReadData
     private ReadEphemeralTxnData(TxnId txnId, Participants<?> readScope, 
Route<?> scope, long executeAtEpoch, @Nonnull Txn txn, @Nonnull Deps deps, 
@Nonnull FullRoute<?> route)
     {
         super(txnId, readScope.intersecting(scope), executeAtEpoch);
-        Invariants.checkState(executeAtEpoch == txnId.epoch());
+        Invariants.checkState(executeAtEpoch == txnId.epoch(),
+                              "Epoch for transaction %s (%d) did not match 
expected %d", txn, txnId.epoch(), executeAtEpoch);
         this.route = route;
         this.partialTxn = txn.intersecting(scope, false);
         this.partialDeps = deps.intersecting(scope);
diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java 
b/accord-core/src/main/java/accord/primitives/Timestamp.java
index 0a9de552..f488ab8b 100644
--- a/accord-core/src/main/java/accord/primitives/Timestamp.java
+++ b/accord-core/src/main/java/accord/primitives/Timestamp.java
@@ -38,8 +38,8 @@ public class Timestamp implements Comparable<Timestamp>, 
EpochSupplier
      */
     private static final int MERGE_FLAGS = 0x8000;
     // TODO (testing): is this the correct set of identity bits?
-    private static final long IDENTITY_LSB = 0xFFFFFFFFFFFF001EL;
-    public static final int IDENTITY_FLAGS = 0x001E;
+    private static final long IDENTITY_LSB = 0xFFFFFFFF_FFFF001FL;
+    public static final int IDENTITY_FLAGS = 0x00000000_0000001F;
     public static final long MAX_EPOCH = (1L << 48) - 1;
     private static final long HLC_INCR = 1L << 16;
     static final long MAX_FLAGS = HLC_INCR - 1;
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index 9ea7d858..b2b04cae 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -23,25 +23,25 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import accord.api.Agent;
 import com.google.common.collect.Sets;
 
+import accord.api.Agent;
 import accord.api.Key;
+import accord.api.LocalConfig;
 import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.api.TopologySorter;
-import accord.api.LocalConfig;
 import accord.coordinate.CoordinationAdapter;
+import accord.impl.DefaultLocalListeners;
+import accord.impl.DefaultRemoteListeners;
 import accord.impl.DefaultTimeouts;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
-import accord.impl.DefaultLocalListeners;
-import accord.impl.progresslog.DefaultProgressLogs;
-import accord.impl.DefaultRemoteListeners;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.TestAgent;
 import accord.impl.list.ListQuery;
@@ -50,6 +50,7 @@ import accord.impl.list.ListUpdate;
 import accord.impl.mock.MockCluster;
 import accord.impl.mock.MockConfigurationService;
 import accord.impl.mock.MockStore;
+import accord.impl.progresslog.DefaultProgressLogs;
 import accord.local.DurableBefore;
 import accord.local.Node;
 import accord.local.Node.Id;
@@ -132,9 +133,10 @@ public class Utils
 
     public static Txn listWriteTxn(Id client, Keys keys)
     {
-        ListUpdate update = new ListUpdate(Function.identity());
+        TreeMap<Key, Integer> map = new TreeMap<>();
         for (Key k : keys)
-            update.put(k, 1);
+            map.put(k, 1);
+        ListUpdate update = new ListUpdate(Function.identity());
         ListRead read = new ListRead(Function.identity(), false, keys, keys);
         ListQuery query = new ListQuery(client, keys.size(), false);
         return new Txn.InMemory(keys, read, query, update);
@@ -202,6 +204,11 @@ public class Utils
         return node;
     }
 
+    public static TopologyManager testTopologyManager(TopologySorter.Supplier 
sorter, Id node)
+    {
+        return new TopologyManager(sorter, new TestAgent.RethrowAgent(), node, 
Scheduler.NEVER_RUN_SCHEDULED, new MockCluster.Clock(0), LocalConfig.DEFAULT);
+    }
+
     public static void spinUntilSuccess(ThrowingRunnable runnable)
     {
         spinUntilSuccess(runnable, 10);
@@ -216,9 +223,4 @@ public class Utils
                   .ignoreExceptions()
                   .untilAsserted(runnable);
     }
-
-    public static TopologyManager testTopologyManager(TopologySorter.Supplier 
sorter, Id node)
-    {
-        return new TopologyManager(sorter, new TestAgent.RethrowAgent(), node, 
Scheduler.NEVER_RUN_SCHEDULED, new MockCluster.Clock(0), LocalConfig.DEFAULT);
-    }
 }
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTest.java
index 69a7b683..1d3b2646 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -18,514 +18,21 @@
 
 package accord.burn;
 
-import java.time.Duration;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.IntSupplier;
 import java.util.function.LongSupplier;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-import java.util.zip.CRC32;
 
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import accord.api.Key;
-import accord.burn.random.FrequentLargeRange;
-import accord.impl.MessageListener;
-import accord.impl.PrefixedIntHashKey;
-import accord.impl.TopologyFactory;
-import accord.impl.basic.Cluster;
-import accord.impl.basic.Cluster.Stats;
-import accord.impl.basic.Packet;
-import accord.impl.basic.PendingQueue;
-import accord.impl.basic.PendingRunnable;
-import accord.impl.basic.MonitoredPendingQueue;
-import accord.impl.basic.RandomDelayQueue;
-import accord.impl.basic.RandomDelayQueue.Factory;
-import accord.impl.basic.SimulatedDelayedExecutorService;
-import accord.impl.list.ListAgent;
-import accord.impl.list.ListQuery;
-import accord.impl.list.ListRead;
-import accord.impl.list.ListRequest;
-import accord.impl.list.ListResult;
-import accord.impl.list.ListUpdate;
-import accord.local.CommandStore;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.messages.MessageType;
-import accord.messages.Reply;
-import accord.primitives.Keys;
-import accord.primitives.Range;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.topology.Shard;
-import accord.topology.Topology;
 import accord.utils.DefaultRandom;
-import accord.utils.Gen;
-import accord.utils.Gens;
-import accord.utils.RandomSource;
-import accord.utils.Utils;
-import accord.utils.async.AsyncExecutor;
-import accord.utils.async.TimeoutUtils;
-import accord.verify.CompositeVerifier;
-import accord.verify.ElleVerifier;
-import accord.verify.StrictSerializabilityVerifier;
-import accord.verify.Verifier;
-import org.agrona.collections.Int2ObjectHashMap;
-import org.agrona.collections.IntHashSet;
 
-import static accord.impl.PrefixedIntHashKey.forHash;
-import static accord.impl.PrefixedIntHashKey.range;
-import static accord.impl.PrefixedIntHashKey.ranges;
-import static accord.primitives.Txn.Kind.EphemeralRead;
 import static accord.utils.Invariants.illegalArgument;
-import static accord.utils.Utils.toArray;
 
-public class BurnTest
+public class BurnTest extends BurnTestBase
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(BurnTest.class);
-
-    /**
-     * Min hash value for the test domain, this value must be respected by the 
hash function
-     * @see {@link BurnTest#hash(int)}
-     */
-    public static final int HASH_RANGE_START = 0;
-    /**
-     * Max hash value for the test domain, this value must be respected by the 
hash function
-     * @see {@link BurnTest#hash(int)}
-     */
-    public static final int HASH_RANGE_END = 1 << 16;
-    private static final Range[] EMPTY_RANGES = new Range[0];
-
-    static List<Packet> generate(RandomSource random, MessageListener 
listener, Function<? super CommandStore, AsyncExecutor> executor, List<Id> 
clients, List<Id> nodes, int[] keys, int operations)
-    {
-        List<Packet> packets = new ArrayList<>();
-        Int2ObjectHashMap<int[]> prefixKeyUpdates = new Int2ObjectHashMap<>();
-        double readInCommandStore = random.nextDouble();
-        Function<int[], Range> nextRange = randomRanges(random);
-
-        for (int count = 0 ; count < operations ; ++count)
-        {
-            int finalCount = count;
-            Id client = clients.get(random.nextInt(clients.size()));
-            Id node = nodes.get(random.nextInt(nodes.size()));
-
-            boolean isRangeQuery = random.nextBoolean();
-            String description;
-            Function<Node, Txn> txnGenerator;
-            if (isRangeQuery)
-            {
-                description = "range";
-                txnGenerator = n -> {
-                    int[] prefixes = prefixes(n.topology().current());
-
-                    int rangeCount = 1 + random.nextInt(2);
-                    List<Range> requestRanges = new ArrayList<>();
-                    while (--rangeCount >= 0)
-                        requestRanges.add(nextRange.apply(prefixes));
-                    Ranges ranges = 
Ranges.of(requestRanges.toArray(EMPTY_RANGES));
-                    ListRead read = new 
ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, 
false, ranges, ranges);
-                    ListQuery query = new ListQuery(client, finalCount, false);
-                    return new Txn.InMemory(ranges, read, query);
-                };
-            }
-            else
-            {
-                description = "key";
-                txnGenerator = n -> {
-                    int[] prefixes = prefixes(n.topology().current());
-
-                    boolean isWrite = random.nextBoolean();
-                    int readCount = 1 + random.nextInt(2);
-                    int writeCount = isWrite ? 1 + random.nextInt(2) : 0;
-                    Txn.Kind kind = isWrite ? Txn.Kind.Write : readCount == 1 
? EphemeralRead : Txn.Kind.Read;
-
-                    TreeSet<Key> requestKeys = new TreeSet<>();
-                    IntHashSet readValues = new IntHashSet();
-                    while (readCount-- > 0)
-                        requestKeys.add(randomKey(random, prefixes, keys, 
readValues));
-
-                    ListUpdate update = isWrite ? new ListUpdate(executor) : 
null;
-                    IntHashSet writeValues = isWrite ? new IntHashSet() : null;
-                    while (writeCount-- > 0)
-                    {
-                        PrefixedIntHashKey.Key key = randomKey(random, 
prefixes, keys, writeValues);
-                        int i = Arrays.binarySearch(keys, key.key);
-                        int[] keyUpdateCounter = 
prefixKeyUpdates.computeIfAbsent(key.prefix, ignore -> new int[keys.length]);
-                        update.put(key, ++keyUpdateCounter[i]);
-                    }
-
-                    Keys readKeys = new Keys(requestKeys);
-                    if (isWrite)
-                        requestKeys.addAll(update.keySet());
-                    ListRead read = new 
ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, 
kind == EphemeralRead, readKeys, new Keys(requestKeys));
-                    ListQuery query = new ListQuery(client, finalCount, kind 
== EphemeralRead);
-                    return new Txn.InMemory(kind, new Keys(requestKeys), read, 
query, update);
-                };
-            }
-            packets.add(new Packet(client, node, Long.MAX_VALUE, count, new 
ListRequest(description, txnGenerator, listener)));
-        }
-
-        return packets;
-    }
-
-    private static int[] prefixes(Topology topology)
-    {
-        IntHashSet uniq = new IntHashSet();
-        for (Shard shard : topology.shards())
-            uniq.add(((PrefixedIntHashKey) shard.range.start()).prefix);
-        int[] prefixes = new int[uniq.size()];
-        IntHashSet.IntIterator it = uniq.iterator();
-        for (int i = 0; it.hasNext(); i++)
-            prefixes[i] = it.nextValue();
-        Arrays.sort(prefixes);
-        return prefixes;
-    }
-
-    private static Function<int[], Range> randomRanges(RandomSource rs)
-    {
-        int selection = rs.nextInt(0, 2);
-        switch (selection)
-        {
-            case 0: // uniform
-                return (prefixes) -> randomRange(rs, prefixes, () -> 
rs.nextInt(0, 1 << 13) + 1);
-            case 1: // zipf
-                int domain = HASH_RANGE_END - HASH_RANGE_START + 1;
-                int splitSize = 100;
-                int interval = domain / splitSize;
-                int[] splits = new int[6];
-                for (int i = 0; i < splits.length; i++)
-                    splits[i] = i == 0 ? interval : splits[i - 1] * 2;
-                int[] splitsToPick = splits;
-                int bias = rs.nextInt(0, 3); // small, large, random
-                if (bias != 0)
-                    splitsToPick = Arrays.copyOf(splits, splits.length);
-                if (bias == 1)
-                    Utils.reverse(splitsToPick);
-                else if (bias == 2)
-                    Utils.shuffle(splitsToPick, rs);
-                Gen.IntGen zipf = Gens.pickZipf(splitsToPick);
-                return (prefixes) -> randomRange(rs, prefixes, () -> {
-                    int value = zipf.nextInt(rs);
-                    int idx = Arrays.binarySearch(splits, value);
-                    int min = idx == 0 ? 0 : splits[idx - 1];
-                    return rs.nextInt(min, value) + 1;
-                });
-            default:
-                throw new AssertionError("Unexpected value: " + selection);
-        }
-    }
-
-    private static Range randomRange(RandomSource random, int[] prefixes, 
IntSupplier rangeSizeFn)
-    {
-        int prefix = random.pickInt(prefixes);
-        int i = random.nextInt(HASH_RANGE_START, HASH_RANGE_END);
-        int rangeSize = rangeSizeFn.getAsInt();
-        int j = i + rangeSize;
-        if (j > HASH_RANGE_END)
-        {
-            int delta = j - HASH_RANGE_END;
-            j = HASH_RANGE_END;
-            i -= delta;
-            // saftey check, this shouldn't happen unless the configs were 
changed in an unsafe way
-            if (i < HASH_RANGE_START)
-                i = HASH_RANGE_START;
-        }
-        return range(forHash(prefix, i), forHash(prefix, j));
-    }
-
-    private static PrefixedIntHashKey.Key randomKey(RandomSource random, int[] 
prefixes, int[] keys, Set<Integer> seen)
-    {
-        int prefix = random.pickInt(prefixes);
-        int key;
-        do
-        {
-            key = random.pickInt(keys);
-        }
-        while (!seen.add(key));
-        return PrefixedIntHashKey.key(prefix, key, hash(key));
-    }
-
-    /**
-     * This class uses a limited range than the default for the following 
reasons:
-     *
-     * 1) easier to debug smaller numbers
-     * 2) adds hash collisions (multiple keys map to the same hash)
-     */
-    private static int hash(int key)
-    {
-        CRC32 crc32c = new CRC32();
-        crc32c.update(key);
-        crc32c.update(key >> 8);
-        crc32c.update(key >> 16);
-        crc32c.update(key >> 24);
-        return (int) crc32c.getValue() & 0xffff;
-    }
-
-    @SuppressWarnings("unused")
-    static void reconcile(long seed, TopologyFactory topologyFactory, List<Id> 
clients, List<Id> nodes, int keyCount, int prefixCount, int operations, int 
concurrency) throws ExecutionException, InterruptedException
-    {
-        RandomSource random1 = new DefaultRandom(), random2 = new 
DefaultRandom();
-
-        random1.setSeed(seed);
-        random2.setSeed(seed);
-        ExecutorService exec = Executors.newFixedThreadPool(2);
-        RandomDelayQueue.ReconcilingQueueFactory factory = new 
RandomDelayQueue.ReconcilingQueueFactory(seed);
-        Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, 
clients, nodes, keyCount, prefixCount, operations, concurrency, 
factory.get(true)));
-        Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, 
clients, nodes, keyCount, prefixCount, operations, concurrency, 
factory.get(false)));
-        exec.shutdown();
-        f1.get();
-        f2.get();
-    }
-
-    static void burn(RandomSource random, TopologyFactory topologyFactory, 
List<Id> clients, List<Id> nodes, int keyCount, int prefixCount, int 
operations, int concurrency, PendingQueue pendingQueue)
-    {
-        List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
-        AtomicLong progress = new AtomicLong();
-        MonitoredPendingQueue queue = new MonitoredPendingQueue(failures, 
progress, 5L, TimeUnit.MINUTES, pendingQueue);
-        long startNanos = System.nanoTime();
-        long startLogicalMillis = queue.nowInMillis();
-        Consumer<Runnable> retryBootstrap;
-        {
-            RandomSource retryRandom = random.fork();
-            retryBootstrap = retry -> {
-                long delay = retryRandom.nextInt(1, 15);
-                queue.add(PendingRunnable.create(retry::run), delay, 
TimeUnit.SECONDS);
-            };
-        }
-        IntSupplier coordinationDelays, progressDelays, timeoutDelays;
-        {
-            RandomSource rnd = random.fork();
-            coordinationDelays = delayGenerator(rnd, 1, 100, 100, 1000);
-            progressDelays = delayGenerator(rnd, 1, 100, 100, 1000);
-            timeoutDelays = delayGenerator(rnd, 500, 800, 1000, 10000);
-        }
-        Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier = 
onStale -> new ListAgent(random.fork(), 1000L, failures::add, retryBootstrap, 
onStale, coordinationDelays, progressDelays, timeoutDelays, 
pendingQueue::nowInMillis);
-
-        Supplier<LongSupplier> nowSupplier = () -> {
-            RandomSource forked = random.fork();
-            // TODO (expected): meta-randomise scale of clock drift
-            return FrequentLargeRange.builder(forked)
-                                                   .ratio(1, 5)
-                                                   .small(50, 5000, 
TimeUnit.MICROSECONDS)
-                                                   .large(1, 10, 
TimeUnit.MILLISECONDS)
-                                                   .build()
-                                                   .mapAsLong(j -> Math.max(0, 
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
-                    .asLongSupplier(forked);
-        };
-
-        SimulatedDelayedExecutorService globalExecutor = new 
SimulatedDelayedExecutorService(queue, new ListAgent(random.fork(), 1000L, 
failures::add, retryBootstrap, (i1, i2) -> {
-            throw new IllegalAccessError("Global executor should enver get a 
stale event");
-        }, coordinationDelays, progressDelays, timeoutDelays, 
queue::nowInMillis));
-        Verifier verifier = createVerifier(keyCount * prefixCount);
-        Function<CommandStore, AsyncExecutor> executor = ignore -> 
globalExecutor;
-
-        MessageListener listener = MessageListener.get();
-
-        int[] prefixes = IntStream.concat(IntStream.of(0), 
IntStream.generate(() -> 
random.nextInt(1024))).distinct().limit(prefixCount).toArray();
-        int[] newPrefixes = Arrays.copyOfRange(prefixes, 1, prefixes.length);
-        Arrays.sort(prefixes);
-
-        int[] keys = IntStream.range(0, keyCount).toArray();
-        Packet[] requests = toArray(generate(random, listener, executor, 
clients, nodes, keys, operations), Packet[]::new);
-        int[] starts = new int[requests.length];
-        Packet[] replies = new Packet[requests.length];
-
-        AtomicInteger acks = new AtomicInteger();
-        AtomicInteger nacks = new AtomicInteger();
-        AtomicInteger lost = new AtomicInteger();
-        AtomicInteger truncated = new AtomicInteger();
-        AtomicInteger recovered = new AtomicInteger();
-        AtomicInteger failedToCheck = new AtomicInteger();
-        AtomicInteger clock = new AtomicInteger();
-        AtomicInteger requestIndex = new AtomicInteger();
-        Queue<Packet> initialRequests = new ArrayDeque<>();
-        for (int max = Math.min(concurrency, requests.length) ; 
requestIndex.get() < max ; )
-        {
-            int i = requestIndex.getAndIncrement();
-            starts[i] = clock.incrementAndGet();
-            initialRequests.add(requests[i]);
-        }
-
-        // not used for atomicity, just for encapsulation
-        AtomicReference<Runnable> onSubmitted = new AtomicReference<>();
-        Consumer<Packet> responseSink = packet -> {
-            if (replies[(int)packet.replyId] != null)
-                return;
-
-            if (requestIndex.get() < requests.length)
-            {
-                int i = requestIndex.getAndIncrement();
-                starts[i] = clock.incrementAndGet();
-                queue.addNoDelay(requests[i]);
-                if (i == requests.length - 1)
-                    onSubmitted.get().run();
-            }
-            if (packet.message instanceof Reply.FailureReply)
-            {
-                failures.add(new AssertionError("Unexpected failure in list 
reply", ((Reply.FailureReply) packet.message).failure));
-                return;
-            }
-            ListResult reply = (ListResult) packet.message;
-
-            try
-            {
-                if (!reply.isSuccess() && reply.status() == 
ListResult.Status.HeartBeat)
-                    return; // interrupted; will fetch our actual reply once 
rest of simulation is finished (but wanted to send another request to keep 
correct number in flight)
-
-                int start = starts[(int)packet.replyId];
-                int end = clock.incrementAndGet();
-                logger.debug("{} at [{}, {}]", reply, start, end);
-                replies[(int)packet.replyId] = packet;
-
-                if (!reply.isSuccess())
-                {
-                    switch (reply.status())
-                    {
-                        case Lost:          lost.incrementAndGet();            
 break;
-                        case Invalidated:   nacks.incrementAndGet();           
 break;
-                        case Failure:       failedToCheck.incrementAndGet();   
 break;
-                        case Truncated:     truncated.incrementAndGet();       
 break;
-                        // txn was applied?, but client saw a timeout, so 
response isn't known
-                        case Other:                                            
 break;
-                        default:            throw new 
AssertionError("Unexpected fault: " + reply.status());
-                    }
-                    return;
-                }
-
-                progress.incrementAndGet();
-                switch (reply.status())
-                {
-                    default: throw new AssertionError("Unhandled status: " + 
reply.status());
-                    case Applied: acks.incrementAndGet(); break;
-                    case RecoveryApplied: recovered.incrementAndGet(); // 
NOTE: technically this might have been applied by the coordinator and it simply 
timed out
-                }
-                // TODO (correctness): when a keyspace is removed, the 
history/validator isn't cleaned up...
-                // the current logic for add keyspace only knows what is 
there, so a ABA problem exists where keyspaces
-                // may come back... logically this is a problem as the history 
doesn't get reset, but practically that
-                // is fine as the backing map and the validator are consistent
-                Verifier.Checker check = verifier.witness(start, end);
-                for (int i = 0 ; i < reply.read.length ; ++i)
-                {
-                    Key key = reply.responseKeys.get(i);
-                    int prefix = prefix(key);
-                    int keyValue = key(key);
-                    int k = Arrays.binarySearch(keys, keyValue);
-
-                    int[] read = reply.read[i];
-                    int write = reply.update == null ? -1 : 
reply.update.getOrDefault(key, -1);
-
-                    int prefixIndex = Arrays.binarySearch(prefixes, prefix);
-                    int index = keyCount * prefixIndex + k;
-                    if (read != null)
-                        check.read(index, read);
-                    if (write >= 0)
-                        check.write(index, write);
-                }
-                check.close();
-            }
-            catch (Throwable t)
-            {
-                failures.add(t);
-            }
-        };
-
-        Map<MessageType, Stats> messageStatsMap;
-        try
-        {
-            messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), 
newPrefixes, listener, () -> queue,
-                                          (id, onStale) -> 
globalExecutor.withAgent(agentSupplier.apply(onStale)),
-                                          queue::checkFailures,
-                                          responseSink, random::fork, 
nowSupplier,
-                                          topologyFactory, 
initialRequests::poll,
-                                          onSubmitted::set,
-                                          ignore -> {}
-            );
-            verifier.close();
-        }
-        catch (Throwable t)
-        {
-            logger.info("Keys: " + Arrays.toString(keys));
-            logger.info("Prefixes: " + Arrays.toString(prefixes));
-            for (int i = 0 ; i < requests.length ; ++i)
-            {
-                logger.info("{}", requests[i]);
-                logger.info("\t\t" + replies[i]);
-            }
-            throw t;
-        }
-
-        int observedOperations = acks.get() + recovered.get() + nacks.get() + 
lost.get() + truncated.get();
-        logger.info("nodes: {}, rf: {}. Received {} acks, {} recovered, {} 
nacks, {} lost, {} truncated ({} total) to {} operations", nodes.size(), 
topologyFactory.rf, acks.get(), recovered.get(), nacks.get(), lost.get(), 
truncated.get(), observedOperations, operations);
-        logger.info("Message counts: {}", statsInDescOrder(messageStatsMap));
-        logger.info("Took {} and in logical time of {}", 
Duration.ofNanos(System.nanoTime() - startNanos), 
Duration.ofMillis(queue.nowInMillis() - startLogicalMillis));
-        if (clock.get() != operations * 2 || observedOperations != operations)
-        {
-            StringBuilder sb = new StringBuilder();
-            for (int i = 0 ; i < requests.length ; ++i)
-            {
-                // since this only happens when operations are lost, only log 
the ones without a reply to lower the amount of noise
-                if (replies[i] == null)
-                {
-                    sb.setLength(0);
-                    sb.append(requests[i]).append("\n\t\t").append(replies[i]);
-                    logger.info(sb.toString());
-                }
-            }
-            if (clock.get() != operations * 2) throw new 
AssertionError("Incomplete set of responses; clock=" + clock.get() + ", 
expected operations=" + (operations * 2));
-            else throw new AssertionError("Incomplete set of responses; 
ack+recovered+other+nacks+lost+truncated=" + observedOperations + ", expected 
operations=" + (operations * 2));
-        }
-    }
-
-    private static IntSupplier delayGenerator(RandomSource rnd, int 
absoluteMin, int absoluteMaxMin, int absoluteMinMax, int asoluteMax)
-    {
-        int minDelay = rnd.nextInt(absoluteMin, absoluteMaxMin);
-        int maxDelay = rnd.nextInt(Math.max(absoluteMinMax, minDelay), 
asoluteMax);
-        if (rnd.nextBoolean())
-        {
-            int medianDelay = rnd.nextInt(minDelay, maxDelay);
-            return () -> rnd.nextBiasedInt(minDelay, medianDelay, maxDelay);
-        }
-        return () -> rnd.nextInt(minDelay, maxDelay);
-    }
-
-    private static String statsInDescOrder(Map<MessageType, Stats> statsMap)
-    {
-        List<Stats> stats = new ArrayList<>(statsMap.values());
-        stats.sort(Comparator.comparingInt(s -> -s.count()));
-        return stats.toString();
-    }
-
-    private static Verifier createVerifier(int keyCount)
+    @Test
+    public void testOne()
     {
-        if (!ElleVerifier.Support.allowed())
-            return new StrictSerializabilityVerifier("", keyCount);
-        return CompositeVerifier.create(new StrictSerializabilityVerifier("", 
keyCount),
-                                        new ElleVerifier());
+        run(System.nanoTime());
     }
 
     public static void main(String[] args)
@@ -575,118 +82,4 @@ public class BurnTest
             }
         }
     }
-
-    @Test
-    public void testOne()
-    {
-        run(System.nanoTime());
-    }
-
-    private static void run(long seed)
-    {
-        Duration timeout = Duration.ofMinutes(3);
-        try
-        {
-            TimeoutUtils.runBlocking(timeout, "BurnTest with timeout", () -> 
run(seed, 1000));
-        }
-        catch (Throwable thrown)
-        {
-            Throwable cause = thrown;
-            if (cause instanceof ExecutionException)
-                cause = cause.getCause();
-            if (cause instanceof TimeoutException)
-            {
-                TimeoutException override = new TimeoutException("test did not 
complete within " + timeout);
-                override.setStackTrace(new StackTraceElement[0]);
-                cause = override;
-            }
-            logger.error("Exception running burn test for seed {}:", seed, 
cause);
-            throw SimulationException.wrap(seed, cause);
-        }
-    }
-
-    private static void run(long seed, int operations)
-    {
-        logger.info("Seed: {}", seed);
-        Cluster.trace.trace("Seed: {}", seed);
-        RandomSource random = new DefaultRandom(seed);
-        try
-        {
-            List<Id> clients = generateIds(true, 1 + random.nextInt(4));
-            int rf;
-            float chance = random.nextFloat();
-            if (chance < 0.2f)      { rf = random.nextInt(2, 9); }
-            else if (chance < 0.4f) { rf = 3; }
-            else if (chance < 0.7f) { rf = 5; }
-            else if (chance < 0.8f) { rf = 7; }
-            else                    { rf = 9; }
-
-            List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
-
-            burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START, 
HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 
3))),
-                    clients,
-                    nodes,
-                    5 + random.nextInt(15),
-                    5 + random.nextInt(15),
-                    operations,
-                    10 + random.nextInt(30),
-                 new Factory(random).get());
-        }
-        catch (Throwable t)
-        {
-            logger.error("Exception running burn test for seed {}:", seed, t);
-            throw SimulationException.wrap(seed, t);
-        }
-    }
-
-    private static void reconcile(long seed, int operations)
-    {
-        logger.info("Seed: {}", seed);
-        Cluster.trace.trace("Seed: {}", seed);
-        RandomSource random = new DefaultRandom(seed);
-        try
-        {
-            List<Id> clients = generateIds(true, 1 + random.nextInt(4));
-            int rf;
-            float chance = random.nextFloat();
-            if (chance < 0.2f)      { rf = random.nextInt(2, 9); }
-            else if (chance < 0.4f) { rf = 3; }
-            else if (chance < 0.7f) { rf = 5; }
-            else if (chance < 0.8f) { rf = 7; }
-            else                    { rf = 9; }
-
-            List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
-
-            reconcile(seed, new TopologyFactory(rf, ranges(0, 
HASH_RANGE_START, HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, 
rf), nodes.size() * 3))),
-                    clients,
-                    nodes,
-                    5 + random.nextInt(15),
-                    5 + random.nextInt(15),
-                    operations,
-                    10 + random.nextInt(30));
-        }
-        catch (Throwable t)
-        {
-            logger.error("Exception running burn test for seed {}:", seed, t);
-            throw SimulationException.wrap(seed, t);
-        }
-    }
-
-    private static List<Id> generateIds(boolean clients, int count)
-    {
-        List<Id> ids = new ArrayList<>();
-        for (int i = 1; i <= count ; ++i)
-            ids.add(new Id(clients ? -i : i));
-        return ids;
-    }
-
-    private static int key(Key key)
-    {
-        return ((PrefixedIntHashKey) key).key;
-    }
-
-    private static int prefix(Key key)
-    {
-        return ((PrefixedIntHashKey) key).prefix;
-    }
 }
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java 
b/accord-core/src/test/java/accord/burn/BurnTestBase.java
similarity index 87%
copy from accord-core/src/test/java/accord/burn/BurnTest.java
copy to accord-core/src/test/java/accord/burn/BurnTestBase.java
index 69a7b683..dd6c14bf 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestBase.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,10 +47,10 @@ import java.util.function.Supplier;
 import java.util.stream.IntStream;
 import java.util.zip.CRC32;
 
-import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Journal;
 import accord.api.Key;
 import accord.burn.random.FrequentLargeRange;
 import accord.impl.MessageListener;
@@ -59,10 +58,11 @@ import accord.impl.PrefixedIntHashKey;
 import accord.impl.TopologyFactory;
 import accord.impl.basic.Cluster;
 import accord.impl.basic.Cluster.Stats;
+import accord.impl.basic.InMemoryJournal;
+import accord.impl.basic.MonitoredPendingQueue;
 import accord.impl.basic.Packet;
 import accord.impl.basic.PendingQueue;
 import accord.impl.basic.PendingRunnable;
-import accord.impl.basic.MonitoredPendingQueue;
 import accord.impl.basic.RandomDelayQueue;
 import accord.impl.basic.RandomDelayQueue.Factory;
 import accord.impl.basic.SimulatedDelayedExecutorService;
@@ -91,8 +91,6 @@ import accord.utils.RandomSource;
 import accord.utils.Utils;
 import accord.utils.async.AsyncExecutor;
 import accord.utils.async.TimeoutUtils;
-import accord.verify.CompositeVerifier;
-import accord.verify.ElleVerifier;
 import accord.verify.StrictSerializabilityVerifier;
 import accord.verify.Verifier;
 import org.agrona.collections.Int2ObjectHashMap;
@@ -102,12 +100,11 @@ import static accord.impl.PrefixedIntHashKey.forHash;
 import static accord.impl.PrefixedIntHashKey.range;
 import static accord.impl.PrefixedIntHashKey.ranges;
 import static accord.primitives.Txn.Kind.EphemeralRead;
-import static accord.utils.Invariants.illegalArgument;
 import static accord.utils.Utils.toArray;
 
-public class BurnTest
+public class BurnTestBase
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(BurnTest.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(BurnTestBase.class);
 
     /**
      * Min hash value for the test domain, this value must be respected by the 
hash function
@@ -295,14 +292,14 @@ public class BurnTest
         random2.setSeed(seed);
         ExecutorService exec = Executors.newFixedThreadPool(2);
         RandomDelayQueue.ReconcilingQueueFactory factory = new 
RandomDelayQueue.ReconcilingQueueFactory(seed);
-        Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, 
clients, nodes, keyCount, prefixCount, operations, concurrency, 
factory.get(true)));
-        Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, 
clients, nodes, keyCount, prefixCount, operations, concurrency, 
factory.get(false)));
+        Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, 
clients, nodes, keyCount, prefixCount, operations, concurrency, 
factory.get(true), InMemoryJournal::new));
+        Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, 
clients, nodes, keyCount, prefixCount, operations, concurrency, 
factory.get(false), InMemoryJournal::new));
         exec.shutdown();
         f1.get();
         f2.get();
     }
 
-    static void burn(RandomSource random, TopologyFactory topologyFactory, 
List<Id> clients, List<Id> nodes, int keyCount, int prefixCount, int 
operations, int concurrency, PendingQueue pendingQueue)
+    public static void burn(RandomSource random, TopologyFactory 
topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int 
prefixCount, int operations, int concurrency, PendingQueue pendingQueue, 
Function<Id, Journal> journalFactory)
     {
         List<Throwable> failures = Collections.synchronizedList(new 
ArrayList<>());
         AtomicLong progress = new AtomicLong();
@@ -330,12 +327,12 @@ public class BurnTest
             RandomSource forked = random.fork();
             // TODO (expected): meta-randomise scale of clock drift
             return FrequentLargeRange.builder(forked)
-                                                   .ratio(1, 5)
-                                                   .small(50, 5000, 
TimeUnit.MICROSECONDS)
-                                                   .large(1, 10, 
TimeUnit.MILLISECONDS)
-                                                   .build()
-                                                   .mapAsLong(j -> Math.max(0, 
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
-                    .asLongSupplier(forked);
+                                     .ratio(1, 5)
+                                     .small(50, 5000, TimeUnit.MICROSECONDS)
+                                     .large(1, 10, TimeUnit.MILLISECONDS)
+                                     .build()
+                                     .mapAsLong(j -> Math.max(0, 
queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
+                                     .asLongSupplier(forked);
         };
 
         SimulatedDelayedExecutorService globalExecutor = new 
SimulatedDelayedExecutorService(queue, new ListAgent(random.fork(), 1000L, 
failures::add, retryBootstrap, (i1, i2) -> {
@@ -463,8 +460,8 @@ public class BurnTest
                                           responseSink, random::fork, 
nowSupplier,
                                           topologyFactory, 
initialRequests::poll,
                                           onSubmitted::set,
-                                          ignore -> {}
-            );
+                                          ignore -> {},
+                                          journalFactory);
             verifier.close();
         }
         catch (Throwable t)
@@ -520,69 +517,12 @@ public class BurnTest
         return stats.toString();
     }
 
-    private static Verifier createVerifier(int keyCount)
-    {
-        if (!ElleVerifier.Support.allowed())
-            return new StrictSerializabilityVerifier("", keyCount);
-        return CompositeVerifier.create(new StrictSerializabilityVerifier("", 
keyCount),
-                                        new ElleVerifier());
-    }
-
-    public static void main(String[] args)
-    {
-        int count = 1;
-        int operations = 1000;
-        Long overrideSeed = null;
-        boolean reconcile = false;
-        LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong;
-        boolean hasOverriddenSeed = false;
-        for (int i = 0 ; i < args.length ; i += 2)
-        {
-            switch (args[i])
-            {
-                default: throw illegalArgument("Invalid option: " + args[i]);
-                case "-c":
-                    count = Integer.parseInt(args[i + 1]);
-                    if (hasOverriddenSeed)
-                        throw illegalArgument("Cannot override both seed (-s) 
and number of seeds to run (-c)");
-                    overrideSeed = null;
-                    break;
-                case "-s":
-                    overrideSeed = Long.parseLong(args[i + 1]);
-                    hasOverriddenSeed = true;
-                    count = 1;
-                    break;
-                case "-o":
-                    operations = Integer.parseInt(args[i + 1]);
-                    break;
-                case "-r":
-                    reconcile = true;
-                    --i;
-                    break;
-                case "--loop-seed":
-                    seedGenerator = new DefaultRandom(Long.parseLong(args[i + 
1]))::nextLong;
-            }
-        }
-        while (count-- > 0)
-        {
-            if (!reconcile)
-            {
-                run(overrideSeed != null ? overrideSeed : 
seedGenerator.getAsLong(), operations);
-            }
-            else
-            {
-                reconcile(overrideSeed != null ? overrideSeed : 
seedGenerator.getAsLong(), operations);
-            }
-        }
-    }
-
-    @Test
-    public void testOne()
+    protected static Verifier createVerifier(int keyCount)
     {
-        run(System.nanoTime());
+        return new StrictSerializabilityVerifier("", keyCount);
     }
 
-    private static void run(long seed)
+    protected static void run(long seed)
     {
         Duration timeout = Duration.ofMinutes(3);
         try
@@ -605,7 +545,7 @@ public class BurnTest
         }
     }
 
-    private static void run(long seed, int operations)
+    protected static void run(long seed, int operations)
     {
         logger.info("Seed: {}", seed);
         Cluster.trace.trace("Seed: {}", seed);
@@ -624,13 +564,14 @@ public class BurnTest
             List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
 
             burn(random, new TopologyFactory(rf, ranges(0, HASH_RANGE_START, 
HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, rf), nodes.size() * 
3))),
-                    clients,
-                    nodes,
-                    5 + random.nextInt(15),
-                    5 + random.nextInt(15),
-                    operations,
-                    10 + random.nextInt(30),
-                 new Factory(random).get());
+                 clients,
+                 nodes,
+                 5 + random.nextInt(15),
+                 5 + random.nextInt(15),
+                 operations,
+                 10 + random.nextInt(30),
+                 new Factory(random).get(),
+                 InMemoryJournal::new);
         }
         catch (Throwable t)
         {
@@ -639,7 +580,7 @@ public class BurnTest
         }
     }
 
-    private static void reconcile(long seed, int operations)
+    protected static void reconcile(long seed, int operations)
     {
         logger.info("Seed: {}", seed);
         Cluster.trace.trace("Seed: {}", seed);
@@ -658,12 +599,12 @@ public class BurnTest
             List<Id> nodes = generateIds(false, random.nextInt(rf, rf * 3));
 
             reconcile(seed, new TopologyFactory(rf, ranges(0, 
HASH_RANGE_START, HASH_RANGE_END, random.nextInt(Math.max(nodes.size() + 1, 
rf), nodes.size() * 3))),
-                    clients,
-                    nodes,
-                    5 + random.nextInt(15),
-                    5 + random.nextInt(15),
-                    operations,
-                    10 + random.nextInt(30));
+                      clients,
+                      nodes,
+                      5 + random.nextInt(15),
+                      5 + random.nextInt(15),
+                      operations,
+                      10 + random.nextInt(30));
         }
         catch (Throwable t)
         {
@@ -672,7 +613,7 @@ public class BurnTest
         }
     }
 
-    private static List<Id> generateIds(boolean clients, int count)
+    public static List<Id> generateIds(boolean clients, int count)
     {
         List<Id> ids = new ArrayList<>();
         for (int i = 1; i <= count ; ++i)
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java 
b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 0585204e..1712b960 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -111,7 +111,7 @@ public abstract class IntHashKey implements RoutableKey
 
     public static final class Key extends IntHashKey implements accord.api.Key
     {
-        private Key(int key)
+        public Key(int key)
         {
             super(key);
         }
diff --git a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java 
b/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
index a6a8bc84..575ed6d1 100644
--- a/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/PrefixedIntHashKey.java
@@ -157,7 +157,7 @@ public class PrefixedIntHashKey implements RoutableKey
 
     public static final class Hash extends PrefixedIntRoutingKey
     {
-        private Hash(int prefix, int hash)
+        public Hash(int prefix, int hash)
         {
             super(prefix, hash);
         }
diff --git a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java 
b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
index 7f1f81b8..9ea735c4 100644
--- a/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
+++ b/accord-core/src/test/java/accord/impl/RemoteListenersTest.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
 
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.ProgressLog;
 import accord.api.RemoteListeners;
 import accord.api.RemoteListeners.Registration;
@@ -394,6 +395,12 @@ public class RemoteListenersTest
             this.storeId = id;
         }
 
+        @Override
+        public Journal.Loader loader()
+        {
+            throw new UnsupportedOperationException();
+        }
+
         @Override public boolean inStore() { return false; }
         @Override public AsyncChain<Void> execute(PreLoadContext context, 
Consumer<? super SafeCommandStore> consumer) { return null; }
         @Override public <T> AsyncChain<T> submit(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply) { return null; }
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 4bccbd5e..93b792e0 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -46,11 +46,11 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
-import org.junit.jupiter.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.api.BarrierType;
+import accord.api.Journal;
 import accord.api.LocalConfig;
 import accord.api.MessageSink;
 import accord.api.RoutingKey;
@@ -82,6 +82,7 @@ import accord.impl.progresslog.DefaultProgressLogs;
 import accord.local.AgentExecutor;
 import accord.local.Command;
 import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.DurableBefore;
 import accord.local.Node;
 import accord.local.Node.Id;
@@ -450,7 +451,8 @@ public class Cluster
                                                                 }
                                                             },
                                                             Runnable::run,
-                                                            nodeMap::set);
+                                                            nodeMap::set,
+                                                            
InMemoryJournal::new);
         if (!failures.isEmpty())
         {
             AssertionError error = new AssertionError("Unexpected errors 
detected");
@@ -465,7 +467,7 @@ public class Cluster
                                               Runnable checkFailures, 
Consumer<Packet> responseSink,
                                               Supplier<RandomSource> 
randomSupplier, Supplier<LongSupplier> nowSupplierSupplier,
                                               TopologyFactory topologyFactory, 
Supplier<Packet> in, Consumer<Runnable> noMoreWorkSignal,
-                                              Consumer<Map<Id, Node>> 
readySignal)
+                                              Consumer<Map<Id, Node>> 
readySignal, Function<Node.Id, Journal> journalFactory)
     {
         Topology topology = topologyFactory.toTopology(nodes);
         Map<Id, Node> nodeMap = new LinkedHashMap<>();
@@ -523,7 +525,7 @@ public class Cluster
                 BiConsumer<Timestamp, Ranges> onStale = (sinceAtLeast, ranges) 
-> configRandomizer.onStale(id, sinceAtLeast, ranges);
                 AgentExecutor nodeExecutor = nodeExecutorSupplier.apply(id, 
onStale);
                 executorMap.put(id, nodeExecutor);
-                Journal journal = new Journal(id);
+                Journal journal = journalFactory.apply(id);
                 journalMap.put(id, journal);
                 BurnTestConfigurationService configService = new 
BurnTestConfigurationService(id, nodeExecutor, randomSupplier, topology, 
nodeMap::get, topologyUpdates);
                 DelayedCommandStores.CacheLoadingChance isLoadedCheck = new 
DelayedCommandStores.CacheLoadingChance()
@@ -596,7 +598,7 @@ public class Cluster
 
             AsyncResult<?> startup = 
AsyncChains.reduce(nodeMap.values().stream().map(Node::unsafeStart).collect(toList()),
 (a, b) -> null).beginAsResult();
             while (sinks.processPending());
-            Assertions.assertTrue(startup.isDone());
+            Invariants.checkArgument(startup.isDone());
 
             ClusterScheduler clusterScheduler = sinks.new ClusterScheduler(-1);
             List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes));
@@ -626,14 +628,14 @@ public class Cluster
                     listStore.restoreFromSnapshot();
                     // we are simulating node restart, so its remote listeners 
will also be gone
                     
((DefaultRemoteListeners)nodeMap.get(id).remoteListeners()).clear();
-                    Journal journal = journalMap.get(id);
                     Int2ObjectHashMap<NavigableMap<TxnId, Command>> 
beforeStores = copyCommands(stores);
                     for (CommandStore s : stores)
                     {
                         DelayedCommandStores.DelayedCommandStore store = 
(DelayedCommandStores.DelayedCommandStore) s;
                         store.clearForTesting();
-                        journal.reconstructAll(store.loader(), store.id());
                     }
+                    Journal journal = journalMap.get(id);
+                    journal.replay(nodeMap.get(id).commandStores());
                     while (sinks.drain(pred));
                     CommandsForKey.enableLinearizabilityViolationsReporting();
                     Invariants.checkState(listStore.equals(prevData));
@@ -713,10 +715,10 @@ public class Cluster
             Node node = nodeMap.get(id);
 
             Journal journal = journalMap.get(node.id());
-            CommandStore[] stores = 
nodeMap.get(node.id()).commandStores().all();
+            CommandStores stores = nodeMap.get(node.id()).commandStores();
             // run on node scheduler so doesn't run during replay
             scheduled = node.scheduler().selfRecurring(() -> {
-                journal.purge(j -> j < stores.length ? stores[j] : null);
+                journal.purge(stores);
                 schedule(clusterScheduler, rs, nodes, nodeMap, journalMap);
             }, 0, SECONDS);
         }
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index e3f7d8be..17afe245 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.function.BiConsumer;
@@ -33,6 +32,7 @@ import com.google.common.collect.Iterables;
 
 import accord.api.Agent;
 import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
@@ -65,6 +65,7 @@ import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.Cancellable;
 
+import static accord.api.Journal.CommandUpdate;
 import static accord.utils.Invariants.Paranoia.LINEAR;
 import static accord.utils.Invariants.ParanoiaCostFactor.HIGH;
 
@@ -153,7 +154,7 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
                 return;
 
             // Journal will not have result persisted. This part is here for 
test purposes and ensuring that we have strict object equality.
-            Command reconstructed = journal.reconstruct(id, current.txnId());
+            Command reconstructed = journal.loadCommand(id, current.txnId(), 
unsafeGetRedundantBefore(), durableBefore());
             List<Difference<?>> diff = 
ReflectionUtils.recursiveEquals(current, reconstructed);
             Invariants.checkState(diff.isEmpty(), "Commands did not match: 
expected %s, given %s, node %s, store %d, diff %s", current, reconstructed, 
node, id(), new LazyToString(() -> String.join("\n", Iterables.transform(diff, 
Object::toString))));
         }
@@ -283,7 +284,7 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
 
                 Command before = safe.original();
                 Command after = safe.current();
-                commandStore.journal.onExecute(commandStore.id(), before, 
after, Objects.equals(context.primaryTxnId(), after.txnId()));
+                commandStore.journal.saveCommand(commandStore.id(), new 
CommandUpdate(before, after), () -> {});
                 commandStore.validateRead(safe.current());
             });
             super.postExecute();
diff --git a/accord-core/src/test/java/accord/impl/basic/Journal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
similarity index 66%
rename from accord-core/src/test/java/accord/impl/basic/Journal.java
rename to accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 2543e3a5..d48cbb1b 100644
--- a/accord-core/src/test/java/accord/impl/basic/Journal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -20,38 +20,41 @@ package accord.impl.basic;
 
 import java.util.AbstractList;
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Function;
-import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSortedMap;
+
+import accord.api.Journal;
 import accord.api.Result;
 import accord.impl.InMemoryCommandStore;
 import accord.local.Cleanup;
 import accord.local.Command;
 import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.Commands;
 import accord.local.CommonAttributes;
+import accord.local.DurableBefore;
 import accord.local.Node;
+import accord.local.RedundantBefore;
 import accord.local.StoreParticipants;
 import accord.primitives.Ballot;
 import accord.primitives.Known;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
+import accord.primitives.Ranges;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
 import accord.utils.Invariants;
-import org.agrona.collections.Long2ObjectHashMap;
+import org.agrona.collections.Int2ObjectHashMap;
 
 import static accord.primitives.SaveStatus.NotDefined;
 import static accord.primitives.SaveStatus.Stable;
@@ -59,24 +62,108 @@ import static accord.primitives.Status.Invalidated;
 import static accord.primitives.Status.Truncated;
 import static accord.utils.Invariants.illegalState;
 
-public class Journal
+public class InMemoryJournal implements Journal
 {
-    private final Long2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> 
diffsPerCommandStore = new Long2ObjectHashMap<>();
+    private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> 
diffsPerCommandStore = new Int2ObjectHashMap<>();
+    private final FieldStates fieldStates;
+
+    private static class FieldStates
+    {
+        RedundantBefore redundantBefore = RedundantBefore.EMPTY;
+        NavigableMap<TxnId, Ranges> bootstrapBeganAt = 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY);
+        NavigableMap<Timestamp, Ranges> safeToRead = 
ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY);
+        CommandStores.RangesForEpoch.Snapshot rangesForEpoch = null;
+    }
 
     private final Node.Id id;
 
-    public Journal(Node.Id id)
+    public InMemoryJournal(Node.Id id)
     {
         this.id = id;
+        this.fieldStates = new FieldStates();
+    }
+
+    @Override
+    public Command loadCommand(int commandStoreId, TxnId txnId,
+                               // TODO: currently unused!
+                               RedundantBefore redundantBefore, DurableBefore 
durableBefore)
+    {
+        NavigableMap<TxnId, List<Diff>> commandStore = 
this.diffsPerCommandStore.get(commandStoreId);
+
+        if (commandStore == null)
+            return null;
+
+        List<Diff> saved = 
this.diffsPerCommandStore.get(commandStoreId).get(txnId);
+        if (saved == null)
+            return null;
+
+        return reconstruct(saved);
+    }
+
+    @Override
+    public void saveCommand(int store, CommandUpdate diff, Runnable onFlush)
+    {
+        if (diff == null ||
+            diff.before == diff.after
+            || diff.after == null
+            || diff.after.saveStatus() == SaveStatus.Uninitialised)
+            return;
+
+        diffsPerCommandStore.computeIfAbsent(store, (k) -> new TreeMap<>())
+                            .computeIfAbsent(diff.txnId, (k_) -> new 
ArrayList<>())
+                            .add(diff(diff.before, diff.after));
+
+        if (onFlush!= null)
+            onFlush.run();
+    }
+
+    @Override
+    public RedundantBefore loadRedundantBefore(int commandStoreId)
+    {
+        return fieldStates.redundantBefore;
     }
 
-    public void purge(IntFunction<CommandStore> storeSupplier)
+    @Override
+    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
     {
-        for (Map.Entry<Long, NavigableMap<TxnId, List<Diff>>> e : 
diffsPerCommandStore.entrySet())
+        return fieldStates.bootstrapBeganAt;
+    }
+
+    @Override
+    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+    {
+        return fieldStates.safeToRead;
+    }
+
+    @Override
+    public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int 
commandStoreId)
+    {
+        return fieldStates.rangesForEpoch;
+    }
+
+    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush)
+    {
+        if (fieldUpdates.newRedundantBefore != null)
+            fieldStates.redundantBefore = fieldUpdates.newRedundantBefore;
+        if (fieldUpdates.newSafeToRead != null)
+            fieldStates.safeToRead = fieldUpdates.newSafeToRead;
+        if (fieldUpdates.newBootstrapBeganAt != null)
+            fieldStates.bootstrapBeganAt = fieldUpdates.newBootstrapBeganAt;
+        if (fieldUpdates.newRangesForEpoch != null)
+            fieldStates.rangesForEpoch = fieldUpdates.newRangesForEpoch;
+
+        if (onFlush!= null)
+            onFlush.run();
+    }
+
+    @Override
+    public void purge(CommandStores commandStores)
+    {
+        for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> e : 
diffsPerCommandStore.entrySet())
         {
-            int commandStoreId = e.getKey().intValue();
+            int commandStoreId = e.getKey();
             Map<TxnId, List<Diff>> localJournal = e.getValue();
-            CommandStore store = storeSupplier.apply(commandStoreId);
+            CommandStore store = commandStores.forId(commandStoreId);
             if (store == null)
                 continue;
 
@@ -84,7 +171,7 @@ public class Journal
             {
                 List<Diff> diffs = e2.getValue();
                 if (diffs.isEmpty()) continue;
-                Command command = reconstruct(diffs, Reconstruct.Last).get(0);
+                Command command =  reconstruct(diffs);
                 if (command.status() == Truncated || command.status() == 
Invalidated)
                     continue; // Already truncated
                 Cleanup cleanup = Cleanup.shouldCleanup(store.agent(), 
command, store.unsafeGetRedundantBefore(), store.durableBefore());
@@ -110,32 +197,37 @@ public class Journal
         }
     }
 
-    public void reconstructAll(InMemoryCommandStore.Loader loader, int 
commandStoreId)
+    @Override
+    public void replay(CommandStores commandStores)
     {
-        // Nothing to do here, journal is empty for this command store
-        if (!diffsPerCommandStore.containsKey(commandStoreId))
-            return;
-
-        // copy to avoid concurrent modification when appending to journal
-        Map<TxnId, List<Diff>> diffs = new 
TreeMap<>(diffsPerCommandStore.get(commandStoreId));
-        for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
-            e.setValue(new ArrayList<>(e.getValue()));
+        OnDone sync = new OnDone() {
+            public void success() {}
+            public void failure(Throwable t) { throw new 
RuntimeException("Caught an exception during replay", t); }
+        };
 
-        List<Command> toApply = new ArrayList<>();
-        for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+        for (Map.Entry<Integer, NavigableMap<TxnId, List<Diff>>> diffEntry : 
diffsPerCommandStore.entrySet())
         {
-            if (e.getValue().isEmpty()) continue;
-            Command command = reconstruct(commandStoreId, e.getKey(), 
e.getValue());
-            Invariants.checkState(command.saveStatus() != 
SaveStatus.Uninitialised,
-                                  "Found uninitialized command in the log: 
%s", command);
-            if (command.saveStatus().compareTo(Stable) >= 0 && 
!command.hasBeen(Truncated))
-                toApply.add(command);
-            loader.load(command);
-        }
+            int commandStoreId = diffEntry.getKey();
+            // copy to avoid concurrent modification when appending to journal
+            Map<TxnId, List<Diff>> diffs = new TreeMap<>(diffEntry.getValue());
+
+            InMemoryCommandStore commandStore = (InMemoryCommandStore) 
commandStores.forId(commandStoreId);
+            Loader loader = commandStore.loader();
 
-        toApply.sort(Comparator.comparing(Command::executeAt));
-        for (Command command : toApply)
-            loader.apply(command);
+            for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+                e.setValue(new ArrayList<>(e.getValue()));
+
+            for (Map.Entry<TxnId, List<Diff>> e : diffs.entrySet())
+            {
+                if (e.getValue().isEmpty()) continue;
+                Command command = reconstruct(e.getValue());
+                Invariants.checkState(command.saveStatus() != 
SaveStatus.Uninitialised,
+                                      "Found uninitialized command in the log: 
%s %s", diffEntry.getKey(), e.getValue());
+                loader.load(command, sync);
+                if (command.saveStatus().compareTo(Stable) >= 0 && 
!command.hasBeen(Truncated))
+                    loader.apply(command, sync);
+            }
+        }
     }
 
     static class ErasedList extends AbstractList<Diff>
@@ -206,28 +298,10 @@ public class Journal
         }
     }
 
-    private enum Reconstruct
-    {
-        Each,
-        Last
-    }
-
-    public Command reconstruct(int commandStoreId, TxnId txnId)
-    {
-        return reconstruct(commandStoreId, txnId, 
diffsPerCommandStore.get(commandStoreId).get(txnId));
-    }
-
-    public Command reconstruct(int commandStoreId, TxnId txnId, List<Diff> 
diffs)
-    {
-        return reconstruct(diffs, Reconstruct.Last).get(0);
-    }
-
-    private List<Command> reconstruct(List<Diff> diffs, Reconstruct 
reconstruct)
+    private Command reconstruct(List<Diff> diffs)
     {
         Invariants.checkState(diffs != null && !diffs.isEmpty());
 
-        List<Command> results = new ArrayList<>();
-
         TxnId txnId = null;
         Timestamp executeAt = null;
         Timestamp executesAtLeast = null;
@@ -255,13 +329,7 @@ public class Journal
             if (diff.executesAtLeast != null)
                 executesAtLeast = diff.executesAtLeast.get();
             if (diff.saveStatus != null)
-            {
-                Set<SaveStatus> allowed = new HashSet<>();
-                allowed.add(SaveStatus.TruncatedApply);
-                allowed.add(SaveStatus.TruncatedApplyWithOutcome);
-
                 saveStatus = diff.saveStatus.get();
-            }
             if (diff.durability != null)
                 durability = diff.durability.get();
 
@@ -302,75 +370,68 @@ public class Journal
                     result = null;
                     break;
             }
+        }
 
-            CommonAttributes.Mutable attrs = new 
CommonAttributes.Mutable(txnId);
-            if (partialTxn != null)
-                attrs.partialTxn(partialTxn);
-            if (durability != null)
-                attrs.durability(durability);
-            if (participants != null) attrs.setParticipants(participants);
-            else attrs.setParticipants(StoreParticipants.empty(txnId));
-
-            // TODO (desired): we can simplify this logic if, instead of 
diffing, we will infer the diff from the status
-            if (partialDeps != null &&
-                (saveStatus.known.deps != Known.KnownDeps.NoDeps &&
-                 saveStatus.known.deps != Known.KnownDeps.DepsErased &&
-                 saveStatus.known.deps != Known.KnownDeps.DepsUnknown))
-                attrs.partialDeps(partialDeps);
-            Invariants.checkState(saveStatus != null,
-                                  "Save status is null after applying %s", 
diffs);
+        CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(txnId);
+        if (partialTxn != null)
+            attrs.partialTxn(partialTxn);
+        if (durability != null)
+            attrs.durability(durability);
+        if (participants != null) attrs.setParticipants(participants);
+        else attrs.setParticipants(StoreParticipants.empty(txnId));
+
+        // TODO (desired): we can simplify this logic if, instead of diffing, 
we will infer the diff from the status
+        if (partialDeps != null &&
+            (saveStatus.known.deps != Known.KnownDeps.NoDeps &&
+             saveStatus.known.deps != Known.KnownDeps.DepsErased &&
+             saveStatus.known.deps != Known.KnownDeps.DepsUnknown))
+            attrs.partialDeps(partialDeps);
+
+        try
+        {
 
-            try
+            Command current;
+            switch (saveStatus.status)
             {
-                if (reconstruct == Reconstruct.Each ||
-                    (reconstruct == Reconstruct.Last && i == diffs.size() - 1))
-                {
-                    Command current;
-                    switch (saveStatus.status)
-                    {
-                        case NotDefined:
-                            current = saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
-                                                                             : 
Command.NotDefined.notDefined(attrs, promised);
-                            break;
-                        case PreAccepted:
-                            current = Command.PreAccepted.preAccepted(attrs, 
executeAt, promised);
-                            break;
-                        case AcceptedInvalidate:
-                        case Accepted:
-                        case PreCommitted:
-                            if (saveStatus == SaveStatus.AcceptedInvalidate)
-                                current = 
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, 
acceptedOrCommitted);
-                            else
-                                current = Command.Accepted.accepted(attrs, 
saveStatus, executeAt, promised, acceptedOrCommitted);
-                            break;
-                        case Committed:
-                        case Stable:
-                            current = Command.Committed.committed(attrs, 
saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn);
-                            break;
-                        case PreApplied:
-                        case Applied:
-                            current = Command.Executed.executed(attrs, 
saveStatus, executeAt, promised, acceptedOrCommitted, waitingOn, writes, 
result);
-                            break;
-                        case Invalidated:
-                        case Truncated:
-                            current = truncated(attrs, saveStatus, executeAt, 
executesAtLeast, writes, result);
-                            break;
-                        default:
-                            throw new IllegalStateException("Do not know " + 
saveStatus.status + " " + saveStatus);
-                    }
-
-                    results.add(current);
-                }
+                case NotDefined:
+                    current = saveStatus == SaveStatus.Uninitialised ? 
Command.NotDefined.uninitialised(attrs.txnId())
+                                                                     : 
Command.NotDefined.notDefined(attrs, promised);
+                    break;
+                case PreAccepted:
+                    current = Command.PreAccepted.preAccepted(attrs, 
executeAt, promised);
+                    break;
+                case AcceptedInvalidate:
+                case Accepted:
+                case PreCommitted:
+                    if (saveStatus == SaveStatus.AcceptedInvalidate)
+                        current = 
Command.AcceptedInvalidateWithoutDefinition.acceptedInvalidate(attrs, promised, 
acceptedOrCommitted);
+                    else
+                        current = Command.Accepted.accepted(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted);
+                    break;
+                case Committed:
+                case Stable:
+                    current = Command.Committed.committed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn);
+                    break;
+                case PreApplied:
+                case Applied:
+                    current = Command.Executed.executed(attrs, saveStatus, 
executeAt, promised, acceptedOrCommitted, waitingOn, writes, result);
+                    break;
+                case Invalidated:
+                case Truncated:
+                    current = truncated(attrs, saveStatus, executeAt, 
executesAtLeast, writes, result);
+                    break;
+                default:
+                    throw new IllegalStateException("Do not know " + 
saveStatus.status + " " + saveStatus);
             }
-            catch (Throwable t)
-            {
 
-                throw new RuntimeException("Can not reconstruct from diff:\n" 
+ diffs.stream().map(o -> o.toString())
-                                                                               
      .collect(Collectors.joining("\n")),
-                                           t);
-            }
+            return current;
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException("Can not reconstruct from diff:\n" + 
diffs.stream().map(o -> o.toString())
+                                                                               
  .collect(Collectors.joining("\n")),
+                                       t);
         }
-        return results;
     }
 
     private static Command.Truncated truncated(CommonAttributes.Mutable attrs, 
SaveStatus status, Timestamp executeAt, Timestamp executesAtLeast, Writes 
writes, Result result)
@@ -392,25 +453,6 @@ public class Journal
         }
     }
 
-    public void onExecute(int commandStoreId, Command before, Command after, 
boolean isPrimary)
-    {
-        if (before == after
-            || after == null
-            || after.saveStatus() == SaveStatus.Uninitialised)
-            return;
-
-        Diff diff = diff(before, after);
-        if (!isPrimary)
-            diff = diff.asNonPrimary();
-
-        if (diff != null)
-        {
-            diffsPerCommandStore.computeIfAbsent(commandStoreId, (k) -> new 
TreeMap<>())
-                                .computeIfAbsent(after.txnId(), (k_) -> new 
ArrayList<>())
-                                .add(diff);
-        }
-    }
-
     private static class Diff
     {
         public final TxnId txnId;
@@ -467,12 +509,6 @@ public class Journal
             this.result = result;
         }
 
-        // We allow only save status, and waitingOn to be updated by 
non-primary transactions
-        public Diff asNonPrimary()
-        {
-            return new Diff(txnId, null, null, saveStatus, null, null, null, 
null, null, null, waitingOn, null, null);
-        }
-
         public boolean allNulls()
         {
             if (txnId != null) return false;
@@ -543,7 +579,7 @@ public class Journal
                              ifNotEqual(before, after, Command::participants, 
true),
                              ifNotEqual(before, after, Command::partialTxn, 
false),
                              ifNotEqual(before, after, Command::partialDeps, 
false),
-                             ifNotEqual(before, after, Journal::getWaitingOn, 
true),
+                             ifNotEqual(before, after, 
InMemoryJournal::getWaitingOn, true),
                              ifNotEqual(before, after, Command::writes, false),
                              ifNotEqual(before, after, Command::result, 
false));
         if (diff.allNulls())
@@ -620,5 +656,4 @@ public class Journal
             throw new UnsupportedOperationException();
         }
     }
-
 }
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
new file mode 100644
index 00000000..69c799b8
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.NavigableMap;
+
+import accord.api.Journal;
+import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+
+/**
+ * Logging journal, a wrapper over journal for debugging / inspecting history 
purposes.
+ */
+public class LoggingJournal implements Journal
+{
+    private final BufferedWriter log;
+    private final Journal delegate;
+
+    public LoggingJournal(Journal delegate, String path)
+    {
+        this.delegate = delegate;
+        File f = new File(path);
+        try
+        {
+            log = new BufferedWriter(new OutputStreamWriter(new 
FileOutputStream(f)));
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private synchronized void log(String format, Object... objects)
+    {
+        try
+        {
+            log.write(String.format(format, objects));
+            log.flush();
+        }
+        catch (IOException e)
+        {
+            // ignore
+        }
+    }
+
+    public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        return delegate.loadCommand(commandStoreId, txnId, redundantBefore, 
durableBefore);
+    }
+
+    public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
+    {
+        log("%d: %s\n", store, update.after);
+        delegate.saveCommand(store, update, onFlush);
+    }
+
+    public void purge(CommandStores commandStores)
+    {
+        log("PURGE\n");
+        delegate.purge(commandStores);
+    }
+
+    public void replay(CommandStores commandStores)
+    {
+        delegate.replay(commandStores);
+    }
+
+    public RedundantBefore loadRedundantBefore(int commandStoreId)
+    {
+        return delegate.loadRedundantBefore(commandStoreId);
+    }
+
+    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
+    {
+        return delegate.loadBootstrapBeganAt(commandStoreId);
+    }
+
+    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+    {
+        return delegate.loadSafeToRead(commandStoreId);
+    }
+
+    public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int 
commandStoreId)
+    {
+        return delegate.loadRangesForEpoch(commandStoreId);
+    }
+
+    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush)
+    {
+        log("%d: %s", store, fieldUpdates);
+        delegate.saveStoreState(store, fieldUpdates, onFlush);
+    }
+}
\ No newline at end of file
diff --git a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
new file mode 100644
index 00000000..736a4f15
--- /dev/null
+++ b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl.basic;
+
+import java.util.NavigableMap;
+
+import accord.api.Journal;
+import accord.local.Command;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.RedundantBefore;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+
+/**
+ * A simple version of journal that can be useful for debugging issues with an 
implementation that checks command loading
+ * from SUT vs model.
+ */
+public class VerifyingJournal implements Journal
+{
+    private final Journal sut;
+    private final Journal model;
+
+    public VerifyingJournal(Journal model, Journal sut)
+    {
+        this.model = model;
+        this.sut = sut;
+    }
+
+    public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        Command sut = this.sut.loadCommand(commandStoreId, txnId, 
redundantBefore, durableBefore);
+        Command model = this.model.loadCommand(commandStoreId, txnId, 
redundantBefore, durableBefore);
+        Invariants.checkState(sut.equals(model));
+        return sut;
+    }
+
+    public void saveCommand(int store, CommandUpdate update, Runnable onFlush)
+    {
+        model.saveCommand(store, update, null);
+        sut.saveCommand(store, update, onFlush);
+    }
+
+    public void purge(CommandStores commandStores)
+    {
+        model.purge(commandStores);
+        sut.purge(commandStores);
+    }
+
+    public void replay(CommandStores commandStores)
+    {
+        sut.replay(commandStores);
+    }
+
+    public RedundantBefore loadRedundantBefore(int commandStoreId)
+    {
+        return sut.loadRedundantBefore(commandStoreId);
+    }
+
+    public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
+    {
+        return sut.loadBootstrapBeganAt(commandStoreId);
+    }
+
+    public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
+    {
+        return sut.loadSafeToRead(commandStoreId);
+    }
+
+    public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int 
commandStoreId)
+    {
+        return sut.loadRangesForEpoch(commandStoreId);
+    }
+
+    public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush)
+    {
+        sut.saveStoreState(store, fieldUpdates, onFlush);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java 
b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index de886b99..f10ac792 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -38,9 +38,9 @@ import javax.annotation.Nonnull;
 
 public class ListQuery implements Query
 {
-    final Id client;
-    final long requestId;
-    final boolean isEphemeralRead;
+    public final Id client;
+    public final long requestId;
+    public final boolean isEphemeralRead;
 
     public ListQuery(Id client, long requestId, boolean isEphemeralRead)
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java 
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index 915442ad..b1ea3c7c 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -46,7 +46,7 @@ public class ListRead implements Read
     private static final Logger logger = 
LoggerFactory.getLogger(ListRead.class);
 
     private final Function<? super CommandStore, AsyncExecutor> executor;
-    private final boolean isEphemeralRead;
+    public final boolean isEphemeralRead;
     public final Seekables<?, ?> userReadKeys; // those only to be returned to 
user
     public final Seekables<?, ?> keys; // those including necessary for writes
 
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java 
b/accord-core/src/test/java/accord/impl/list/ListResult.java
index adfa1fe3..c752bd0b 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -42,7 +42,7 @@ public class ListResult implements Result, Reply
     public final Keys responseKeys;
     public final int[][] read; // equal in size to keys.size()
     public final ListUpdate update;
-    private final Status status;
+    public final Status status;
 
     public ListResult(Status status, Id client, long requestId, TxnId txnId, 
Seekables<?, ?> readKeys, Keys responseKeys, int[][] read, ListUpdate update)
     {
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java 
b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 50b2b97c..1123ab73 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import accord.api.Agent;
 import accord.api.Data;
 import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.ProgressLog;
 import accord.api.ProgressLog.BlockedUntil;
 import accord.api.Query;
@@ -949,6 +950,11 @@ public class CommandsForKeyTest
             return true;
         }
 
+        public Journal.Loader loader()
+        {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public Agent agent()
         {
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java 
b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index ab24cf72..1fab5e1f 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -53,8 +53,8 @@ import accord.utils.RandomSource;
 import accord.utils.SortedArrays.SortedArrayList;
 import org.agrona.collections.IntHashSet;
 
-import static accord.burn.BurnTest.HASH_RANGE_END;
-import static accord.burn.BurnTest.HASH_RANGE_START;
+import static accord.burn.BurnTestBase.HASH_RANGE_END;
+import static accord.burn.BurnTestBase.HASH_RANGE_START;
 
 // TODO (testing): add change replication factor
 public class TopologyRandomizer


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to