This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit c55e251dbbeffa35c85aa2d9c1605ff93ac7a340
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Nov 26 15:25:32 2024 +0100

    Implement field saving/loading in AccordJournal
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20114
---
 accord-core/src/main/java/accord/api/Journal.java  |   4 +-
 .../java/accord/impl/AbstractSafeCommandStore.java | 126 ++++++++++++++++++++-
 .../java/accord/impl/InMemoryCommandStore.java     |  79 +++++--------
 .../src/main/java/accord/local/CommandStore.java   |  11 +-
 .../src/main/java/accord/local/CommandStores.java  |  65 +++++------
 .../src/test/java/accord/impl/basic/Cluster.java   |  10 +-
 .../accord/impl/basic/DelayedCommandStores.java    |  41 +++++++
 .../java/accord/impl/basic/InMemoryJournal.java    |  49 ++++----
 .../java/accord/impl/basic/LoggingJournal.java     |   2 +-
 .../java/accord/impl/basic/VerifyingJournal.java   |  25 +++-
 .../accord/impl/list/ListFetchCoordinator.java     |   2 +-
 11 files changed, 284 insertions(+), 130 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/Journal.java 
b/accord-core/src/main/java/accord/api/Journal.java
index 6bd18bd2..300fc274 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -45,7 +45,7 @@ public interface Journal
     RedundantBefore loadRedundantBefore(int commandStoreId);
     NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId);
     NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId);
-    CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int 
commandStoreId);
+    CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId);
 
     void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush);
 
@@ -69,7 +69,7 @@ public interface Journal
         public RedundantBefore newRedundantBefore;
         public NavigableMap<TxnId, Ranges> newBootstrapBeganAt;
         public NavigableMap<Timestamp, Ranges> newSafeToRead;
-        public CommandStores.RangesForEpoch.Snapshot newRangesForEpoch;
+        public CommandStores.RangesForEpoch newRangesForEpoch;
 
         public String toString()
         {
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java 
b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
index bc053ad7..cd383980 100644
--- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java
@@ -20,12 +20,25 @@ package accord.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NavigableMap;
 
 import accord.api.RoutingKey;
-import accord.local.*;
+import accord.local.CommandStore;
+import accord.local.KeyHistory;
+import accord.local.PreLoadContext;
+import accord.local.RedundantBefore;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
 import accord.local.cfk.SafeCommandsForKey;
-import accord.primitives.*;
-
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.RoutingKeys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
+
+import static accord.api.Journal.FieldUpdates;
+import static accord.local.CommandStores.RangesForEpoch;
 import static accord.local.KeyHistory.TIMESTAMPS;
 import static accord.utils.Invariants.illegalArgument;
 
@@ -37,9 +50,19 @@ extends SafeCommandStore
 {
     protected final PreLoadContext context;
 
-    public AbstractSafeCommandStore(PreLoadContext context)
+    private final CommandStore commandStore;
+    private FieldUpdates fieldUpdates;
+
+    protected AbstractSafeCommandStore(PreLoadContext context, CommandStore 
commandStore)
     {
         this.context = context;
+        this.commandStore = commandStore;
+    }
+
+    @Override
+    public CommandStore commandStore()
+    {
+        return commandStore;
     }
 
     public interface CommandStoreCaches<C, TFK, CFK> extends AutoCloseable
@@ -186,5 +209,100 @@ extends SafeCommandStore
 
     public void postExecute()
     {
+        if (fieldUpdates == null)
+            return;
+
+        if (fieldUpdates.newRedundantBefore != null)
+            super.unsafeSetRedundantBefore(fieldUpdates.newRedundantBefore);
+
+        if (fieldUpdates.newBootstrapBeganAt != null)
+            super.setBootstrapBeganAt(fieldUpdates.newBootstrapBeganAt);
+
+        if (fieldUpdates.newSafeToRead != null)
+            super.setSafeToRead(fieldUpdates.newSafeToRead);
+
+        if (fieldUpdates.newRangesForEpoch != null)
+            super.setRangesForEpoch(fieldUpdates.newRangesForEpoch);
+    }
+
+    /**
+     * Persistent field update logic
+     */
+
+    @Override
+    public final void upsertRedundantBefore(RedundantBefore addRedundantBefore)
+    {
+        // TODO (required): fix RedundantBefore sorting issue and switch to 
upsert mode
+        ensureFieldUpdates().newRedundantBefore = 
RedundantBefore.merge(redundantBefore(), addRedundantBefore);
+        unsafeUpsertRedundantBefore(addRedundantBefore);
+    }
+
+    @Override
+    public final void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> 
newBootstrapBeganAt)
+    {
+        ensureFieldUpdates().newBootstrapBeganAt = newBootstrapBeganAt;
+    }
+
+    @Override
+    public final void setSafeToRead(NavigableMap<Timestamp, Ranges> 
newSafeToRead)
+    {
+        ensureFieldUpdates().newSafeToRead = newSafeToRead;
+    }
+
+    @Override
+    public void setRangesForEpoch(RangesForEpoch rangesForEpoch)
+    {
+        if (rangesForEpoch != null)
+        {
+            super.setRangesForEpoch(rangesForEpoch);
+            ensureFieldUpdates().newRangesForEpoch = rangesForEpoch;
+        }
+    }
+
+    @Override
+    public RangesForEpoch ranges()
+    {
+        if (fieldUpdates != null && fieldUpdates.newRangesForEpoch != null)
+            return fieldUpdates.newRangesForEpoch;
+
+        return null;
+    }
+
+    @Override
+    public NavigableMap<TxnId, Ranges> bootstrapBeganAt()
+    {
+        if (fieldUpdates != null && fieldUpdates.newBootstrapBeganAt != null)
+            return fieldUpdates.newBootstrapBeganAt;
+
+        return super.bootstrapBeganAt();
+    }
+
+    @Override
+    public NavigableMap<Timestamp, Ranges> safeToReadAt()
+    {
+        if (fieldUpdates != null && fieldUpdates.newSafeToRead != null)
+            return fieldUpdates.newSafeToRead;
+
+        return super.safeToReadAt();
+    }
+
+    @Override
+    public RedundantBefore redundantBefore()
+    {
+        if (fieldUpdates != null && fieldUpdates.newRedundantBefore != null)
+            return fieldUpdates.newRedundantBefore;
+
+        return super.redundantBefore();
+    }
+
+    private FieldUpdates ensureFieldUpdates()
+    {
+        if (fieldUpdates == null) fieldUpdates = new FieldUpdates();
+        return fieldUpdates;
+    }
+
+    public FieldUpdates fieldUpdates()
+    {
+        return fieldUpdates;
     }
 }
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java 
b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 9bf05302..b14767e4 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -61,7 +61,6 @@ import accord.local.KeyHistory;
 import accord.local.Node;
 import accord.local.NodeCommandStoreService;
 import accord.local.PreLoadContext;
-import accord.local.RedundantBefore;
 import accord.local.RedundantStatus;
 import accord.local.RejectBefore;
 import accord.local.SafeCommand;
@@ -395,7 +394,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                 if (!command.hasBeen(PreCommitted)) continue;
                 if (!command.txnId().isVisible()) continue;
 
-                Ranges allRanges = 
unsafeRangesForEpoch().allBetween(txnId.epoch(), 
command.executeAtOrTxnId().epoch());
+                Ranges allRanges = 
unsafeGetRangesForEpoch().allBetween(txnId.epoch(), 
command.executeAtOrTxnId().epoch());
                 boolean done = command.hasBeen(Truncated);
                 if (!done)
                 {
@@ -699,11 +698,9 @@ public abstract class InMemoryCommandStore extends 
CommandStore
 
     public static class InMemorySafeStore extends 
AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeTimestampsForKey, 
InMemorySafeCommandsForKey, InMemoryCommandStoreCaches>
     {
-        private final InMemoryCommandStore commandStore;
         protected final Map<TxnId, InMemorySafeCommand> commands;
         private final Map<RoutableKey, InMemorySafeTimestampsForKey> 
timestampsForKey;
         private final Map<RoutableKey, InMemorySafeCommandsForKey> 
commandsForKey;
-        private RangesForEpoch ranges;
 
         public InMemorySafeStore(InMemoryCommandStore commandStore,
                                  RangesForEpoch ranges,
@@ -712,12 +709,12 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                                  Map<RoutableKey, 
InMemorySafeTimestampsForKey> timestampsForKey,
                                  Map<RoutableKey, InMemorySafeCommandsForKey> 
commandsForKey)
         {
-            super(context);
-            this.commandStore = commandStore;
+            super(context, commandStore);
+
+            super.setRangesForEpoch(ranges);
             this.commands = commands;
             this.commandsForKey = commandsForKey;
             this.timestampsForKey = timestampsForKey;
-            this.ranges = ranges;
             for (InMemorySafeCommand cmd : commands.values())
             {
                 if (cmd.isUnset()) cmd.uninitialised();
@@ -738,11 +735,17 @@ public abstract class InMemoryCommandStore extends 
CommandStore
             return commands.get(txnId);
         }
 
+        @Override
+        public InMemoryCommandStore commandStore()
+        {
+            return (InMemoryCommandStore) super.commandStore();
+        }
+
         @Override
         protected InMemoryCommandStoreCaches tryGetCaches()
         {
-            if (commandStore.canExposeUnloaded())
-                return commandStore.new InMemoryCommandStoreCaches();
+            if (commandStore().canExposeUnloaded())
+                return commandStore().new InMemoryCommandStoreCaches();
             return null;
         }
 
@@ -796,58 +799,33 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                 return;
 
             Ranges slice = ranges(txnId, updated.executeAtOrTxnId());
-            slice = 
commandStore.unsafeGetRedundantBefore().removeShardRedundant(txnId, 
updated.executeAtOrTxnId(), slice);
-            commandStore.rangeCommands.computeIfAbsent(txnId, ignore -> new 
RangeCommand(commandStore.commands.get(txnId)))
+            slice = 
commandStore().unsafeGetRedundantBefore().removeShardRedundant(txnId, 
updated.executeAtOrTxnId(), slice);
+            commandStore().rangeCommands.computeIfAbsent(txnId, ignore -> new 
RangeCommand(commandStore().commands.get(txnId)))
                          
.update(((AbstractRanges)updated.participants().touches()).toRanges().slice(slice,
 Minimal));
         }
 
-        @Override
-        public CommandStore commandStore()
-        {
-            return commandStore;
-        }
-
         @Override
         public DataStore dataStore()
         {
-            return commandStore.store;
+            return commandStore().store;
         }
 
         @Override
         public Agent agent()
         {
-            return commandStore.agent;
+            return commandStore().agent;
         }
 
         @Override
         public ProgressLog progressLog()
         {
-            return commandStore.progressLog;
-        }
-
-        @Override
-        public RangesForEpoch ranges()
-        {
-            return ranges;
-        }
-
-        @Override
-        public void setRangesForEpoch(RangesForEpoch rangesForEpoch)
-        {
-            super.setRangesForEpoch(rangesForEpoch);
-            ranges = rangesForEpoch;
-        }
-
-        @Override
-        public void upsertRedundantBefore(RedundantBefore addRedundantBefore)
-        {
-            unsafeUpsertRedundantBefore(addRedundantBefore);
+            return commandStore().progressLog;
         }
 
         @Override
         public NodeCommandStoreService node()
         {
-            return commandStore.node;
+            return commandStore().node;
         }
 
         private static class TxnInfo
@@ -878,7 +856,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         @Override
         public <P1, T> T mapReduceActive(Unseekables<?> keysOrRanges, 
Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, 
T accumulate)
         {
-            accumulate = commandStore.mapReduceForKey(this, keysOrRanges, 
(commands, prev) -> {
+            accumulate = commandStore().mapReduceForKey(this, keysOrRanges, 
(commands, prev) -> {
                 return commands.mapReduceActive(keysOrRanges, startedBefore, 
testKind, map, p1, prev);
             }, accumulate);
 
@@ -889,7 +867,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         @Override
         public <P1, T> T mapReduceFull(Unseekables<?> keysOrRanges, TxnId 
testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, 
TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate)
         {
-            accumulate = commandStore.mapReduceForKey(this, keysOrRanges, 
(commands, prev) -> {
+            accumulate = commandStore().mapReduceForKey(this, keysOrRanges, 
(commands, prev) -> {
                 return commands.mapReduceFull(keysOrRanges, testTxnId, 
testKind, testStartedAt, testDep, testStatus, map, p1, prev);
             }, accumulate);
 
@@ -900,7 +878,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
         {
             // TODO (find lib, efficiency): this is super inefficient, need to 
store Command in something queryable
             Map<Range, List<TxnInfo>> collect = new TreeMap<>(Range::compare);
-            commandStore.rangeCommands.forEach(((txnId, rangeCommand) -> {
+            commandStore().rangeCommands.forEach(((txnId, rangeCommand) -> {
                 Command command = rangeCommand.command.value();
                 // TODO (now): probably this isn't safe - want to ensure we 
take dependency on any relevant syncId
                 if (command.saveStatus().compareTo(SaveStatus.Erased) >= 0)
@@ -979,7 +957,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
 
             if (testStatus == ANY_STATUS && testDep == ANY_DEPS)
             {
-                commandStore.historicalRangeCommands.forEach(((txnId, ranges) 
-> {
+                commandStore().historicalRangeCommands.forEach(((txnId, 
ranges) -> {
                     switch (testStartedAt)
                     {
                         default: throw new AssertionError();
@@ -1003,7 +981,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                         List<TxnInfo> list = in.computeIfAbsent(r, ignore -> 
new ArrayList<>());
                         if (list.isEmpty() || !list.get(list.size() - 
1).txnId.equals(txnId))
                         {
-                            GlobalCommand global = 
commandStore.commands.get(txnId);
+                            GlobalCommand global = 
commandStore().commands.get(txnId);
                             if (global != null && global.value() != null)
                             {
                                 Command command = global.value();
@@ -1035,6 +1013,7 @@ public abstract class InMemoryCommandStore extends 
CommandStore
 
         public void postExecute()
         {
+            super.postExecute();
             commands.values().forEach(c -> {
                 if (c == null || c.current() == null)
                     return;
@@ -1042,24 +1021,24 @@ public abstract class InMemoryCommandStore extends 
CommandStore
                 Timestamp executeAt = c.current().executeAtIfKnown();
                 if (executeAt != null)
                 {
-                    if (c.current().hasBeen(Truncated)) 
commandStore.commandsByExecuteAt.remove(executeAt);
-                    else commandStore.commandsByExecuteAt.put(executeAt, 
commandStore.command(c.txnId()));
+                    if (c.current().hasBeen(Truncated)) 
commandStore().commandsByExecuteAt.remove(executeAt);
+                    else commandStore().commandsByExecuteAt.put(executeAt, 
commandStore().command(c.txnId()));
                 }
 
                 if (c.isUnset() || c.current().saveStatus().isUninitialised())
-                    commandStore.commands.remove(c.txnId());
+                    commandStore().commands.remove(c.txnId());
 
                 c.invalidate();
             });
 
             timestampsForKey.values().forEach(tfk -> {
                 if (tfk.isUnset())
-                    commandStore.timestampsForKey.remove(tfk.key());
+                    commandStore().timestampsForKey.remove(tfk.key());
                 tfk.invalidate();
             });
             commandsForKey.values().forEach(cfk -> {
                 if (cfk.isUnset())
-                    commandStore.commandsForKey.remove(cfk.key());
+                    commandStore().commandsForKey.remove(cfk.key());
                 cfk.invalidate();
             });
         }
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index f7eb397b..c38ac501 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -144,7 +144,7 @@ public abstract class CommandStore implements AgentExecutor
     protected final LocalListeners listeners;
     protected final EpochUpdateHolder epochUpdateHolder;
 
-    // Used in markShardStale to make sure the staleness includes in progresss 
bootstraps
+    // Used in markShardStale to make sure the staleness includes in progress 
bootstraps
     private transient NavigableMap<TxnId, Ranges> bootstrapBeganAt = 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); // additive (i.e. once 
inserted, rolled-over until invalidated, and the floor entry contains additions)
     private RedundantBefore redundantBefore = RedundantBefore.EMPTY;
     private MaxConflicts maxConflicts = MaxConflicts.EMPTY;
@@ -242,7 +242,7 @@ public abstract class CommandStore implements AgentExecutor
             unsafeSetRangesForEpoch(update.newRangesForEpoch);
     }
 
-    public RangesForEpoch unsafeRangesForEpoch()
+    public RangesForEpoch unsafeGetRangesForEpoch()
     {
         return rangesForEpoch;
     }
@@ -360,6 +360,7 @@ public abstract class CommandStore implements AgentExecutor
         // TODO (desired): narrow ranges to those that are owned
         Invariants.checkArgument(txnId.is(ExclusiveSyncPoint));
         RedundantBefore newRedundantBefore = 
RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, 
txnId, TxnId.NONE, TxnId.NONE, TxnId.NONE));
+        safeStore.upsertRedundantBefore(newRedundantBefore);
         unsafeSetRedundantBefore(newRedundantBefore);
         updatedRedundantBefore(safeStore, txnId, ranges);
     }
@@ -647,15 +648,11 @@ public abstract class CommandStore implements 
AgentExecutor
         return () -> {
             AsyncResult<Void> done = execute(empty(), (safeStore) -> {
                 // Merge in a base for any ranges that needs to be covered
-                // TODO (review): Convoluted check to not overwrite existing 
bootstraps with TxnId.NONE
-                // If loading from disk didn't finish before this then we 
might initialize the range at TxnId.NONE?
-                // Does CommandStores.topology ensure that doesn't happen? Is 
it fine if it does because it will get superseded?
-
                 Ranges newBootstrapRanges = ranges;
                 for (Ranges existing : bootstrapBeganAt.values())
                     newBootstrapRanges = newBootstrapRanges.without(existing);
                 if (!newBootstrapRanges.isEmpty())
-                    bootstrapBeganAt = bootstrap(TxnId.NONE, 
newBootstrapRanges, bootstrapBeganAt);
+                    safeStore.setBootstrapBeganAt(bootstrap(TxnId.NONE, 
newBootstrapRanges, bootstrapBeganAt));
                 safeStore.setSafeToRead(purgeAndInsert(safeToRead, TxnId.NONE, 
ranges));
             }).beginAsResult();
 
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java 
b/accord-core/src/main/java/accord/local/CommandStores.java
index 9068f19c..60a94a7c 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -142,54 +143,46 @@ public abstract class CommandStores
     // We ONLY remove ranges to keep logic manageable; likely to only merge 
CommandStores into a new CommandStore via some kind of Bootstrap
     public static class RangesForEpoch
     {
-        public static class Snapshot
-        {
-            public final long[] epochs;
-            public final Ranges[] ranges;
-
-            public Snapshot(long[] epochs, Ranges[] ranges)
-            {
-                this.epochs = epochs;
-                this.ranges = ranges;
-            }
-
-            @Override
-            public boolean equals(Object o)
-            {
-                if (this == o) return true;
-                if (o == null || getClass() != o.getClass()) return false;
-                Snapshot snapshot = (Snapshot) o;
-                return Objects.deepEquals(epochs, snapshot.epochs) && 
Objects.deepEquals(ranges, snapshot.ranges);
-            }
-
-            @Override
-            public int hashCode()
-            {
-                return Objects.hash(Arrays.hashCode(epochs), 
Arrays.hashCode(ranges));
-            }
-        }
-
         final long[] epochs;
         final Ranges[] ranges;
-        final CommandStore store;
 
-        public RangesForEpoch(long epoch, Ranges ranges, CommandStore store)
+        public RangesForEpoch(long epoch, Ranges ranges)
         {
             this.epochs = new long[] { epoch };
             this.ranges = new Ranges[] { ranges };
-            this.store = store;
         }
 
-        public RangesForEpoch(long[] epochs, Ranges[] ranges, CommandStore 
store)
+        public RangesForEpoch(long[] epochs, Ranges[] ranges)
         {
+            Invariants.checkState(epochs.length == ranges.length);
             this.epochs = epochs;
             this.ranges = ranges;
-            this.store = store;
         }
 
-        public Snapshot snapshot()
+        public int size()
+        {
+            return epochs.length;
+        }
+
+        public void forEach(BiConsumer<Long, Ranges> forEach)
+        {
+            for (int i = 0; i < epochs.length; i++)
+                forEach.accept(epochs[i], ranges[i]);
+        }
+
+        @Override
+        public boolean equals(Object object)
+        {
+            if (this == object) return true;
+            if (object == null || getClass() != object.getClass()) return 
false;
+            RangesForEpoch that = (RangesForEpoch) object;
+            return Objects.deepEquals(epochs, that.epochs) && 
Objects.deepEquals(ranges, that.ranges);
+        }
+
+        @Override
+        public int hashCode()
         {
-            return new Snapshot(epochs, ranges);
+            throw new UnsupportedOperationException();
         }
 
         public RangesForEpoch withRanges(long epoch, Ranges latestRanges)
@@ -201,7 +194,7 @@ public abstract class CommandStores
             newEpochs[newLength - 1] = epoch;
             newRanges[newLength - 1] = latestRanges;
             Invariants.checkState(newEpochs[newLength - 1] == 0 || 
newEpochs[newLength - 1] == epoch, "Attempted to override historic epoch %d 
with %d", newEpochs[newLength - 1], epoch);
-            return new RangesForEpoch(newEpochs, newRanges, store);
+            return new RangesForEpoch(newEpochs, newRanges);
         }
 
         public @Nonnull Ranges coordinates(TxnId txnId)
@@ -463,7 +456,7 @@ public abstract class CommandStores
             {
                 EpochUpdateHolder updateHolder = new EpochUpdateHolder();
                 ShardHolder shard = new ShardHolder(supplier.create(nextId++, 
updateHolder));
-                shard.ranges = new RangesForEpoch(epoch, addRanges, 
shard.store);
+                shard.ranges = new RangesForEpoch(epoch, addRanges);
                 shard.store.epochUpdateHolder.add(epoch, shard.ranges, 
addRanges);
                 
shard.store.epochUpdateHolder.updateGlobal(newTopology.ranges());
 
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java 
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 93b792e0..da3914f6 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -779,13 +779,15 @@ public class Cluster
                     && beforeCommand.promised().equals(afterCommand.promised())
                     && 
beforeCommand.durability().equals(afterCommand.durability()));
             }
+
             if (before.size() > store.unsafeCommands().size())
             {
-                for (TxnId txnId : before.keySet())
+                for (Map.Entry<TxnId, Command> entry : before.entrySet())
                 {
+                    TxnId txnId = entry.getKey();
                     if (!store.unsafeCommands().containsKey(txnId))
                     {
-                        Command beforeCommand = before.get(txnId);
+                        Command beforeCommand = entry.getValue();
                         if (beforeCommand.saveStatus() == SaveStatus.Erased)
                             continue;
 
@@ -1123,7 +1125,7 @@ public class Cluster
         {
             return txnId + ":" + command.saveStatus() + "@"
                    + commandStore.toString().replaceAll("DelayedCommandStore", 
"")
-                   + (command.homeKey() != null && 
commandStore.unsafeRangesForEpoch().allAt(txnId.epoch()).contains(command.homeKey())
 ? "(Home)" : "");
+                   + (command.homeKey() != null && 
commandStore.unsafeGetRangesForEpoch().allAt(txnId.epoch()).contains(command.homeKey())
 ? "(Home)" : "");
         }
     }
 
@@ -1270,7 +1272,7 @@ public class Cluster
         if (participants == null)
             return false;
 
-        return 
participants.owns().intersects(commandStore.unsafeRangesForEpoch().allBetween(command.txnId().epoch(),
 command.executeAtIfKnownElseTxnId()));
+        return 
participants.owns().intersects(commandStore.unsafeGetRangesForEpoch().allBetween(command.txnId().epoch(),
 command.executeAtIfKnownElseTxnId()));
     }
 
     private BlockingTransaction toBlocking(Command command, 
DelayedCommandStore store)
diff --git 
a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java 
b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
index 17afe245..1d209bb9 100644
--- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
+++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Queue;
 import java.util.concurrent.Callable;
 import java.util.function.BiConsumer;
@@ -49,10 +50,13 @@ import accord.local.CommandStores;
 import accord.local.Node;
 import accord.local.NodeCommandStoreService;
 import accord.local.PreLoadContext;
+import accord.local.RedundantBefore;
 import accord.local.SafeCommandStore;
 import accord.local.ShardDistributor;
 import accord.primitives.Range;
+import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
+import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.topology.Topology;
@@ -143,6 +147,38 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
             this.journal = journal;
         }
 
+        @Override
+        public void clearForTesting()
+        {
+            super.clearForTesting();
+
+            // Rather than cleaning up and reloading, we can just assert 
equality during reload
+            {
+                RedundantBefore orig = unsafeGetRedundantBefore();
+                RedundantBefore loaded = journal.loadRedundantBefore(id());
+                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
+            }
+
+            {
+                NavigableMap<TxnId, Ranges> orig = unsafeGetBootstrapBeganAt();
+                NavigableMap<TxnId, Ranges> loaded = 
journal.loadBootstrapBeganAt(id());
+                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
+            }
+
+            {
+                NavigableMap<Timestamp, Ranges> orig = unsafeGetSafeToRead();
+                NavigableMap<Timestamp, Ranges> loaded = 
journal.loadSafeToRead(id());
+                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
+            }
+
+            {
+                RangesForEpoch orig = unsafeGetRangesForEpoch();
+                RangesForEpoch loaded = journal.loadRangesForEpoch(id());
+
+                Invariants.checkState(orig.equals(loaded), "%s should equal 
%s", loaded, orig);
+            }
+        }
+
         @Override
         public void validateRead(Command current)
         {
@@ -262,6 +298,7 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
     {
         private final DelayedCommandStore commandStore;
         private final CacheLoadingChance cacheLoadingChance;
+
         public DelayedSafeStore(DelayedCommandStore commandStore,
                                 RangesForEpoch ranges,
                                 PreLoadContext context,
@@ -278,6 +315,10 @@ public class DelayedCommandStores extends 
InMemoryCommandStores.SingleThread
         @Override
         public void postExecute()
         {
+            Journal.FieldUpdates fieldUpdates = fieldUpdates();
+            if (fieldUpdates != null)
+                commandStore.journal.saveStoreState(commandStore.id(), 
fieldUpdates, () -> {});
+
             commands.entrySet().forEach(e -> {
                 InMemorySafeCommand safe = e.getValue();
                 if (!safe.isModified()) return;
diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java 
b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
index d48cbb1b..dc676fed 100644
--- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java
@@ -65,22 +65,13 @@ import static accord.utils.Invariants.illegalState;
 public class InMemoryJournal implements Journal
 {
     private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> 
diffsPerCommandStore = new Int2ObjectHashMap<>();
-    private final FieldStates fieldStates;
-
-    private static class FieldStates
-    {
-        RedundantBefore redundantBefore = RedundantBefore.EMPTY;
-        NavigableMap<TxnId, Ranges> bootstrapBeganAt = 
ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY);
-        NavigableMap<Timestamp, Ranges> safeToRead = 
ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY);
-        CommandStores.RangesForEpoch.Snapshot rangesForEpoch = null;
-    }
+    private final Int2ObjectHashMap<FieldUpdates> fieldStates = new 
Int2ObjectHashMap<>();
 
     private final Node.Id id;
 
     public InMemoryJournal(Node.Id id)
     {
         this.id = id;
-        this.fieldStates = new FieldStates();
     }
 
     @Override
@@ -120,37 +111,57 @@ public class InMemoryJournal implements Journal
     @Override
     public RedundantBefore loadRedundantBefore(int commandStoreId)
     {
-        return fieldStates.redundantBefore;
+        FieldUpdates fieldStates = this.fieldStates.get(commandStoreId);
+        if (fieldStates == null)
+            return null;
+        return fieldStates.newRedundantBefore;
     }
 
     @Override
     public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
     {
-        return fieldStates.bootstrapBeganAt;
+        FieldUpdates fieldStates = this.fieldStates.get(commandStoreId);
+        if (fieldStates == null)
+            return null;
+        return fieldStates.newBootstrapBeganAt;
     }
 
     @Override
     public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
     {
-        return fieldStates.safeToRead;
+        FieldUpdates fieldStates = this.fieldStates.get(commandStoreId);
+        if (fieldStates == null)
+            return null;
+        return fieldStates.newSafeToRead;
     }
 
     @Override
-    public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int 
commandStoreId)
+    public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
     {
-        return fieldStates.rangesForEpoch;
+        FieldUpdates fieldStates = this.fieldStates.get(commandStoreId);
+        if (fieldStates == null)
+            return null;
+        return fieldStates.newRangesForEpoch;
     }
 
     public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush)
     {
+
+        FieldUpdates fieldStates = this.fieldStates.computeIfAbsent(store, s 
-> {
+            FieldUpdates init = new FieldUpdates();
+            init.newRedundantBefore = RedundantBefore.EMPTY;
+            init.newBootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE, 
Ranges.EMPTY);
+            init.newSafeToRead = ImmutableSortedMap.of(Timestamp.NONE, 
Ranges.EMPTY);
+            return init;
+        });
         if (fieldUpdates.newRedundantBefore != null)
-            fieldStates.redundantBefore = fieldUpdates.newRedundantBefore;
+            fieldStates.newRedundantBefore = fieldUpdates.newRedundantBefore;
         if (fieldUpdates.newSafeToRead != null)
-            fieldStates.safeToRead = fieldUpdates.newSafeToRead;
+            fieldStates.newSafeToRead = fieldUpdates.newSafeToRead;
         if (fieldUpdates.newBootstrapBeganAt != null)
-            fieldStates.bootstrapBeganAt = fieldUpdates.newBootstrapBeganAt;
+            fieldStates.newBootstrapBeganAt = fieldUpdates.newBootstrapBeganAt;
         if (fieldUpdates.newRangesForEpoch != null)
-            fieldStates.rangesForEpoch = fieldUpdates.newRangesForEpoch;
+            fieldStates.newRangesForEpoch = fieldUpdates.newRangesForEpoch;
 
         if (onFlush!= null)
             onFlush.run();
diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
index 69c799b8..d9cee7af 100644
--- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java
@@ -107,7 +107,7 @@ public class LoggingJournal implements Journal
         return delegate.loadSafeToRead(commandStoreId);
     }
 
-    public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int 
commandStoreId)
+    public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
     {
         return delegate.loadRangesForEpoch(commandStoreId);
     }
diff --git a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java 
b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
index 736a4f15..5776c51f 100644
--- a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
+++ b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java
@@ -47,8 +47,8 @@ public class VerifyingJournal implements Journal
 
     public Command loadCommand(int commandStoreId, TxnId txnId, 
RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        Command sut = this.sut.loadCommand(commandStoreId, txnId, 
redundantBefore, durableBefore);
         Command model = this.model.loadCommand(commandStoreId, txnId, 
redundantBefore, durableBefore);
+        Command sut = this.sut.loadCommand(commandStoreId, txnId, 
redundantBefore, durableBefore);
         Invariants.checkState(sut.equals(model));
         return sut;
     }
@@ -72,26 +72,39 @@ public class VerifyingJournal implements Journal
 
     public RedundantBefore loadRedundantBefore(int commandStoreId)
     {
-        return sut.loadRedundantBefore(commandStoreId);
+        RedundantBefore model = this.model.loadRedundantBefore(commandStoreId);
+        RedundantBefore sut = this.sut.loadRedundantBefore(commandStoreId);
+        Invariants.checkState(sut.equals(model), "%s should equal %s", sut, 
model);
+        return sut;
     }
 
     public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
     {
-        return sut.loadBootstrapBeganAt(commandStoreId);
+        NavigableMap<TxnId, Ranges> model = 
this.sut.loadBootstrapBeganAt(commandStoreId);
+        NavigableMap<TxnId, Ranges> sut = 
this.sut.loadBootstrapBeganAt(commandStoreId);
+        Invariants.checkState(sut.equals(model), "%s should equal %s", sut, 
model);
+        return sut;
     }
 
     public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
     {
-        return sut.loadSafeToRead(commandStoreId);
+        NavigableMap<Timestamp, Ranges> model = 
this.model.loadSafeToRead(commandStoreId);
+        NavigableMap<Timestamp, Ranges> sut = 
this.sut.loadSafeToRead(commandStoreId);
+        Invariants.checkState(sut.equals(model), "%s should equal %s", sut, 
model);
+        return sut;
     }
 
-    public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int 
commandStoreId)
+    public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
     {
-        return sut.loadRangesForEpoch(commandStoreId);
+        CommandStores.RangesForEpoch model = 
this.sut.loadRangesForEpoch(commandStoreId);
+        CommandStores.RangesForEpoch sut = 
this.sut.loadRangesForEpoch(commandStoreId);
+        Invariants.checkState(sut.equals(model), "%s should equal %s", sut, 
model);
+        return sut;
     }
 
     public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable 
onFlush)
     {
+        model.saveStoreState(store, fieldUpdates, onFlush);
         sut.saveStoreState(store, fieldUpdates, onFlush);
     }
 }
diff --git 
a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java 
b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
index 340b6077..c0f451e1 100644
--- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
+++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java
@@ -92,7 +92,7 @@ public class ListFetchCoordinator extends 
AbstractFetchCoordinator
         @Override
         protected void readComplete(CommandStore commandStore, Data result, 
Ranges unavailable)
         {
-            Ranges slice = 
commandStore.unsafeRangesForEpoch().allAt(txnId).without(unavailable);
+            Ranges slice = 
commandStore.unsafeGetRangesForEpoch().allAt(txnId).without(unavailable);
             
((InMemoryCommandStore)commandStore).maxAppliedFor((Ranges)readScope, 
slice).begin((newMaxApplied, failure) -> {
                 if (failure != null)
                 {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to