This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit c55e251dbbeffa35c85aa2d9c1605ff93ac7a340 Author: Alex Petrov <[email protected]> AuthorDate: Tue Nov 26 15:25:32 2024 +0100 Implement field saving/loading in AccordJournal Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20114 --- accord-core/src/main/java/accord/api/Journal.java | 4 +- .../java/accord/impl/AbstractSafeCommandStore.java | 126 ++++++++++++++++++++- .../java/accord/impl/InMemoryCommandStore.java | 79 +++++-------- .../src/main/java/accord/local/CommandStore.java | 11 +- .../src/main/java/accord/local/CommandStores.java | 65 +++++------ .../src/test/java/accord/impl/basic/Cluster.java | 10 +- .../accord/impl/basic/DelayedCommandStores.java | 41 +++++++ .../java/accord/impl/basic/InMemoryJournal.java | 49 ++++---- .../java/accord/impl/basic/LoggingJournal.java | 2 +- .../java/accord/impl/basic/VerifyingJournal.java | 25 +++- .../accord/impl/list/ListFetchCoordinator.java | 2 +- 11 files changed, 284 insertions(+), 130 deletions(-) diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index 6bd18bd2..300fc274 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -45,7 +45,7 @@ public interface Journal RedundantBefore loadRedundantBefore(int commandStoreId); NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId); NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId); - CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId); + CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId); void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush); @@ -69,7 +69,7 @@ public interface Journal public RedundantBefore newRedundantBefore; public NavigableMap<TxnId, Ranges> newBootstrapBeganAt; public NavigableMap<Timestamp, Ranges> newSafeToRead; - public CommandStores.RangesForEpoch.Snapshot newRangesForEpoch; + public CommandStores.RangesForEpoch newRangesForEpoch; public String toString() { diff --git a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java index bc053ad7..cd383980 100644 --- a/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java +++ b/accord-core/src/main/java/accord/impl/AbstractSafeCommandStore.java @@ -20,12 +20,25 @@ package accord.impl; import java.util.ArrayList; import java.util.List; +import java.util.NavigableMap; import accord.api.RoutingKey; -import accord.local.*; +import accord.local.CommandStore; +import accord.local.KeyHistory; +import accord.local.PreLoadContext; +import accord.local.RedundantBefore; +import accord.local.SafeCommand; +import accord.local.SafeCommandStore; import accord.local.cfk.SafeCommandsForKey; -import accord.primitives.*; - +import accord.primitives.Ranges; +import accord.primitives.Routable; +import accord.primitives.RoutingKeys; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekables; + +import static accord.api.Journal.FieldUpdates; +import static accord.local.CommandStores.RangesForEpoch; import static accord.local.KeyHistory.TIMESTAMPS; import static accord.utils.Invariants.illegalArgument; @@ -37,9 +50,19 @@ extends SafeCommandStore { protected final PreLoadContext context; - public AbstractSafeCommandStore(PreLoadContext context) + private final CommandStore commandStore; + private FieldUpdates fieldUpdates; + + protected AbstractSafeCommandStore(PreLoadContext context, CommandStore commandStore) { this.context = context; + this.commandStore = commandStore; + } + + @Override + public CommandStore commandStore() + { + return commandStore; } public interface CommandStoreCaches<C, TFK, CFK> extends AutoCloseable @@ -186,5 +209,100 @@ extends SafeCommandStore public void postExecute() { + if (fieldUpdates == null) + return; + + if (fieldUpdates.newRedundantBefore != null) + super.unsafeSetRedundantBefore(fieldUpdates.newRedundantBefore); + + if (fieldUpdates.newBootstrapBeganAt != null) + super.setBootstrapBeganAt(fieldUpdates.newBootstrapBeganAt); + + if (fieldUpdates.newSafeToRead != null) + super.setSafeToRead(fieldUpdates.newSafeToRead); + + if (fieldUpdates.newRangesForEpoch != null) + super.setRangesForEpoch(fieldUpdates.newRangesForEpoch); + } + + /** + * Persistent field update logic + */ + + @Override + public final void upsertRedundantBefore(RedundantBefore addRedundantBefore) + { + // TODO (required): fix RedundantBefore sorting issue and switch to upsert mode + ensureFieldUpdates().newRedundantBefore = RedundantBefore.merge(redundantBefore(), addRedundantBefore); + unsafeUpsertRedundantBefore(addRedundantBefore); + } + + @Override + public final void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) + { + ensureFieldUpdates().newBootstrapBeganAt = newBootstrapBeganAt; + } + + @Override + public final void setSafeToRead(NavigableMap<Timestamp, Ranges> newSafeToRead) + { + ensureFieldUpdates().newSafeToRead = newSafeToRead; + } + + @Override + public void setRangesForEpoch(RangesForEpoch rangesForEpoch) + { + if (rangesForEpoch != null) + { + super.setRangesForEpoch(rangesForEpoch); + ensureFieldUpdates().newRangesForEpoch = rangesForEpoch; + } + } + + @Override + public RangesForEpoch ranges() + { + if (fieldUpdates != null && fieldUpdates.newRangesForEpoch != null) + return fieldUpdates.newRangesForEpoch; + + return null; + } + + @Override + public NavigableMap<TxnId, Ranges> bootstrapBeganAt() + { + if (fieldUpdates != null && fieldUpdates.newBootstrapBeganAt != null) + return fieldUpdates.newBootstrapBeganAt; + + return super.bootstrapBeganAt(); + } + + @Override + public NavigableMap<Timestamp, Ranges> safeToReadAt() + { + if (fieldUpdates != null && fieldUpdates.newSafeToRead != null) + return fieldUpdates.newSafeToRead; + + return super.safeToReadAt(); + } + + @Override + public RedundantBefore redundantBefore() + { + if (fieldUpdates != null && fieldUpdates.newRedundantBefore != null) + return fieldUpdates.newRedundantBefore; + + return super.redundantBefore(); + } + + private FieldUpdates ensureFieldUpdates() + { + if (fieldUpdates == null) fieldUpdates = new FieldUpdates(); + return fieldUpdates; + } + + public FieldUpdates fieldUpdates() + { + return fieldUpdates; } } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 9bf05302..b14767e4 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -61,7 +61,6 @@ import accord.local.KeyHistory; import accord.local.Node; import accord.local.NodeCommandStoreService; import accord.local.PreLoadContext; -import accord.local.RedundantBefore; import accord.local.RedundantStatus; import accord.local.RejectBefore; import accord.local.SafeCommand; @@ -395,7 +394,7 @@ public abstract class InMemoryCommandStore extends CommandStore if (!command.hasBeen(PreCommitted)) continue; if (!command.txnId().isVisible()) continue; - Ranges allRanges = unsafeRangesForEpoch().allBetween(txnId.epoch(), command.executeAtOrTxnId().epoch()); + Ranges allRanges = unsafeGetRangesForEpoch().allBetween(txnId.epoch(), command.executeAtOrTxnId().epoch()); boolean done = command.hasBeen(Truncated); if (!done) { @@ -699,11 +698,9 @@ public abstract class InMemoryCommandStore extends CommandStore public static class InMemorySafeStore extends AbstractSafeCommandStore<InMemorySafeCommand, InMemorySafeTimestampsForKey, InMemorySafeCommandsForKey, InMemoryCommandStoreCaches> { - private final InMemoryCommandStore commandStore; protected final Map<TxnId, InMemorySafeCommand> commands; private final Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey; private final Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey; - private RangesForEpoch ranges; public InMemorySafeStore(InMemoryCommandStore commandStore, RangesForEpoch ranges, @@ -712,12 +709,12 @@ public abstract class InMemoryCommandStore extends CommandStore Map<RoutableKey, InMemorySafeTimestampsForKey> timestampsForKey, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKey) { - super(context); - this.commandStore = commandStore; + super(context, commandStore); + + super.setRangesForEpoch(ranges); this.commands = commands; this.commandsForKey = commandsForKey; this.timestampsForKey = timestampsForKey; - this.ranges = ranges; for (InMemorySafeCommand cmd : commands.values()) { if (cmd.isUnset()) cmd.uninitialised(); @@ -738,11 +735,17 @@ public abstract class InMemoryCommandStore extends CommandStore return commands.get(txnId); } + @Override + public InMemoryCommandStore commandStore() + { + return (InMemoryCommandStore) super.commandStore(); + } + @Override protected InMemoryCommandStoreCaches tryGetCaches() { - if (commandStore.canExposeUnloaded()) - return commandStore.new InMemoryCommandStoreCaches(); + if (commandStore().canExposeUnloaded()) + return commandStore().new InMemoryCommandStoreCaches(); return null; } @@ -796,58 +799,33 @@ public abstract class InMemoryCommandStore extends CommandStore return; Ranges slice = ranges(txnId, updated.executeAtOrTxnId()); - slice = commandStore.unsafeGetRedundantBefore().removeShardRedundant(txnId, updated.executeAtOrTxnId(), slice); - commandStore.rangeCommands.computeIfAbsent(txnId, ignore -> new RangeCommand(commandStore.commands.get(txnId))) + slice = commandStore().unsafeGetRedundantBefore().removeShardRedundant(txnId, updated.executeAtOrTxnId(), slice); + commandStore().rangeCommands.computeIfAbsent(txnId, ignore -> new RangeCommand(commandStore().commands.get(txnId))) .update(((AbstractRanges)updated.participants().touches()).toRanges().slice(slice, Minimal)); } - @Override - public CommandStore commandStore() - { - return commandStore; - } - @Override public DataStore dataStore() { - return commandStore.store; + return commandStore().store; } @Override public Agent agent() { - return commandStore.agent; + return commandStore().agent; } @Override public ProgressLog progressLog() { - return commandStore.progressLog; - } - - @Override - public RangesForEpoch ranges() - { - return ranges; - } - - @Override - public void setRangesForEpoch(RangesForEpoch rangesForEpoch) - { - super.setRangesForEpoch(rangesForEpoch); - ranges = rangesForEpoch; - } - - @Override - public void upsertRedundantBefore(RedundantBefore addRedundantBefore) - { - unsafeUpsertRedundantBefore(addRedundantBefore); + return commandStore().progressLog; } @Override public NodeCommandStoreService node() { - return commandStore.node; + return commandStore().node; } private static class TxnInfo @@ -878,7 +856,7 @@ public abstract class InMemoryCommandStore extends CommandStore @Override public <P1, T> T mapReduceActive(Unseekables<?> keysOrRanges, Timestamp startedBefore, Kinds testKind, CommandFunction<P1, T, T> map, P1 p1, T accumulate) { - accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> { + accumulate = commandStore().mapReduceForKey(this, keysOrRanges, (commands, prev) -> { return commands.mapReduceActive(keysOrRanges, startedBefore, testKind, map, p1, prev); }, accumulate); @@ -889,7 +867,7 @@ public abstract class InMemoryCommandStore extends CommandStore @Override public <P1, T> T mapReduceFull(Unseekables<?> keysOrRanges, TxnId testTxnId, Kinds testKind, TestStartedAt testStartedAt, TestDep testDep, TestStatus testStatus, CommandFunction<P1, T, T> map, P1 p1, T accumulate) { - accumulate = commandStore.mapReduceForKey(this, keysOrRanges, (commands, prev) -> { + accumulate = commandStore().mapReduceForKey(this, keysOrRanges, (commands, prev) -> { return commands.mapReduceFull(keysOrRanges, testTxnId, testKind, testStartedAt, testDep, testStatus, map, p1, prev); }, accumulate); @@ -900,7 +878,7 @@ public abstract class InMemoryCommandStore extends CommandStore { // TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable Map<Range, List<TxnInfo>> collect = new TreeMap<>(Range::compare); - commandStore.rangeCommands.forEach(((txnId, rangeCommand) -> { + commandStore().rangeCommands.forEach(((txnId, rangeCommand) -> { Command command = rangeCommand.command.value(); // TODO (now): probably this isn't safe - want to ensure we take dependency on any relevant syncId if (command.saveStatus().compareTo(SaveStatus.Erased) >= 0) @@ -979,7 +957,7 @@ public abstract class InMemoryCommandStore extends CommandStore if (testStatus == ANY_STATUS && testDep == ANY_DEPS) { - commandStore.historicalRangeCommands.forEach(((txnId, ranges) -> { + commandStore().historicalRangeCommands.forEach(((txnId, ranges) -> { switch (testStartedAt) { default: throw new AssertionError(); @@ -1003,7 +981,7 @@ public abstract class InMemoryCommandStore extends CommandStore List<TxnInfo> list = in.computeIfAbsent(r, ignore -> new ArrayList<>()); if (list.isEmpty() || !list.get(list.size() - 1).txnId.equals(txnId)) { - GlobalCommand global = commandStore.commands.get(txnId); + GlobalCommand global = commandStore().commands.get(txnId); if (global != null && global.value() != null) { Command command = global.value(); @@ -1035,6 +1013,7 @@ public abstract class InMemoryCommandStore extends CommandStore public void postExecute() { + super.postExecute(); commands.values().forEach(c -> { if (c == null || c.current() == null) return; @@ -1042,24 +1021,24 @@ public abstract class InMemoryCommandStore extends CommandStore Timestamp executeAt = c.current().executeAtIfKnown(); if (executeAt != null) { - if (c.current().hasBeen(Truncated)) commandStore.commandsByExecuteAt.remove(executeAt); - else commandStore.commandsByExecuteAt.put(executeAt, commandStore.command(c.txnId())); + if (c.current().hasBeen(Truncated)) commandStore().commandsByExecuteAt.remove(executeAt); + else commandStore().commandsByExecuteAt.put(executeAt, commandStore().command(c.txnId())); } if (c.isUnset() || c.current().saveStatus().isUninitialised()) - commandStore.commands.remove(c.txnId()); + commandStore().commands.remove(c.txnId()); c.invalidate(); }); timestampsForKey.values().forEach(tfk -> { if (tfk.isUnset()) - commandStore.timestampsForKey.remove(tfk.key()); + commandStore().timestampsForKey.remove(tfk.key()); tfk.invalidate(); }); commandsForKey.values().forEach(cfk -> { if (cfk.isUnset()) - commandStore.commandsForKey.remove(cfk.key()); + commandStore().commandsForKey.remove(cfk.key()); cfk.invalidate(); }); } diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index f7eb397b..c38ac501 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -144,7 +144,7 @@ public abstract class CommandStore implements AgentExecutor protected final LocalListeners listeners; protected final EpochUpdateHolder epochUpdateHolder; - // Used in markShardStale to make sure the staleness includes in progresss bootstraps + // Used in markShardStale to make sure the staleness includes in progress bootstraps private transient NavigableMap<TxnId, Ranges> bootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); // additive (i.e. once inserted, rolled-over until invalidated, and the floor entry contains additions) private RedundantBefore redundantBefore = RedundantBefore.EMPTY; private MaxConflicts maxConflicts = MaxConflicts.EMPTY; @@ -242,7 +242,7 @@ public abstract class CommandStore implements AgentExecutor unsafeSetRangesForEpoch(update.newRangesForEpoch); } - public RangesForEpoch unsafeRangesForEpoch() + public RangesForEpoch unsafeGetRangesForEpoch() { return rangesForEpoch; } @@ -360,6 +360,7 @@ public abstract class CommandStore implements AgentExecutor // TODO (desired): narrow ranges to those that are owned Invariants.checkArgument(txnId.is(ExclusiveSyncPoint)); RedundantBefore newRedundantBefore = RedundantBefore.merge(redundantBefore, RedundantBefore.create(ranges, txnId, txnId, TxnId.NONE, TxnId.NONE, TxnId.NONE)); + safeStore.upsertRedundantBefore(newRedundantBefore); unsafeSetRedundantBefore(newRedundantBefore); updatedRedundantBefore(safeStore, txnId, ranges); } @@ -647,15 +648,11 @@ public abstract class CommandStore implements AgentExecutor return () -> { AsyncResult<Void> done = execute(empty(), (safeStore) -> { // Merge in a base for any ranges that needs to be covered - // TODO (review): Convoluted check to not overwrite existing bootstraps with TxnId.NONE - // If loading from disk didn't finish before this then we might initialize the range at TxnId.NONE? - // Does CommandStores.topology ensure that doesn't happen? Is it fine if it does because it will get superseded? - Ranges newBootstrapRanges = ranges; for (Ranges existing : bootstrapBeganAt.values()) newBootstrapRanges = newBootstrapRanges.without(existing); if (!newBootstrapRanges.isEmpty()) - bootstrapBeganAt = bootstrap(TxnId.NONE, newBootstrapRanges, bootstrapBeganAt); + safeStore.setBootstrapBeganAt(bootstrap(TxnId.NONE, newBootstrapRanges, bootstrapBeganAt)); safeStore.setSafeToRead(purgeAndInsert(safeToRead, TxnId.NONE, ranges)); }).beginAsResult(); diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9068f19c..60a94a7c 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -142,54 +143,46 @@ public abstract class CommandStores // We ONLY remove ranges to keep logic manageable; likely to only merge CommandStores into a new CommandStore via some kind of Bootstrap public static class RangesForEpoch { - public static class Snapshot - { - public final long[] epochs; - public final Ranges[] ranges; - - public Snapshot(long[] epochs, Ranges[] ranges) - { - this.epochs = epochs; - this.ranges = ranges; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Snapshot snapshot = (Snapshot) o; - return Objects.deepEquals(epochs, snapshot.epochs) && Objects.deepEquals(ranges, snapshot.ranges); - } - - @Override - public int hashCode() - { - return Objects.hash(Arrays.hashCode(epochs), Arrays.hashCode(ranges)); - } - } - final long[] epochs; final Ranges[] ranges; - final CommandStore store; - public RangesForEpoch(long epoch, Ranges ranges, CommandStore store) + public RangesForEpoch(long epoch, Ranges ranges) { this.epochs = new long[] { epoch }; this.ranges = new Ranges[] { ranges }; - this.store = store; } - public RangesForEpoch(long[] epochs, Ranges[] ranges, CommandStore store) + public RangesForEpoch(long[] epochs, Ranges[] ranges) { + Invariants.checkState(epochs.length == ranges.length); this.epochs = epochs; this.ranges = ranges; - this.store = store; } - public Snapshot snapshot() + public int size() + { + return epochs.length; + } + + public void forEach(BiConsumer<Long, Ranges> forEach) + { + for (int i = 0; i < epochs.length; i++) + forEach.accept(epochs[i], ranges[i]); + } + + @Override + public boolean equals(Object object) + { + if (this == object) return true; + if (object == null || getClass() != object.getClass()) return false; + RangesForEpoch that = (RangesForEpoch) object; + return Objects.deepEquals(epochs, that.epochs) && Objects.deepEquals(ranges, that.ranges); + } + + @Override + public int hashCode() { - return new Snapshot(epochs, ranges); + throw new UnsupportedOperationException(); } public RangesForEpoch withRanges(long epoch, Ranges latestRanges) @@ -201,7 +194,7 @@ public abstract class CommandStores newEpochs[newLength - 1] = epoch; newRanges[newLength - 1] = latestRanges; Invariants.checkState(newEpochs[newLength - 1] == 0 || newEpochs[newLength - 1] == epoch, "Attempted to override historic epoch %d with %d", newEpochs[newLength - 1], epoch); - return new RangesForEpoch(newEpochs, newRanges, store); + return new RangesForEpoch(newEpochs, newRanges); } public @Nonnull Ranges coordinates(TxnId txnId) @@ -463,7 +456,7 @@ public abstract class CommandStores { EpochUpdateHolder updateHolder = new EpochUpdateHolder(); ShardHolder shard = new ShardHolder(supplier.create(nextId++, updateHolder)); - shard.ranges = new RangesForEpoch(epoch, addRanges, shard.store); + shard.ranges = new RangesForEpoch(epoch, addRanges); shard.store.epochUpdateHolder.add(epoch, shard.ranges, addRanges); shard.store.epochUpdateHolder.updateGlobal(newTopology.ranges()); diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 93b792e0..da3914f6 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -779,13 +779,15 @@ public class Cluster && beforeCommand.promised().equals(afterCommand.promised()) && beforeCommand.durability().equals(afterCommand.durability())); } + if (before.size() > store.unsafeCommands().size()) { - for (TxnId txnId : before.keySet()) + for (Map.Entry<TxnId, Command> entry : before.entrySet()) { + TxnId txnId = entry.getKey(); if (!store.unsafeCommands().containsKey(txnId)) { - Command beforeCommand = before.get(txnId); + Command beforeCommand = entry.getValue(); if (beforeCommand.saveStatus() == SaveStatus.Erased) continue; @@ -1123,7 +1125,7 @@ public class Cluster { return txnId + ":" + command.saveStatus() + "@" + commandStore.toString().replaceAll("DelayedCommandStore", "") - + (command.homeKey() != null && commandStore.unsafeRangesForEpoch().allAt(txnId.epoch()).contains(command.homeKey()) ? "(Home)" : ""); + + (command.homeKey() != null && commandStore.unsafeGetRangesForEpoch().allAt(txnId.epoch()).contains(command.homeKey()) ? "(Home)" : ""); } } @@ -1270,7 +1272,7 @@ public class Cluster if (participants == null) return false; - return participants.owns().intersects(commandStore.unsafeRangesForEpoch().allBetween(command.txnId().epoch(), command.executeAtIfKnownElseTxnId())); + return participants.owns().intersects(commandStore.unsafeGetRangesForEpoch().allBetween(command.txnId().epoch(), command.executeAtIfKnownElseTxnId())); } private BlockingTransaction toBlocking(Command command, DelayedCommandStore store) diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 17afe245..1d209bb9 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Queue; import java.util.concurrent.Callable; import java.util.function.BiConsumer; @@ -49,10 +50,13 @@ import accord.local.CommandStores; import accord.local.Node; import accord.local.NodeCommandStoreService; import accord.local.PreLoadContext; +import accord.local.RedundantBefore; import accord.local.SafeCommandStore; import accord.local.ShardDistributor; import accord.primitives.Range; +import accord.primitives.Ranges; import accord.primitives.RoutableKey; +import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; import accord.topology.Topology; @@ -143,6 +147,38 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread this.journal = journal; } + @Override + public void clearForTesting() + { + super.clearForTesting(); + + // Rather than cleaning up and reloading, we can just assert equality during reload + { + RedundantBefore orig = unsafeGetRedundantBefore(); + RedundantBefore loaded = journal.loadRedundantBefore(id()); + Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); + } + + { + NavigableMap<TxnId, Ranges> orig = unsafeGetBootstrapBeganAt(); + NavigableMap<TxnId, Ranges> loaded = journal.loadBootstrapBeganAt(id()); + Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); + } + + { + NavigableMap<Timestamp, Ranges> orig = unsafeGetSafeToRead(); + NavigableMap<Timestamp, Ranges> loaded = journal.loadSafeToRead(id()); + Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); + } + + { + RangesForEpoch orig = unsafeGetRangesForEpoch(); + RangesForEpoch loaded = journal.loadRangesForEpoch(id()); + + Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); + } + } + @Override public void validateRead(Command current) { @@ -262,6 +298,7 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread { private final DelayedCommandStore commandStore; private final CacheLoadingChance cacheLoadingChance; + public DelayedSafeStore(DelayedCommandStore commandStore, RangesForEpoch ranges, PreLoadContext context, @@ -278,6 +315,10 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread @Override public void postExecute() { + Journal.FieldUpdates fieldUpdates = fieldUpdates(); + if (fieldUpdates != null) + commandStore.journal.saveStoreState(commandStore.id(), fieldUpdates, () -> {}); + commands.entrySet().forEach(e -> { InMemorySafeCommand safe = e.getValue(); if (!safe.isModified()) return; diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index d48cbb1b..dc676fed 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -65,22 +65,13 @@ import static accord.utils.Invariants.illegalState; public class InMemoryJournal implements Journal { private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> diffsPerCommandStore = new Int2ObjectHashMap<>(); - private final FieldStates fieldStates; - - private static class FieldStates - { - RedundantBefore redundantBefore = RedundantBefore.EMPTY; - NavigableMap<TxnId, Ranges> bootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); - NavigableMap<Timestamp, Ranges> safeToRead = ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY); - CommandStores.RangesForEpoch.Snapshot rangesForEpoch = null; - } + private final Int2ObjectHashMap<FieldUpdates> fieldStates = new Int2ObjectHashMap<>(); private final Node.Id id; public InMemoryJournal(Node.Id id) { this.id = id; - this.fieldStates = new FieldStates(); } @Override @@ -120,37 +111,57 @@ public class InMemoryJournal implements Journal @Override public RedundantBefore loadRedundantBefore(int commandStoreId) { - return fieldStates.redundantBefore; + FieldUpdates fieldStates = this.fieldStates.get(commandStoreId); + if (fieldStates == null) + return null; + return fieldStates.newRedundantBefore; } @Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId) { - return fieldStates.bootstrapBeganAt; + FieldUpdates fieldStates = this.fieldStates.get(commandStoreId); + if (fieldStates == null) + return null; + return fieldStates.newBootstrapBeganAt; } @Override public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId) { - return fieldStates.safeToRead; + FieldUpdates fieldStates = this.fieldStates.get(commandStoreId); + if (fieldStates == null) + return null; + return fieldStates.newSafeToRead; } @Override - public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId) + public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId) { - return fieldStates.rangesForEpoch; + FieldUpdates fieldStates = this.fieldStates.get(commandStoreId); + if (fieldStates == null) + return null; + return fieldStates.newRangesForEpoch; } public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush) { + + FieldUpdates fieldStates = this.fieldStates.computeIfAbsent(store, s -> { + FieldUpdates init = new FieldUpdates(); + init.newRedundantBefore = RedundantBefore.EMPTY; + init.newBootstrapBeganAt = ImmutableSortedMap.of(TxnId.NONE, Ranges.EMPTY); + init.newSafeToRead = ImmutableSortedMap.of(Timestamp.NONE, Ranges.EMPTY); + return init; + }); if (fieldUpdates.newRedundantBefore != null) - fieldStates.redundantBefore = fieldUpdates.newRedundantBefore; + fieldStates.newRedundantBefore = fieldUpdates.newRedundantBefore; if (fieldUpdates.newSafeToRead != null) - fieldStates.safeToRead = fieldUpdates.newSafeToRead; + fieldStates.newSafeToRead = fieldUpdates.newSafeToRead; if (fieldUpdates.newBootstrapBeganAt != null) - fieldStates.bootstrapBeganAt = fieldUpdates.newBootstrapBeganAt; + fieldStates.newBootstrapBeganAt = fieldUpdates.newBootstrapBeganAt; if (fieldUpdates.newRangesForEpoch != null) - fieldStates.rangesForEpoch = fieldUpdates.newRangesForEpoch; + fieldStates.newRangesForEpoch = fieldUpdates.newRangesForEpoch; if (onFlush!= null) onFlush.run(); diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java index 69c799b8..d9cee7af 100644 --- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java @@ -107,7 +107,7 @@ public class LoggingJournal implements Journal return delegate.loadSafeToRead(commandStoreId); } - public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId) + public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId) { return delegate.loadRangesForEpoch(commandStoreId); } diff --git a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java index 736a4f15..5776c51f 100644 --- a/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/VerifyingJournal.java @@ -47,8 +47,8 @@ public class VerifyingJournal implements Journal public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { - Command sut = this.sut.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore); Command model = this.model.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore); + Command sut = this.sut.loadCommand(commandStoreId, txnId, redundantBefore, durableBefore); Invariants.checkState(sut.equals(model)); return sut; } @@ -72,26 +72,39 @@ public class VerifyingJournal implements Journal public RedundantBefore loadRedundantBefore(int commandStoreId) { - return sut.loadRedundantBefore(commandStoreId); + RedundantBefore model = this.model.loadRedundantBefore(commandStoreId); + RedundantBefore sut = this.sut.loadRedundantBefore(commandStoreId); + Invariants.checkState(sut.equals(model), "%s should equal %s", sut, model); + return sut; } public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId) { - return sut.loadBootstrapBeganAt(commandStoreId); + NavigableMap<TxnId, Ranges> model = this.sut.loadBootstrapBeganAt(commandStoreId); + NavigableMap<TxnId, Ranges> sut = this.sut.loadBootstrapBeganAt(commandStoreId); + Invariants.checkState(sut.equals(model), "%s should equal %s", sut, model); + return sut; } public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId) { - return sut.loadSafeToRead(commandStoreId); + NavigableMap<Timestamp, Ranges> model = this.model.loadSafeToRead(commandStoreId); + NavigableMap<Timestamp, Ranges> sut = this.sut.loadSafeToRead(commandStoreId); + Invariants.checkState(sut.equals(model), "%s should equal %s", sut, model); + return sut; } - public CommandStores.RangesForEpoch.Snapshot loadRangesForEpoch(int commandStoreId) + public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId) { - return sut.loadRangesForEpoch(commandStoreId); + CommandStores.RangesForEpoch model = this.sut.loadRangesForEpoch(commandStoreId); + CommandStores.RangesForEpoch sut = this.sut.loadRangesForEpoch(commandStoreId); + Invariants.checkState(sut.equals(model), "%s should equal %s", sut, model); + return sut; } public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush) { + model.saveStoreState(store, fieldUpdates, onFlush); sut.saveStoreState(store, fieldUpdates, onFlush); } } diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java index 340b6077..c0f451e1 100644 --- a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java +++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java @@ -92,7 +92,7 @@ public class ListFetchCoordinator extends AbstractFetchCoordinator @Override protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable) { - Ranges slice = commandStore.unsafeRangesForEpoch().allAt(txnId).without(unavailable); + Ranges slice = commandStore.unsafeGetRangesForEpoch().allAt(txnId).without(unavailable); ((InMemoryCommandStore)commandStore).maxAppliedFor((Ranges)readScope, slice).begin((newMaxApplied, failure) -> { if (failure != null) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
