This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c076383e Improve topology integration / replay
c076383e is described below

commit c076383eb432670c4d919e9fd0db76296169ea00
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Fri Dec 6 15:58:16 2024 +0100

    Improve topology integration / replay
    
      - integrate topology reloading into burn test
      - improve epoch handling
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20142
---
 .../main/java/accord/api/ConfigurationService.java |   5 +-
 accord-core/src/main/java/accord/api/Journal.java  |  58 ++++++++++--
 .../accord/impl/AbstractConfigurationService.java  |  35 ++++++-
 .../src/main/java/accord/impl/CommandChange.java   |   4 +-
 .../java/accord/impl/InMemoryCommandStore.java     |   4 +-
 .../java/accord/impl/InMemoryCommandStores.java    |  17 ++--
 .../src/main/java/accord/local/Bootstrap.java      |  21 +++--
 .../src/main/java/accord/local/CommandStore.java   |   2 +
 .../src/main/java/accord/local/CommandStores.java  | 104 +++++++++++++++++++--
 accord-core/src/main/java/accord/local/Node.java   |   9 +-
 .../main/java/accord/topology/TopologyManager.java |   7 +-
 accord-core/src/test/java/accord/Utils.java        |   4 +-
 .../src/test/java/accord/impl/basic/Cluster.java   |  69 +++++++-------
 .../accord/impl/basic/DelayedCommandStores.java    |  93 ++++++++++++------
 .../java/accord/impl/basic/InMemoryJournal.java    |  41 ++++++++
 .../java/accord/impl/basic/LoggingJournal.java     |  17 ++++
 .../src/test/java/accord/impl/list/ListStore.java  |   7 +-
 .../test/java/accord/impl/mock/MockCluster.java    |  10 +-
 .../java/accord/local/ImmutableCommandTest.java    |  10 +-
 .../src/main/java/accord/maelstrom/Cluster.java    |  32 ++++++-
 .../src/main/java/accord/maelstrom/Main.java       |   4 +-
 21 files changed, 429 insertions(+), 124 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java 
b/accord-core/src/main/java/accord/api/ConfigurationService.java
index 57678614..3650f274 100644
--- a/accord-core/src/main/java/accord/api/ConfigurationService.java
+++ b/accord-core/src/main/java/accord/api/ConfigurationService.java
@@ -18,7 +18,6 @@
 
 package accord.api;
 
-import java.util.Collection;
 import javax.annotation.Nullable;
 
 import accord.local.Node;
@@ -129,6 +128,8 @@ public interface ConfigurationService
          * the initial topology returned by `currentTopology` on startup.
          *
          * TODO (desired): document what this Future represents, or maybe 
refactor it away - only used for testing
+         *
+         *   * {@code isLoad} - whether the current topology update is being 
loaded from the local node during startup
          */
         AsyncResult<Void> onTopologyUpdate(Topology topology, boolean isLoad, 
boolean startSync);
 
@@ -160,7 +161,7 @@ public interface ConfigurationService
          */
         void onEpochRedundant(Ranges ranges, long epoch);
 
-        default void onRemoveNodes(long epoch, Collection<Node.Id> removed) {}
+        default void onRemoveNode(long epoch, Node.Id removed) {}
     }
 
     void registerListener(Listener listener);
diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index 2fd70165..a201653d 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -18,7 +18,9 @@
 
 package accord.api;
 
+import java.util.Iterator;
 import java.util.NavigableMap;
+import java.util.Objects;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
@@ -29,31 +31,75 @@ import accord.local.RedundantBefore;
 import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.topology.Topology;
 import accord.utils.PersistentField.Persister;
+import org.agrona.collections.Int2ObjectHashMap;
 
 /**
  * Persisted journal for transactional recovery.
  */
 public interface Journal
 {
-    Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore 
redundantBefore, DurableBefore durableBefore);
-    Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load load, 
RedundantBefore redundantBefore, DurableBefore durableBefore);
+    Command loadCommand(int store, TxnId txnId, RedundantBefore 
redundantBefore, DurableBefore durableBefore);
+    Command.Minimal loadMinimal(int store, TxnId txnId, Load load, 
RedundantBefore redundantBefore, DurableBefore durableBefore);
 
     // TODO (required): use OnDone instead of Runnable
     void saveCommand(int store, CommandUpdate value, Runnable onFlush);
 
+    Iterator<TopologyUpdate> replayTopologies(); // reverse iterator
+    void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush);
+
     void purge(CommandStores commandStores);
     void replay(CommandStores commandStores);
 
-    RedundantBefore loadRedundantBefore(int commandStoreId);
-    NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId);
-    NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId);
-    CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId);
+    RedundantBefore loadRedundantBefore(int store);
+    NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store);
+    NavigableMap<Timestamp, Ranges> loadSafeToRead(int store);
+    CommandStores.RangesForEpoch loadRangesForEpoch(int store);
 
     Persister<DurableBefore, DurableBefore> durableBeforePersister();
 
     void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush);
 
+    class TopologyUpdate
+    {
+        public final Int2ObjectHashMap<CommandStores.RangesForEpoch> 
commandStores;
+        public final Topology local;
+        public final Topology global;
+
+        public TopologyUpdate(@Nonnull 
Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores, @Nonnull 
Topology local, @Nonnull Topology global)
+        {
+            this.commandStores = commandStores;
+            this.local = local;
+            this.global = global;
+        }
+
+        @Override
+        public boolean equals(Object object)
+        {
+            if (this == object) return true;
+            if (object == null || getClass() != object.getClass()) return 
false;
+            TopologyUpdate update = (TopologyUpdate) object;
+            return Objects.equals(commandStores, update.commandStores) && 
Objects.equals(local, update.local) && Objects.equals(global, update.global);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(commandStores, local, global);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "TopologyUpdate{" +
+                   "local=" + local +
+                   ", commandStores=" + commandStores +
+                   ", global=" + global +
+                   '}';
+        }
+    }
+
     class CommandUpdate
     {
         public final TxnId txnId;
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
index b2614b72..2a8fb94d 100644
--- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -140,7 +140,8 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
 
         synchronized EpochState getOrCreate(long epoch)
         {
-            Invariants.checkArgument(epoch > 0, "Epoch must be positive but 
given %d", epoch);
+            Invariants.checkArgument(epoch >= 0, "Epoch must be non-negative 
but given %d", epoch);
+            Invariants.checkArgument(epoch > 0 || (lastReceived == 0 && 
epochs.isEmpty()), "Received epoch 0 after initialization. Last received %d, 
epochsf; %s", lastReceived, epochs);
             if (epochs.isEmpty())
             {
                 EpochState state = createEpochState(epoch);
@@ -177,8 +178,20 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             {
                 long epoch = topology.epoch();
                 logger.debug("Receiving epoch {}", epoch);
-                Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 
|| lastReceived == 0,
-                                      "Epoch %d != %d + 1", epoch, 
lastReceived);
+                if (lastReceived >= epoch)
+                {
+                    // If we have already seen this epoch, verify that the topology matches the one we hold and ignore the duplicate
+                    for (int i = epochs.size() - 1; i >= 0; i--)
+                    {
+                        EpochState expected = epochs.get(i);
+                        if (epoch == expected.epoch)
+                        {
+                            
Invariants.checkState(topology.equals(expected.topology),
+                                                  "Expected existing topology 
to match upsert, but %s != %s", topology, expected.topology);
+                            return;
+                        }
+                    }
+                }
                 state = getOrCreate(epoch);
                 state.topology = topology;
                 lastReceived = epoch;
@@ -286,7 +299,20 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
 
             maxRequestedEpoch = epoch;
         }
-        fetchTopologyInternal(epoch);
+
+        try
+        {
+            fetchTopologyInternal(epoch);
+        }
+        catch (Throwable t)
+        {
+            // This epoch will not be fetched, so we need to reset maxRequestedEpoch
+            synchronized (this)
+            {
+                maxRequestedEpoch = 0;
+            }
+            throw t;
+        }
     }
 
     // TODO (expected): rename, sync is too ambiguous
@@ -325,6 +351,7 @@ public abstract class 
AbstractConfigurationService<EpochState extends AbstractCo
             epochs.acknowledgeFuture(epochs.minEpoch()).addCallback(() -> 
reportTopology(topology, startSync, isLoad));
             return;
         }
+
         if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
         {
             logger.debug("Epoch {} received; waiting for {} to ack before 
reporting", topology.epoch(), lastAcked + 1);
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java 
b/accord-core/src/main/java/accord/impl/CommandChange.java
index e2c450d9..df74bf7e 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -511,6 +511,8 @@ public class CommandChange
     public static int getFlags(Command before, Command after)
     {
         int flags = 0;
+        if (before == null && after == null)
+            return flags;
 
         flags = collectFlags(before, after, Command::executeAt, true, 
EXECUTE_AT, flags);
         flags = collectFlags(before, after, Command::executesAtLeast, true, 
EXECUTES_AT_LEAST, flags);
@@ -531,7 +533,7 @@ public class CommandChange
 
         // Special-cased for Journal BurnTest integration
         if ((before != null && after.result() != before.result()) ||
-            (before == null && after.result() != null)) //TODO
+            (before == null && after.result() != null))
         {
             flags = collectFlags(before, after, Command::writes, false, 
RESULT, flags);
         }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 223fd1cc..71df36f6 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -124,9 +124,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
     private InMemorySafeStore current;
     private final Journal.Loader loader;
 
-    public InMemoryCommandStore(int id, NodeCommandStoreService time, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
+    public InMemoryCommandStore(int id, NodeCommandStoreService node, Agent 
agent, DataStore store, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder)
     {
-        super(id, time, agent, store, progressLogFactory, listenersFactory, 
epochUpdateHolder);
+        super(id, node, agent, store, progressLogFactory, listenersFactory, 
epochUpdateHolder);
         this.loader = new CommandLoader(this);
     }
 
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
index e1838d69..9e659df6 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java
@@ -18,6 +18,7 @@
 
 package accord.impl;
 
+import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.local.*;
 import accord.api.Agent;
@@ -29,30 +30,30 @@ public class InMemoryCommandStores
 {
     public static class Synchronized extends CommandStores
     {
-        public Synchronized(NodeCommandStoreService node, Agent agent, 
DataStore store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory)
+        public Synchronized(NodeCommandStoreService node, Agent agent, 
DataStore store, RandomSource random, Journal journal, ShardDistributor 
shardDistributor, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory)
         {
-            super(node, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new);
+            super(node, agent, store, random, journal, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new);
         }
     }
 
     public static class SingleThread extends CommandStores
     {
-        public SingleThread(NodeCommandStoreService node, Agent agent, 
DataStore store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory)
+        public SingleThread(NodeCommandStoreService node, Agent agent, 
DataStore store, RandomSource random, Journal journal, ShardDistributor 
shardDistributor, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory)
         {
-            super(node, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new);
+            super(node, agent, store, random, journal, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new);
         }
 
-        public SingleThread(NodeCommandStoreService node, Agent agent, 
DataStore store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, CommandStore.Factory shardFactory)
+        public SingleThread(NodeCommandStoreService node, Agent agent, 
DataStore store, RandomSource random, Journal journal, ShardDistributor 
shardDistributor, ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory)
         {
-            super(node, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, shardFactory);
+            super(node, agent, store, random, journal, shardDistributor, 
progressLogFactory, listenersFactory, shardFactory);
         }
     }
 
     public static class Debug extends InMemoryCommandStores.SingleThread
     {
-        public Debug(NodeCommandStoreService node, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory)
+        public Debug(NodeCommandStoreService node, Agent agent, DataStore 
store, RandomSource random, Journal journal, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory)
         {
-            super(node, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new);
+            super(node, agent, store, random, journal, shardDistributor, 
progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new);
         }
     }
 }
diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java 
b/accord-core/src/main/java/accord/local/Bootstrap.java
index 5a2012ca..c466a84a 100644
--- a/accord-core/src/main/java/accord/local/Bootstrap.java
+++ b/accord-core/src/main/java/accord/local/Bootstrap.java
@@ -140,14 +140,19 @@ class Bootstrap
             Ranges commitRanges = valid;
             safeStore = safeStore;
             // we submit a separate execution so that we know 
markBootstrapping is durable before we initiate the fetch
-            safeStore.commandStore().submit(empty(), safeStore0 -> {
-                store.markBootstrapping(safeStore0, globalSyncId, 
commitRanges);
-                return CoordinateSyncPoint.exclusiveSyncPoint(node, 
globalSyncId, commitRanges);
-            }).flatMap(i -> i).flatMap(syncPoint -> node.withEpoch(epoch, () 
-> store.submit(empty(), safeStore1 -> {
-                if (valid.isEmpty()) // we've lost ownership of the range
-                    return AsyncResults.success(Ranges.EMPTY);
-                return fetch = safeStore1.dataStore().fetch(node, safeStore1, 
valid, syncPoint, this);
-            }))).flatMap(i -> i).begin(this);
+            safeStore.commandStore()
+                     .submit(empty(), safeStore0 -> {
+                         store.markBootstrapping(safeStore0, globalSyncId, 
commitRanges);
+                         return CoordinateSyncPoint.exclusiveSyncPoint(node, 
globalSyncId, commitRanges);
+                     })
+                     .flatMap(i -> i)
+                     .flatMap(syncPoint -> node.withEpoch(epoch, () -> 
store.submit(empty(), safeStore1 -> {
+                         if (valid.isEmpty()) // we've lost ownership of the 
range
+                             return AsyncResults.success(Ranges.EMPTY);
+                         return fetch = safeStore1.dataStore().fetch(node, 
safeStore1, valid, syncPoint, this);
+                     })))
+                     .flatMap(i -> i)
+                     .begin(this);
         }
 
         // we no longer want to fetch these ranges (perhaps we no longer own 
them)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 4873a747..9a0efc42 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -208,6 +208,8 @@ public abstract class CommandStore implements AgentExecutor
         return id;
     }
 
+    public void restore() {};
+
     public abstract Journal.Loader loader();
 
     @Override
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 83050e9c..09d90163 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -20,6 +20,7 @@ package accord.local;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import accord.api.Agent;
 import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.LocalListeners;
 import accord.api.ProgressLog;
 import accord.api.RoutingKey;
@@ -61,6 +63,7 @@ import accord.utils.MapReduceConsume;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResults;
 import accord.utils.async.Cancellable;
 import org.agrona.collections.Hashing;
 import org.agrona.collections.Int2ObjectHashMap;
@@ -86,6 +89,7 @@ public abstract class CommandStores
                              Agent agent,
                              DataStore store,
                              RandomSource random,
+                             Journal journal,
                              ShardDistributor shardDistributor,
                              ProgressLog.Factory progressLogFactory,
                              LocalListeners.Factory listenersFactory);
@@ -128,6 +132,17 @@ public abstract class CommandStores
             this.store = store;
         }
 
+        private ShardHolder(CommandStore store, RangesForEpoch ranges)
+        {
+            this.store = store;
+            this.ranges = ranges;
+        }
+
+        public ShardHolder withStoreUnsafe(CommandStore store)
+        {
+            return new ShardHolder(store, ranges);
+        }
+
         RangesForEpoch ranges()
         {
             return ranges;
@@ -343,40 +358,96 @@ public abstract class CommandStores
         }
     }
 
-    protected static class Snapshot
+    // This method should only be used on node startup.
+    // "Unsafe" because it relies on user to synchronise and sequence the call 
properly.
+    public void restoreShardStateUnsafe(Consumer<Topology> reportTopology)
+    {
+        Iterator<Journal.TopologyUpdate> iter = journal.replayTopologies();
+        // First boot
+        if (!iter.hasNext())
+            return;
+
+        Journal.TopologyUpdate lastUpdate = null;
+        while (iter.hasNext())
+        {
+            Journal.TopologyUpdate update = iter.next();
+            reportTopology.accept(update.global);
+            if (lastUpdate == null || update.global.epoch() > 
lastUpdate.global.epoch())
+                lastUpdate = update;
+        }
+
+        ShardHolder[] shards = new 
ShardHolder[lastUpdate.commandStores.size()];
+        int i = 0;
+        for (Map.Entry<Integer, RangesForEpoch> e : 
lastUpdate.commandStores.entrySet())
+        {
+            RangesForEpoch ranges = e.getValue();
+            CommandStore commandStore = null;
+            for (ShardHolder shard : current.shards)
+            {
+                if (shard.ranges.equals(ranges))
+                    commandStore = shard.store;
+            }
+            Invariants.nonNull(commandStore, "Command store should have been 
reloaded").restore();
+            ShardHolder shard = new ShardHolder(commandStore, e.getValue());
+            shards[i++] = shard;
+        }
+
+        loadSnapshot(new Snapshot(shards, lastUpdate.local, 
lastUpdate.global));
+    }
+
+    protected void loadSnapshot(Snapshot toLoad)
+    {
+        current = toLoad;
+    }
+
+    protected static class Snapshot extends Journal.TopologyUpdate
     {
         public final ShardHolder[] shards;
-        final Int2ObjectHashMap<CommandStore> byId;
-        final Topology local;
-        final Topology global;
+        public final Int2ObjectHashMap<CommandStore> byId;
 
         Snapshot(ShardHolder[] shards, Topology local, Topology global)
         {
+            super(asMap(shards), local, global);
             this.shards = shards;
             this.byId = new Int2ObjectHashMap<>(shards.length, 
Hashing.DEFAULT_LOAD_FACTOR, true);
             for (ShardHolder shard : shards)
                 byId.put(shard.store.id(), shard.store);
-            this.local = local;
-            this.global = global;
+        }
+
+        // This method exists to ensure we do not hold references to command 
stores
+        public Journal.TopologyUpdate asTopologyUpdate()
+        {
+            return new Journal.TopologyUpdate(commandStores, local, global);
+        }
+
+        private static Int2ObjectHashMap<CommandStores.RangesForEpoch> 
asMap(ShardHolder[] shards)
+        {
+            Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores = 
new Int2ObjectHashMap<>();
+            for (ShardHolder shard : shards)
+                commandStores.put(shard.store.id, shard.ranges);
+            return commandStores;
         }
     }
 
     final StoreSupplier supplier;
     final ShardDistributor shardDistributor;
+    final Journal journal;
     volatile Snapshot current;
     int nextId;
 
-    private CommandStores(StoreSupplier supplier, ShardDistributor 
shardDistributor)
+    private CommandStores(StoreSupplier supplier, ShardDistributor 
shardDistributor, Journal journal)
     {
         this.supplier = supplier;
         this.shardDistributor = shardDistributor;
+
         this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, 
Topology.EMPTY);
+        this.journal = journal;
     }
 
-    public CommandStores(NodeCommandStoreService time, Agent agent, DataStore 
store, RandomSource random, ShardDistributor shardDistributor,
+    public CommandStores(NodeCommandStoreService time, Agent agent, DataStore 
store, RandomSource random, Journal journal, ShardDistributor shardDistributor,
                          ProgressLog.Factory progressLogFactory, 
LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory)
     {
-        this(new StoreSupplier(time, agent, store, random, progressLogFactory, 
listenersFactory, shardFactory), shardDistributor);
+        this(new StoreSupplier(time, agent, store, random, progressLogFactory, 
listenersFactory, shardFactory), shardDistributor, journal);
     }
 
     public Topology local()
@@ -693,7 +764,20 @@ public abstract class CommandStores
     public synchronized Supplier<EpochReady> updateTopology(Node node, 
Topology newTopology, boolean startSync)
     {
         TopologyUpdate update = updateTopology(node, current, newTopology, 
startSync);
-        current = update.snapshot;
+        if (update.snapshot != current)
+        {
+            AsyncResults.SettableResult<Void> flush = new 
AsyncResults.SettableResult<>();
+            journal.saveTopology(update.snapshot.asTopologyUpdate(), () -> 
flush.setSuccess(null));
+            current = update.snapshot;
+            return () -> {
+                EpochReady ready = update.bootstrap.get();
+                return new EpochReady(ready.epoch,
+                                      flush.flatMap(ignore -> 
ready.metadata).beginAsResult(),
+                                      flush.flatMap(ignore -> 
ready.coordinate).beginAsResult(),
+                                      flush.flatMap(ignore -> 
ready.data).beginAsResult(),
+                                      flush.flatMap(ignore -> 
ready.reads).beginAsResult());
+            };
+        }
         return update.bootstrap;
     }
 
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index ff3f55bd..254280e0 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -43,6 +43,7 @@ import accord.api.Agent;
 import accord.api.ConfigurationService;
 import accord.api.ConfigurationService.EpochReady;
 import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.LocalConfig;
 import accord.api.LocalListeners;
 import accord.api.MessageSink;
@@ -184,7 +185,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
                 Function<Node, RemoteListeners> remoteListenersFactory, 
Function<Node, Timeouts> requestTimeoutsFactory, Function<Node, 
ProgressLog.Factory> progressLogFactory,
                 Function<Node, LocalListeners.Factory> localListenersFactory, 
CommandStores.Factory factory, CoordinationAdapter.Factory coordinationAdapters,
                 Persister<DurableBefore, DurableBefore> durableBeforePersister,
-                LocalConfig localConfig)
+                LocalConfig localConfig, Journal journal)
     {
         this.id = id;
         this.scheduler = scheduler; // we set scheduler first so that e.g. 
requestTimeoutsFactory and progressLogFactory can take references to it
@@ -201,7 +202,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
         this.agent = agent;
         this.random = random;
         this.persistDurableBefore = new PersistentField<>(() -> durableBefore, 
DurableBefore::merge, durableBeforePersister, this::setPersistedDurableBefore);
-        this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), shardDistributor, progressLogFactory.apply(this), 
localListenersFactory.apply(this));
+        this.commandStores = factory.create(this, agent, dataSupplier.get(), 
random.fork(), journal, shardDistributor, progressLogFactory.apply(this), 
localListenersFactory.apply(this));
         this.durabilityScheduling = new DurabilityScheduling(this);
         // TODO (desired): make frequency configurable
         scheduler.recurring(() -> commandStores.forEachCommandStore(store -> 
store.progressLog.maybeNotify()), 1, SECONDS);
@@ -355,9 +356,9 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
     }
 
     @Override
-    public void onRemoveNodes(long epoch, Collection<Id> removed)
+    public void onRemoveNode(long epoch, Id removed)
     {
-        topology.onRemoveNodes(epoch, removed);
+        topology.onRemoveNode(epoch, removed);
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 933280c4..6e66be24 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -20,7 +20,6 @@ package accord.topology;
 
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -554,14 +553,14 @@ public class TopologyManager
         epochs.syncComplete(node, epoch);
     }
 
-    public synchronized void onRemoveNodes(long removedIn, Collection<Id> 
removed)
+    // TODO (now, correctness): it seems to be wrong to count removed nodes 
towards sync quorums.
+    public synchronized void onRemoveNode(long removedIn, Id removed)
     {
         for (long epoch = removedIn, min = minEpoch(); epoch >= min; epoch--)
         {
             EpochState state = epochs.get(epoch);
             if (state == null || state.hasReachedQuorum()) continue;
-            for (Id node : removed)
-                epochs.syncComplete(node, epoch);
+            epochs.syncComplete(removed, epoch);
         }
     }
 
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index b2b04cae..7d188504 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -44,6 +44,7 @@ import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.TestAgent;
+import accord.impl.basic.InMemoryJournal;
 import accord.impl.list.ListQuery;
 import accord.impl.list.ListRead;
 import accord.impl.list.ListUpdate;
@@ -199,7 +200,8 @@ public class Utils
                              InMemoryCommandStores.Synchronized::new,
                              new CoordinationAdapter.DefaultFactory(),
                              DurableBefore.NOOP_PERSISTER,
-                             localConfig);
+                             localConfig,
+                             new InMemoryJournal(nodeId, agent));
         awaitUninterruptibly(node.unsafeStart());
         return node;
     }
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index bc5c4247..c8b65168 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -443,8 +443,7 @@ public class Cluster
                                                             () -> queue,
                                                             (id, onStale) -> 
globalExecutor.withAgent(agentSupplier.apply(onStale)),
                                                             
queue::checkFailures,
-                                                            ignore -> {
-                                                            },
+                                                            ignore -> {},
                                                             randomSupplier,
                                                             
timeServiceSupplier,
                                                             topologyFactory,
@@ -675,8 +674,8 @@ public class Cluster
                                      () -> new ListStore(scheduler, random, 
id), new ShardDistributor.EvenSplit<>(8, ignore -> new 
PrefixedIntHashKey.Splitter()),
                                      nodeExecutor.agent(),
                                      randomSupplier.get(), scheduler, 
SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, 
DefaultTimeouts::new,
-                                     DefaultProgressLogs::new, 
DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, 
cacheLoading, journal), new CoordinationAdapter.DefaultFactory(),
-                                     DurableBefore.NOOP_PERSISTER, 
localConfig);
+                                     DefaultProgressLogs::new, 
DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, 
cacheLoading), new CoordinationAdapter.DefaultFactory(),
+                                     DurableBefore.NOOP_PERSISTER, 
localConfig, journal);
                 DurabilityScheduling durability = node.durabilityScheduling();
                 // TODO (desired): randomise
                 durability.setShardCycleTime(30, SECONDS);
@@ -729,36 +728,38 @@ public class Cluster
                 Id id = pickNodeNotBootstrapping(random, nodesList, nodeMap);
                 if (id == null)
                     return;
-                CommandStore[] stores = nodeMap.get(id).commandStores().all();
-
-                ((DelayedCommandStore)stores[0]).unsafeRunIn(() -> {
-                    Predicate<Pending> pred = getPendingPredicate(id.id, 
stores);
-                    while (sinks.drain(pred));
-
-                    // Journal cleanup is a rough equivalent of a node restart.
-                    trace.debug("Triggering journal cleanup for node " + id);
-                    CommandsForKey.disableLinearizabilityViolationsReporting();
-                    ListStore listStore = (ListStore) 
nodeMap.get(id).commandStores().dataStore();
-                    NavigableMap<RoutableKey, Timestamped<int[]>> prevData = 
listStore.copyOfCurrentData();
-                    listStore.clear();
-                    listStore.restoreFromSnapshot();
-                    // we are simulating node restart, so its remote listeners 
will also be gone
-                    
((DefaultRemoteListeners)nodeMap.get(id).remoteListeners()).clear();
-                    Int2ObjectHashMap<NavigableMap<TxnId, Command>> 
beforeStores = copyCommands(stores);
-                    for (CommandStore s : stores)
-                    {
-                        DelayedCommandStores.DelayedCommandStore store = 
(DelayedCommandStores.DelayedCommandStore) s;
-                        store.clearForTesting();
-                    }
-                    Journal journal = journalMap.get(id);
-                    journal.replay(nodeMap.get(id).commandStores());
-                    while (sinks.drain(pred));
-                    CommandsForKey.enableLinearizabilityViolationsReporting();
-                    verifyConsistentRestore(beforeStores, stores);
-                    // we can get ahead of prior state by executing further if 
we skip some earlier phase's dependencies
-                    listStore.checkAtLeast(stores, prevData);
-                    trace.debug("Done with cleanup.");
-                });
+
+                CommandStores stores = nodeMap.get(id).commandStores();
+                while (sinks.drain(getPendingPredicate(id.id, stores.all()))) ;
+
+                trace.debug("Triggering store cleanup and journal replay for 
node " + id);
+                CommandsForKey.disableLinearizabilityViolationsReporting();
+
+                // Clean data and restore from snapshot
+                ListStore listStore = (ListStore) 
nodeMap.get(id).commandStores().dataStore();
+                NavigableMap<RoutableKey, Timestamped<int[]>> prevData = 
listStore.copyOfCurrentData();
+                listStore.clear();
+                listStore.restoreFromSnapshot();
+
+                // We are simulating node restart, so its remote listeners 
will also be gone
+                ((DefaultRemoteListeners) 
nodeMap.get(id).remoteListeners()).clear();
+                Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = 
copyCommands(stores.all());
+
+                // Re-create all command stores
+                nodeMap.get(id).commandStores().restoreShardStateUnsafe(t -> 
{});
+                stores = nodeMap.get(id).commandStores();
+
+                // Replay journal
+                Journal journal = journalMap.get(id);
+                journal.replay(stores);
+
+                // Re-enable safety checks
+                while (sinks.drain(getPendingPredicate(id.id, stores.all()))) ;
+                CommandsForKey.enableLinearizabilityViolationsReporting();
+                verifyConsistentRestore(beforeStores, stores.all());
+                // we can get ahead of prior state by executing further if we 
skip some earlier phase's dependencies
+                listStore.checkAtLeast(stores, prevData);
+                trace.debug("Done with replay.");
             }, () -> random.nextInt(10, 30), SECONDS);
 
             durabilityScheduling.forEach(DurabilityScheduling::start);
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index e9b3f735..2e034aca 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -71,12 +71,12 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
 {
     private DelayedCommandStores(NodeCommandStoreService time, Agent agent, 
DataStore store, RandomSource random, ShardDistributor shardDistributor, 
ProgressLog.Factory progressLogFactory, LocalListeners.Factory 
listenersFactory, SimulatedDelayedExecutorService executorService, CacheLoading 
isLoadedCheck, Journal journal)
     {
-        super(time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory, 
DelayedCommandStore.factory(executorService, isLoadedCheck, journal));
+        super(time, agent, store, random, journal, shardDistributor, 
progressLogFactory, listenersFactory, 
DelayedCommandStore.factory(executorService, isLoadedCheck, journal));
     }
 
-    public static CommandStores.Factory factory(PendingQueue pending, 
CacheLoading isLoadedCheck, Journal journal)
+    public static CommandStores.Factory factory(PendingQueue pending, 
CacheLoading isLoadedCheck)
     {
-        return (time, agent, store, random, shardDistributor, 
progressLogFactory, listenersFactory) ->
+        return (time, agent, store, random, journal, shardDistributor, 
progressLogFactory, listenersFactory) ->
                new DelayedCommandStores(time, agent, store, random, 
shardDistributor, progressLogFactory, listenersFactory, new 
SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal);
     }
 
@@ -91,6 +91,42 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         return contains(previous, prefix);
     }
 
+    protected void loadSnapshot(Snapshot nextSnapshot)
+    {
+        Snapshot current = current();
+        // These checks are only applicable to delayed command stores.
+        for (Integer id : current.byId.keySet())
+        {
+            CommandStore prev = current.byId.get(id);
+            CommandStore next = nextSnapshot.byId.get(id);
+            {
+                RedundantBefore orig = prev.unsafeGetRedundantBefore();
+                RedundantBefore loaded = next.unsafeGetRedundantBefore();
+                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
+            }
+
+            {
+                NavigableMap<TxnId, Ranges> orig = 
prev.unsafeGetBootstrapBeganAt();
+                NavigableMap<TxnId, Ranges> loaded = 
next.unsafeGetBootstrapBeganAt();
+                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
+            }
+
+            {
+                NavigableMap<Timestamp, Ranges> orig = 
prev.unsafeGetSafeToRead();
+                NavigableMap<Timestamp, Ranges> loaded = 
next.unsafeGetSafeToRead();
+                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
+            }
+
+            {
+                RangesForEpoch orig = prev.unsafeGetRangesForEpoch();
+                RangesForEpoch loaded = next.unsafeGetRangesForEpoch();
+                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
+            }
+        }
+
+        super.loadSnapshot(nextSnapshot);
+    }
+
     private static boolean contains(Topology previous, int searchPrefix)
     {
         for (Range range : previous.ranges())
@@ -139,38 +175,39 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             this.executor = executor;
             this.cacheLoading = cacheLoading;
             this.journal = journal;
+            restore();
         }
 
-        @Override
-        public void clearForTesting()
+        protected void loadRedundantBefore(RedundantBefore redundantBefore)
         {
-            super.clearForTesting();
-
-            // Rather than cleaning up and reloading, we can just assert 
equality during reload
-            {
-                RedundantBefore orig = unsafeGetRedundantBefore();
-                RedundantBefore loaded = journal.loadRedundantBefore(id());
-                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
-            }
+            if (redundantBefore != null)
+                unsafeSetRedundantBefore(redundantBefore);
+        }
 
-            {
-                NavigableMap<TxnId, Ranges> orig = unsafeGetBootstrapBeganAt();
-                NavigableMap<TxnId, Ranges> loaded = 
journal.loadBootstrapBeganAt(id());
-                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
-            }
+        protected void loadBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
bootstrapBeganAt)
+        {
+            if (bootstrapBeganAt != null)
+                unsafeSetBootstrapBeganAt(bootstrapBeganAt);
+        }
 
-            {
-                NavigableMap<Timestamp, Ranges> orig = unsafeGetSafeToRead();
-                NavigableMap<Timestamp, Ranges> loaded = 
journal.loadSafeToRead(id());
-                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
-            }
+        protected void loadSafeToRead(NavigableMap<Timestamp, Ranges> 
safeToRead)
+        {
+            if (safeToRead != null)
+                unsafeSetSafeToRead(safeToRead);
+        }
 
-            {
-                RangesForEpoch orig = unsafeGetRangesForEpoch();
-                RangesForEpoch loaded = journal.loadRangesForEpoch(id());
+        protected void loadRangesForEpoch(CommandStores.RangesForEpoch 
rangesForEpoch)
+        {
+            if (rangesForEpoch != null)
+                unsafeSetRangesForEpoch(rangesForEpoch);
+        }
 
-                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
-            }
+        public void restore()
+        {
+            loadRedundantBefore(journal.loadRedundantBefore(id()));
+            loadBootstrapBeganAt(journal.loadBootstrapBeganAt(id()));
+            loadSafeToRead(journal.loadSafeToRead(id()));
+            loadRangesForEpoch(journal.loadRangesForEpoch(id()));
         }
 
         @Override
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index 59f16268..bc2824e5 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -21,6 +21,7 @@ package accord.impl.basic;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.EnumMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -91,6 +92,7 @@ import static accord.utils.Invariants.illegalState;
 public class InMemoryJournal implements Journal
 {
     private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> 
diffsPerCommandStore = new Int2ObjectHashMap<>();
+    private final List<TopologyUpdate> topologyUpdates = new ArrayList<>();
     private final Int2ObjectHashMap<FieldUpdates> fieldStates = new 
Int2ObjectHashMap<>();
 
     private final Node.Id id;
@@ -101,6 +103,7 @@ public class InMemoryJournal implements Journal
         this.id = id;
         this.agent = agent;
     }
+
     @Override
     public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
@@ -190,6 +193,44 @@ public class InMemoryJournal implements Journal
             onFlush.run();
     }
 
+    /**
+     * Returns an iterator over the topology updates recorded by
+     * {@link #saveTopology}, in the order they were saved.
+     *
+     * The iterator walks the backing list by index and re-reads its size on
+     * every {@code hasNext()} call, so an update appended while iteration is
+     * in progress is picked up rather than failing the iteration.
+     */
+    @Override
+    public Iterator<TopologyUpdate> replayTopologies()
+    {
+        return new Iterator<>()
+        {
+            private int current = 0;
+
+            @Override
+            public boolean hasNext()
+            {
+                return current < topologyUpdates.size();
+            }
+
+            @Override
+            public TopologyUpdate next()
+            {
+                // Iterator contract: signal exhaustion with NoSuchElementException
+                // instead of leaking the list's IndexOutOfBoundsException.
+                if (!hasNext())
+                    throw new java.util.NoSuchElementException();
+                return topologyUpdates.get(current++);
+            }
+        };
+    }
+
+    @Override
+    public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush)
+    {
+        topologyUpdates.add(topologyUpdate);
+        if (onFlush != null)
+            onFlush.run();
+    }
+
+    /**
+     * Test-only helper: drops every recorded topology update whose global
+     * epoch is below {@code minEpoch}; the remaining updates keep their order.
+     *
+     * @param minEpoch lowest epoch to retain (inclusive)
+     */
+    public void truncateTopologiesForTesting(long minEpoch)
+    {
+        // Filter in place on the epoch itself: no temporary list and no
+        // equals()-based membership scans as with retainAll.
+        topologyUpdates.removeIf(update -> update.global.epoch() < minEpoch);
+    }
+
     @Override
     public RedundantBefore loadRedundantBefore(int commandStoreId)
     {
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index b40134a3..564fe00b 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -24,6 +24,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.Iterator;
 import java.util.NavigableMap;
 
 import accord.api.Journal;
@@ -90,6 +91,22 @@ public class LoggingJournal implements Journal
         delegate.saveCommand(store, update, onFlush);
     }
 
+    @Override
+    public Iterator<TopologyUpdate> replayTopologies()
+    {
+        log("REPLAY TOPOLOGIES\n");
+        return delegate.replayTopologies();
+    }
+
+    /**
+     * Logs the topology update and forwards it to the wrapped journal, which
+     * is responsible for storing it and for invoking {@code onFlush}.
+     */
+    @Override
+    public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush)
+    {
+        // One format specifier per argument; the previous "%d: %s" had two
+        // specifiers for a single, non-numeric argument.
+        log("TOPOLOGY: %s\n", topologyUpdate);
+        // Delegate instead of running onFlush locally and throwing, so the
+        // update is actually saved and the caller is not failed.
+        delegate.saveTopology(topologyUpdate, onFlush);
+    }
+
     @Override
     public void purge(CommandStores commandStores)
     {
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java 
b/accord-core/src/test/java/accord/impl/list/ListStore.java
index c4bfa4d4..7953ce48 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -46,6 +46,7 @@ import accord.coordinate.Truncated;
 import accord.coordinate.tracking.AllTracker;
 import accord.impl.basic.SimulatedFault;
 import accord.local.CommandStore;
+import accord.local.CommandStores;
 import accord.local.Node;
 import accord.local.SafeCommandStore;
 import accord.primitives.Range;
@@ -518,12 +519,12 @@ public class ListStore implements DataStore
         return new TreeMap<>(data);
     }
 
-    public void checkAtLeast(CommandStore[] commandStores, 
NavigableMap<RoutableKey, Timestamped<int[]>> a)
+    public void checkAtLeast(CommandStores commandStores, 
NavigableMap<RoutableKey, Timestamped<int[]>> a)
     {
         checkAtLeast(commandStores, a, data);
     }
 
-    public static void checkAtLeast(CommandStore[] commandStores, 
NavigableMap<RoutableKey, Timestamped<int[]>> a, NavigableMap<RoutableKey, 
Timestamped<int[]>> b)
+    public static void checkAtLeast(CommandStores commandStores, 
NavigableMap<RoutableKey, Timestamped<int[]>> a, NavigableMap<RoutableKey, 
Timestamped<int[]>> b)
     {
         if (a.isEmpty())
             return;
@@ -534,7 +535,7 @@ public class ListStore implements DataStore
             Timestamped<int[]> bv = b.get(k);
             if (bv == null || bv.timestamp.compareTo(av.timestamp) < 0)
             {
-                for (CommandStore commandStore : commandStores)
+                for (CommandStore commandStore : commandStores.all())
                 {
                     if 
(!commandStore.unsafeGetRangesForEpoch().allSince(av.timestamp.epoch()).contains(k))
                         continue;
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java 
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index 39c7b6de..a13c7deb 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -33,6 +33,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.NetworkFilter;
+import accord.api.Agent;
+import accord.api.Journal;
 import accord.api.MessageSink;
 import accord.api.LocalConfig;
 import accord.coordinate.CoordinationAdapter;
@@ -40,6 +42,7 @@ import accord.impl.DefaultTimeouts;
 import accord.impl.InMemoryCommandStores;
 import accord.impl.IntKey;
 import accord.impl.DefaultLocalListeners;
+import accord.impl.basic.InMemoryJournal;
 import accord.impl.progresslog.DefaultProgressLogs;
 import accord.impl.DefaultRemoteListeners;
 import accord.impl.SizeOfIntersectionSorter;
@@ -130,13 +133,15 @@ public class MockCluster implements Network, 
AutoCloseable, Iterable<Node>
         MessageSink messageSink = messageSinkFactory.apply(id, this);
         MockConfigurationService configurationService = new 
MockConfigurationService(messageSink, onFetchTopology, topology);
         LocalConfig localConfig = LocalConfig.DEFAULT;
+        Agent agent = new TestAgent();
+        Journal journal = new InMemoryJournal(id, agent);
         Node node = new Node(id,
                              messageSink,
                              configurationService,
                              time,
                              () -> store,
                              new ShardDistributor.EvenSplit(8, ignore -> new 
IntKey.Splitter()),
-                             new TestAgent(),
+                             agent,
                              random.fork(),
                              new ThreadPoolScheduler(),
                              SizeOfIntersectionSorter.SUPPLIER,
@@ -147,7 +152,8 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
                              InMemoryCommandStores.SingleThread::new,
                              new CoordinationAdapter.DefaultFactory(),
                              DurableBefore.NOOP_PERSISTER,
-                             localConfig);
+                             localConfig,
+                             journal);
         awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(topology, false, true);
         return node;
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java 
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 58cb6cbb..a9746ed2 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import accord.api.Agent;
 import accord.api.LocalConfig;
 import accord.api.ProgressLog.NoOpProgressLog;
 import accord.api.RoutingKey;
@@ -43,6 +44,7 @@ import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.TestAgent;
 import accord.impl.TestAgent.RethrowAgent;
 import accord.impl.TopologyFactory;
+import accord.impl.basic.InMemoryJournal;
 import accord.impl.mock.MockCluster;
 import accord.impl.mock.MockConfigurationService;
 import accord.impl.mock.MockStore;
@@ -101,14 +103,14 @@ public class ImmutableCommandTest
     {
         MockCluster.Clock clock = new MockCluster.Clock(100);
         LocalConfig localConfig = LocalConfig.DEFAULT;
-        Node node = new Node(id, null, new MockConfigurationService(null, 
(epoch, service) -> { }, storeSupport.local.get()),
-                             clock,
-                             () -> storeSupport.data, new 
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new 
TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
+        Agent agent = new TestAgent();
+        Node node = new Node(id, null, new MockConfigurationService(null, 
(epoch, service) -> { }, storeSupport.local.get()), clock,
+                             () -> storeSupport.data, new 
ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), agent, new 
DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED,
                              SizeOfIntersectionSorter.SUPPLIER, 
DefaultRemoteListeners::new, DefaultTimeouts::new, ignore -> ignore2 -> new 
NoOpProgressLog(), DefaultLocalListeners.Factory::new,
                              InMemoryCommandStores.Synchronized::new,
                              new CoordinationAdapter.DefaultFactory(),
                              DurableBefore.NOOP_PERSISTER,
-                             localConfig);
+                             localConfig, new InMemoryJournal(id, agent));
         awaitUninterruptibly(node.unsafeStart());
         node.onTopologyUpdate(storeSupport.local.get(), false, true);
         return node;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
index 8fb34489..210d0ae3 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java
@@ -29,9 +29,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -39,6 +41,7 @@ import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import accord.api.Journal;
 import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.api.LocalConfig;
@@ -50,9 +53,12 @@ import accord.impl.progresslog.DefaultProgressLogs;
 import accord.impl.DefaultRemoteListeners;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.local.AgentExecutor;
+import accord.local.Command;
+import accord.local.CommandStores;
 import accord.local.DurableBefore;
 import accord.local.Node;
 import accord.local.Node.Id;
+import accord.local.RedundantBefore;
 import accord.local.ShardDistributor;
 import accord.local.TimeService;
 import accord.messages.Callback;
@@ -61,7 +67,11 @@ import accord.messages.Reply.FailureReply;
 import accord.messages.ReplyContext;
 import accord.messages.Request;
 import accord.messages.SafeCallback;
+import accord.primitives.Ranges;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
 import accord.topology.Topology;
+import accord.utils.PersistentField;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResult;
@@ -347,7 +357,8 @@ public class Cluster implements Scheduler
                                           MaelstromAgent.INSTANCE,
                                           randomSupplier.get(), sinks, 
SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, 
DefaultTimeouts::new,
                                           DefaultProgressLogs::new, 
DefaultLocalListeners.Factory::new, InMemoryCommandStores.SingleThread::new, 
new CoordinationAdapter.DefaultFactory(),
-                                          DurableBefore.NOOP_PERSISTER, 
localConfig));
+                                          DurableBefore.NOOP_PERSISTER, 
localConfig,
+                                          new NoOpJournal()));
             }
 
             AsyncResult<?> startup = 
AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()),
 (a, b) -> null).beginAsResult();
@@ -373,4 +384,21 @@ public class Cluster implements Scheduler
             lookup.values().forEach(Node::shutdown);
         }
     }
-}
+
+    /**
+     * Placeholder {@link Journal} for the Maelstrom cluster: it stores nothing
+     * and every operation fails immediately with an {@link IllegalStateException}.
+     */
+    public static class NoOpJournal implements Journal
+    {
+        @Override public Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not implemented"); }
+        @Override public Command.Minimal loadMinimal(int store, TxnId txnId, Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not implemented"); }
+        @Override public void saveCommand(int store, CommandUpdate value, Runnable onFlush) { throw new IllegalStateException("Not implemented"); }
+        @Override public Iterator<TopologyUpdate> replayTopologies() { throw new IllegalStateException("Not implemented"); }
+        @Override public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { throw new IllegalStateException("Not implemented"); }
+        @Override public void purge(CommandStores commandStores) { throw new IllegalStateException("Not implemented"); }
+        @Override public void replay(CommandStores commandStores) { throw new IllegalStateException("Not implemented"); }
+        @Override public RedundantBefore loadRedundantBefore(int store) { throw new IllegalStateException("Not implemented"); }
+        @Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store) { throw new IllegalStateException("Not implemented"); }
+        @Override public NavigableMap<Timestamp, Ranges> loadSafeToRead(int store) { throw new IllegalStateException("Not implemented"); }
+        @Override public CommandStores.RangesForEpoch loadRangesForEpoch(int store) { throw new IllegalStateException("Not implemented"); }
+        @Override public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister() { throw new IllegalStateException("Not implemented"); }
+        @Override public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush) { throw new IllegalStateException("Not implemented"); }
+    }
+}
\ No newline at end of file
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java 
b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
index 306313e1..122d1a4d 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import accord.api.Journal;
 import accord.api.MessageSink;
 import accord.api.Scheduler;
 import accord.api.LocalConfig;
@@ -179,13 +180,14 @@ public class Main
             topology = topologyFactory.toTopology(init.cluster);
             sink = new StdoutSink(System::currentTimeMillis, scheduler, start, 
init.self, out, err);
             LocalConfig localConfig = LocalConfig.DEFAULT;
+            Journal journal = new Cluster.NoOpJournal();
             on = new Node(init.self, sink, new SimpleConfigService(topology),
                           
TimeService.ofNonMonotonic(System::currentTimeMillis, TimeUnit.MILLISECONDS),
                           MaelstromStore::new, new 
ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()),
                           MaelstromAgent.INSTANCE, new DefaultRandom(), 
scheduler, SizeOfIntersectionSorter.SUPPLIER,
                           DefaultRemoteListeners::new, DefaultTimeouts::new, 
DefaultProgressLogs::new, DefaultLocalListeners.Factory::new,
                           InMemoryCommandStores.SingleThread::new, new 
CoordinationAdapter.DefaultFactory(),
-                          DurableBefore.NOOP_PERSISTER, localConfig);
+                          DurableBefore.NOOP_PERSISTER, localConfig, journal);
             awaitUninterruptibly(on.unsafeStart());
             err.println("Initialized node " + init.self);
             err.flush();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to