This is an automated email from the ASF dual-hosted git repository. konstantinov pushed a commit to branch fixes-260226 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 7d15d7fd629384da88f5eaf5cbd7435d6fa56ad5 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Mon Dec 22 14:08:16 2025 +0000 safe shutdown snapshot/restore listeners and progressLog --- .../src/main/java/accord/api/AsyncExecutor.java | 2 + accord-core/src/main/java/accord/api/Journal.java | 3 +- .../src/main/java/accord/api/LocalListeners.java | 17 +++ .../main/java/accord/api/ProtocolModifiers.java | 5 +- .../main/java/accord/impl/AbstractReplayer.java | 75 ++++++++++-- .../java/accord/impl/DefaultLocalListeners.java | 41 +++++++ .../java/accord/impl/InMemoryCommandStore.java | 61 ++++++---- .../java/accord/impl/progresslog/BaseTxnState.java | 35 ++++-- .../impl/progresslog/DefaultProgressLog.java | 40 ++++++- .../java/accord/impl/progresslog/HomeState.java | 9 +- .../java/accord/impl/progresslog/TxnState.java | 32 +++++ .../java/accord/impl/progresslog/WaitingState.java | 29 ++++- .../src/main/java/accord/local/Catchup.java | 15 ++- .../src/main/java/accord/local/CommandStore.java | 42 +++---- .../src/main/java/accord/local/CommandStores.java | 21 +++- .../src/main/java/accord/local/Commands.java | 4 +- .../src/main/java/accord/local/DurableBefore.java | 11 +- .../src/main/java/accord/local/MaxConflicts.java | 4 +- .../src/main/java/accord/local/MaxDecidedRX.java | 29 ++++- accord-core/src/main/java/accord/local/Node.java | 12 +- .../main/java/accord/local/RedundantBefore.java | 35 ++++-- .../main/java/accord/local/RedundantStatus.java | 4 +- .../src/main/java/accord/local/RejectBefore.java | 4 +- .../main/java/accord/local/SafeCommandStore.java | 31 +++-- .../main/java/accord/messages/SetShardDurable.java | 1 + .../java/accord/primitives/AbstractRanges.java | 2 +- .../src/main/java/accord/primitives/Timestamp.java | 2 - .../src/main/java/accord/primitives/TxnId.java | 1 + .../src/main/java/accord/topology/Topology.java | 2 +- .../main/java/accord/utils/PersistentField.java | 132 ++++++++++++++++----- .../main/java/accord/utils/ReducingRangeMap.java | 41 ++++++- .../java/accord/utils/async/AsyncCallbacks.java | 1 + .../src/test/java/accord/impl/basic/Cluster.java | 2 +- .../accord/impl/basic/DelayedCommandStores.java | 4 +- .../java/accord/impl/basic/InMemoryJournal.java | 7 +- .../java/accord/impl/basic/LoggingJournal.java | 9 +- .../src/main/java/accord/maelstrom/Cluster.java | 3 +- 37 files changed, 588 insertions(+), 180 deletions(-) diff --git a/accord-core/src/main/java/accord/api/AsyncExecutor.java b/accord-core/src/main/java/accord/api/AsyncExecutor.java index 731a2fef..f303f141 100644 --- a/accord-core/src/main/java/accord/api/AsyncExecutor.java +++ b/accord-core/src/main/java/accord/api/AsyncExecutor.java @@ -26,6 +26,7 @@ import accord.utils.async.AsyncCallbacks.RunOrFail; import accord.utils.async.AsyncChain; import accord.utils.async.Cancellable; +// TODO (required): consistent RejectedExecutionException handling public interface AsyncExecutor extends Executor { // unlike execute, throws no exceptions, nor will not wrap the runnable @@ -38,6 +39,7 @@ public interface AsyncExecutor extends Executor // Depending on this implementation this method may queue-jump, i.e. task submission order is not guaranteed. // Make sure this is semantically safe at all call-sites. + // TODO (required): RejectedExecutionException? default void executeMaybeImmediately(Runnable run) { if (!tryExecuteImmediately(run)) diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index 381f1be2..d9533576 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -53,6 +53,7 @@ public interface Journal MINIMAL_WITH_DEPS } + void open(Node node); void start(Node node); Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore); @@ -70,7 +71,7 @@ public interface Journal * Replays all messages from journal to rehydrate CommandStores state. Returns whether it has seen (and ignored) * any exceptions during replay. */ - boolean replay(CommandStores commandStores); + boolean replay(CommandStores commandStores, Object param); RedundantBefore loadRedundantBefore(int store); NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store); diff --git a/accord-core/src/main/java/accord/api/LocalListeners.java b/accord-core/src/main/java/accord/api/LocalListeners.java index caa51ec7..38cf4967 100644 --- a/accord-core/src/main/java/accord/api/LocalListeners.java +++ b/accord-core/src/main/java/accord/api/LocalListeners.java @@ -105,6 +105,23 @@ public interface LocalListeners this.waitingOn = waitingOn; this.awaitingStatus = awaitingStatus; } + + @Override + public boolean equals(Object that) + { + return that instanceof TxnListener && equals((TxnListener) that); + } + + public boolean equals(TxnListener that) + { + return this.waiter.equals(that.waiter) && this.waitingOn.equals(that.waitingOn) && this.awaitingStatus == that.awaitingStatus; + } + + @Override + public int hashCode() + { + throw new UnsupportedOperationException(); + } } Iterable<TxnListener> txnListeners(); diff --git a/accord-core/src/main/java/accord/api/ProtocolModifiers.java b/accord-core/src/main/java/accord/api/ProtocolModifiers.java index 447f4209..03f6fb15 100644 --- a/accord-core/src/main/java/accord/api/ProtocolModifiers.java +++ b/accord-core/src/main/java/accord/api/ProtocolModifiers.java @@ -235,8 +235,7 @@ public class ProtocolModifiers public static boolean recoveryAwaitsSupersedingSyncPoints() { return recoveryAwaitsSupersedingSyncPoints; } public static void setRecoveryAwaitsSupersedingSyncPoints(boolean newRecoveryAwaitsSupersedingSyncPoints) { recoveryAwaitsSupersedingSyncPoints = newRecoveryAwaitsSupersedingSyncPoints; } - // TODO (required): default this to false once released support via recoveryAwaitsSupersedingSyncPoints - private static boolean syncPointsTrackUnstableMediumPathDependencies = true; + private static boolean syncPointsTrackUnstableMediumPathDependencies = false; public static boolean syncPointsTrackUnstableMediumPathDependencies() { return syncPointsTrackUnstableMediumPathDependencies; } public static void setSyncPointsTrackUnstableMediumPathDependencies(boolean newSyncPointsTrackUnstableMediumPathDependencies) { syncPointsTrackUnstableMediumPathDependencies = newSyncPointsTrackUnstableMediumPathDependencies; } @@ -248,8 +247,6 @@ public class ProtocolModifiers public static boolean filterDuplicateDependenciesFromAcceptReply() { return filterDuplicateDependenciesFromAcceptReply; } public static void setFilterDuplicateDependenciesFromAcceptReply(boolean newFilterDuplicateDependenciesFromAcceptReply) { filterDuplicateDependenciesFromAcceptReply = newFilterDuplicateDependenciesFromAcceptReply; } - - public enum SendStableMessages { TO_ALL, FOR_READS, FOR_READS_OR_NONE_IF_FASTEXEC} private static SendStableMessages sendStableMessages = FOR_READS_OR_NONE_IF_FASTEXEC; public static void setSendStableMessages(SendStableMessages newSendStableMessages) { sendStableMessages = newSendStableMessages; } diff --git a/accord-core/src/main/java/accord/impl/AbstractReplayer.java b/accord-core/src/main/java/accord/impl/AbstractReplayer.java index 8c5c0f90..5e97681e 100644 --- a/accord-core/src/main/java/accord/impl/AbstractReplayer.java +++ b/accord-core/src/main/java/accord/impl/AbstractReplayer.java @@ -34,7 +34,12 @@ import accord.primitives.Participants; import accord.primitives.SaveStatus; import accord.primitives.TxnId; import accord.utils.Invariants; +import accord.utils.UnhandledEnum; +import static accord.impl.AbstractReplayer.Replay.NONE; +import static accord.impl.AbstractReplayer.Replay.TO_BOTH; +import static accord.impl.AbstractReplayer.Replay.TO_COMMAND_STORE; +import static accord.impl.AbstractReplayer.Replay.TO_DATA_STORE; import static accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_COMMAND_STORE; import static accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STORE; import static accord.primitives.SaveStatus.Applying; @@ -45,14 +50,41 @@ import static accord.primitives.Txn.Kind.Write; public abstract class AbstractReplayer implements Journal.Replayer { + // TODO (required): NON_DURABLE does not properly account for things like pre-bootstrap + public enum Mode { ALL, PART_NON_DURABLE, NON_DURABLE } + public enum Replay + { + // warning: behaviour depends on bit pattern of ordinals, so change with care + NONE, TO_COMMAND_STORE, TO_DATA_STORE, TO_BOTH; + + private static final Replay[] lookup = values(); + + public boolean includes(Replay replay) + { + return (ordinal() & replay.ordinal()) == replay.ordinal(); + } + public Replay atLeast(Replay that) + { + return lookup[ordinal() | that.ordinal()]; + } + public Replay atMost(Replay that) + { + return lookup[ordinal() & that.ordinal()]; + } + } + public final RedundantBefore redundantBefore; + public final Mode mode; public final TxnId minReplay; - protected AbstractReplayer(CommandStore commandStore, @Nullable TxnId minReplay) + protected AbstractReplayer(CommandStore commandStore, Mode mode, @Nullable TxnId minReplay) { this.redundantBefore = commandStore.unsafeGetRedundantBefore(); + this.mode = mode; Invariants.require(redundantBefore.ranges(Objects::nonNull).containsAll(commandStore.unsafeGetRangesForEpoch().all())); - this.minReplay = TxnId.noneIfNull(redundantBefore.foldl((b, v) -> TxnId.nonNullOrMin(v, b.maxBoundBoth(LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE)), minReplay, ignore -> false)); + if (mode != Mode.ALL) + minReplay = redundantBefore.foldl((b, v) -> TxnId.nonNullOrMin(v, replayBound(b)), minReplay, ignore -> false); + this.minReplay = TxnId.noneIfNull(minReplay); } protected boolean maybeShouldReplay(TxnId txnId) @@ -60,25 +92,47 @@ public abstract class AbstractReplayer implements Journal.Replayer return txnId.compareTo(minReplay) >= 0; } - protected boolean shouldReplay(TxnId txnId, StoreParticipants participants) + protected Replay shouldReplay(TxnId txnId, StoreParticipants participants) { Participants<?> search = participants.route(); if (search == null) search = participants.hasTouched(); - return redundantBefore.foldl(search, (b, v, id) -> v || b.maxBoundBoth(LOCALLY_DURABLE_TO_COMMAND_STORE, LOCALLY_DURABLE_TO_DATA_STORE).compareTo(id) <= 0, false, txnId, i -> i); + switch (mode) + { + default: throw new UnhandledEnum(mode); + case ALL: return TO_BOTH; + case NON_DURABLE: return redundantBefore.foldl(search, (b, v, id) -> v.atMost(replay(b, id)), TO_BOTH, txnId, i -> false); + case PART_NON_DURABLE: return redundantBefore.foldl(search, (b, v, id) -> v.atLeast(replay(b, id)), NONE, txnId, i -> false); + } + } + + private static TxnId replayBound(RedundantBefore.Bounds bounds) + { + return bounds.maxBoundBoth(LOCALLY_DURABLE_TO_COMMAND_STORE, LOCALLY_DURABLE_TO_DATA_STORE); } - protected void initialiseState(SafeCommandStore safeStore, TxnId txnId) + private static Replay replay(RedundantBefore.Bounds bounds, TxnId txnId) + { + Replay replay = NONE; + if (bounds.maxBound(LOCALLY_DURABLE_TO_COMMAND_STORE).compareTo(txnId) <= 0) + replay = TO_COMMAND_STORE; + if (bounds.maxBound(LOCALLY_DURABLE_TO_DATA_STORE).compareTo(txnId) <= 0) + replay = replay.atLeast(TO_DATA_STORE); + return replay; + } + + protected void replay(SafeCommandStore safeStore, TxnId txnId, Replay replay) { SafeCommand safeCommand = safeStore.unsafeGet(txnId); { Command command = safeCommand.current(); if (command.saveStatus().compareTo(SaveStatus.Stable) >= 0 && command.saveStatus().compareTo(PreApplied) <= 0) { - Commands.maybeExecute(safeStore, safeCommand, command, false, true); + if (replay.includes(TO_COMMAND_STORE)) + Commands.maybeExecute(safeStore, safeCommand, command, false, true); } else if (command.saveStatus().compareTo(Applying) >= 0 && command.saveStatus().compareTo(TruncatedApplyWithOutcome) <= 0) { - if (command.txnId().is(Write)) + if (command.txnId().is(Write) && replay.includes(TO_DATA_STORE)) { Commands.applyChain(safeStore, command) .begin(safeStore.agent()); @@ -86,7 +140,10 @@ public abstract class AbstractReplayer implements Journal.Replayer else Invariants.expect(command.hasBeen(Applied), "%s is Applying but is not a Write transaction", txnId); } } - safeCommand.update(safeStore, safeCommand.current(), true); - safeStore.notifyListeners(safeCommand, null); + if (replay.includes(Replay.TO_COMMAND_STORE)) + { + safeCommand.update(safeStore, safeCommand.current(), true); + safeStore.notifyListeners(safeCommand, null); + } } } diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java index 0e6fa3bf..d83a5502 100644 --- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java +++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java @@ -18,10 +18,12 @@ package accord.impl; +import java.util.ArrayList; import java.util.Arrays; import java.util.EnumMap; import java.util.Enumeration; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiFunction; @@ -44,6 +46,8 @@ import accord.utils.AsymmetricComparator; import accord.utils.Invariants; import accord.utils.btree.BTree; import accord.utils.btree.BTreeRemoval; +import accord.utils.btree.BulkIterator; +import accord.utils.btree.UpdateFunction; import static accord.utils.ArrayBuffers.cachedAny; import static accord.utils.ArrayBuffers.cachedTxnIds; @@ -737,4 +741,41 @@ public class DefaultLocalListeners implements LocalListeners }; }; } + + public void restore(List<TxnListener> listeners) + { + if (listeners.isEmpty()) + return; + + if (!BTree.isEmpty(txnListeners)) + throw new IllegalStateException("Restore only supported if uninitialised"); + + listeners.sort((a, b) -> { + int c = a.waitingOn.compareTo(b.waitingOn); + if (c == 0) c = a.awaitingStatus.compareTo(b.awaitingStatus); + if (c == 0) c = a.waiter.compareTo(b.waiter); + return c; + }); + + List<TxnListeners> build = new ArrayList<>(); + int li = 0; + while (li < listeners.size()) + { + TxnListener l = listeners.get(li); + TxnListeners ls = new TxnListeners(l.waitingOn, l.awaitingStatus); + build.add(ls); + ls.add(l.waiter); + while (++li < listeners.size() && (l = listeners.get(li)).waitingOn.equals(ls) && l.awaitingStatus == ls.await) + ls.add(l.waiter); + } + txnListeners = BTree.build(BulkIterator.of(build.iterator()), build.size(), UpdateFunction.noOp()); + } + + public List<TxnListener> snapshot() + { + List<TxnListener> snapshot = new ArrayList<>(BTree.size(txnListeners)); + for (TxnListener listener : txnListeners()) + snapshot.add(listener); + return snapshot; + } } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 8c80cb85..f0eb6611 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -41,6 +41,7 @@ import java.util.function.Predicate; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import accord.primitives.*; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,11 +50,14 @@ import accord.api.Agent; import accord.api.DataStore; import accord.api.Journal; import accord.api.LocalListeners; +import accord.api.LocalListeners.TxnListener; import accord.api.ProgressLog; import accord.api.RoutingKey; +import accord.impl.cfr.IdEntry; import accord.impl.cfr.InMemoryRangeSummaryIndex; import accord.impl.cfr.LoadListener; import accord.impl.progresslog.DefaultProgressLog; +import accord.impl.progresslog.TxnState; import accord.local.Cleanup; import accord.local.Command; import accord.local.CommandStore; @@ -74,18 +78,8 @@ import accord.local.StoreParticipants; import accord.local.cfk.CommandsForKey; import accord.local.cfk.SafeCommandsForKey; import accord.local.cfk.Serialize; -import accord.primitives.AbstractUnseekableKeys; -import accord.primitives.PartialDeps; -import accord.primitives.Participants; -import accord.primitives.Ranges; -import accord.primitives.RoutableKey; -import accord.primitives.Route; -import accord.primitives.Status; import accord.primitives.Status.Durability.HasOutcome; -import accord.primitives.Timestamp; import accord.primitives.Txn.Kind.Kinds; -import accord.primitives.TxnId; -import accord.primitives.Unseekables; import accord.utils.Invariants; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; @@ -106,6 +100,7 @@ import static accord.primitives.Routable.Domain.Key; import static accord.primitives.Routable.Domain.Range; import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.SaveStatus.Applying; +import static accord.primitives.SaveStatus.Invalidated; import static accord.primitives.SaveStatus.ReadyToExecute; import static accord.primitives.Status.Applied; import static accord.primitives.Status.Committed; @@ -127,6 +122,9 @@ public abstract class InMemoryCommandStore extends CommandStore private final long id = nextId.incrementAndGet(); private final NavigableMap<RoutingKey, ByteBuffer> commandsForKey = new TreeMap<>(); + private List<IdEntry> commandsForRanges; + private List<TxnListener> listeners; + private List<TxnState> progressLog; private int waitingForCfk; private Snapshot(){} @@ -136,6 +134,10 @@ public abstract class InMemoryCommandStore extends CommandStore for (Map.Entry<RoutingKey, ByteBuffer> e : commandsForKey.entrySet()) commandStore.commandsForKey.computeIfAbsent(e.getKey(), GlobalCommandsForKey::new).value(Serialize.fromBytes(e.getKey(), e.getValue())); + commandStore.progressLog.clear(); + ((DefaultProgressLog)commandStore.progressLog).restore(null, progressLog); + ((DefaultLocalListeners)commandStore.listeners).restore(listeners); + commandStore.commandsForRanges.restore(commandsForRanges); commandStore.commandsForRanges.prune(commandStore); } @@ -162,6 +164,9 @@ public abstract class InMemoryCommandStore extends CommandStore { Snapshot snapshot = new Snapshot(); + snapshot.listeners = ((DefaultLocalListeners)commandStore.listeners).snapshot(); + snapshot.progressLog = ((DefaultProgressLog)commandStore.progressLog).snapshot(); + snapshot.commandsForRanges = commandStore.commandsForRanges.snapshot(); for (Map.Entry<RoutableKey, GlobalCommandsForKey> e : commandStore.commandsForKey.entrySet()) { GlobalCommandsForKey global = e.getValue(); @@ -366,7 +371,7 @@ public abstract class InMemoryCommandStore extends CommandStore } @Override - protected void updatedRedundantBefore(SafeCommandStore safeStore, RedundantBefore added) + protected void upsertedRedundantBefore(SafeCommandStore safeStore, RedundantBefore added) { InMemorySafeStore inMemorySafeStore = (InMemorySafeStore) safeStore; for (int i = 0 ; i < added.size() ; ++i) @@ -390,7 +395,8 @@ public abstract class InMemoryCommandStore extends CommandStore for (TxnId txnId : clearing) { GlobalCommand globalCommand = commands.get(txnId); - Invariants.require(globalCommand != null && !globalCommand.isEmpty()); + if (globalCommand == null) + continue; // now we restore contents from snapshot, we might repopulate older items Command command = globalCommand.value(); StoreParticipants participants = command.participants().filter(LOAD, safeStore, txnId, command.executeAtIfKnown()); Cleanup cleanup = Cleanup.shouldCleanup(FULL, txnId, command.executeAtIfKnown(), command.saveStatus(), command.durability(), participants, unsafeGetRedundantBefore(), durableBefore()); @@ -401,18 +407,24 @@ public abstract class InMemoryCommandStore extends CommandStore || !Route.isFullRoute(command.route())))); } } - super.updatedRedundantBefore(safeStore, added); + super.upsertedRedundantBefore(safeStore, added); } @Override public void markShardDurable(SafeCommandStore safeStore, TxnId syncId, Ranges ranges, HasOutcome level) { super.markShardDurable(safeStore, syncId, ranges, level); - // TODO (required): this should happen on markLocallyApplied if (level == Universal) commandsForRanges.prune(syncId, ranges, safeStore.redundantBefore()); } + @Override + protected void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId syncId, Ranges ranges, SaveStatus prevStatus) + { + super.markExclusiveSyncPointLocallyApplied(safeStore, syncId, ranges, prevStatus); + commandsForRanges.prune(syncId, ranges, safeStore.redundantBefore()); + } + protected InMemorySafeStore createSafeStore(PreLoadContext context, CommandsForRangeLoad cfrLoad, Map<TxnId, InMemorySafeCommand> commands, Map<RoutableKey, InMemorySafeCommandsForKey> commandsForKeys) @@ -925,8 +937,9 @@ public abstract class InMemoryCommandStore extends CommandStore Invariants.require(loader.loadKeysFor() == RECOVERY); Command command = commands.get(txnId).value(); Summary summary = loader.ifRelevant(command); - Invariants.require(summary != null); - loaded.put(summary.plainTxnId(), summary); + // TODO (expected): prune implied invalidations from index, so no need to special case + if (summary == null) Invariants.require(command.saveStatus() == Invalidated); + else loaded.put(summary.plainTxnId(), summary); }); return new CommandsForRangeLoad(loader, loaded, commandsForRanges.registerListener(new LoadListener(loader, loaded))); } @@ -1217,7 +1230,7 @@ public abstract class InMemoryCommandStore extends CommandStore commandsForKey.clear(); commandsForRanges.clear(); progressLog.clear(); - unsafeSetRejectBefore(new RejectBefore()); + unsafeSetRejectBefore(RejectBefore.EMPTY); hasResumedBootstraps = false; } @@ -1233,17 +1246,17 @@ public abstract class InMemoryCommandStore extends CommandStore private CommandReplayer(InMemoryCommandStore commandStore) { // TODO (required): we shouldn't be providing TxnId.NONE here, we need to standardise on querying journal for data missing from InMemoryCommandStore - super(commandStore, TxnId.NONE); + super(commandStore, Mode.PART_NON_DURABLE, TxnId.NONE); this.commandStore = commandStore; } - private AsyncChain<Void> apply(Command command) + private AsyncChain<Void> apply(Command command, Replay replay) { return AsyncChains.success(commandStore.executeInContext(commandStore, PreLoadContext.contextFor(command.txnId(), "Replay"), null, (SafeCommandStore safeStore) -> { - initialiseState(safeStore, command.txnId()); + super.replay(safeStore, command.txnId(), replay); return null; })); } @@ -1276,10 +1289,14 @@ public abstract class InMemoryCommandStore extends CommandStore } } - if (command == null || !maybeShouldReplay(txnId) || !shouldReplay(txnId, command.participants())) + if (command == null || !maybeShouldReplay(txnId)) + return AsyncChains.success(null); + + Replay replay = shouldReplay(txnId, command.participants()); + if (replay == Replay.NONE) return AsyncChains.success(null); - return apply(command); + return apply(command, replay); } } } diff --git a/accord-core/src/main/java/accord/impl/progresslog/BaseTxnState.java b/accord-core/src/main/java/accord/impl/progresslog/BaseTxnState.java index 57d6d803..e98d6f86 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/BaseTxnState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/BaseTxnState.java @@ -46,7 +46,10 @@ import static accord.impl.progresslog.TxnStateKind.Waiting; */ abstract class BaseTxnState extends LogGroupTimers.Timer implements Comparable<BaseTxnState> { - private static final int CONTACT_ALL_SHIFT = 63; // TODO (desired): have separate contact all flags for recovery and waiting states + private static final int RESTORED_SHIFT = 63; + static final long RESTORED_BIT = 1L << RESTORED_SHIFT; + private static final int CONTACT_ALL_SHIFT = RESTORED_SHIFT - 1; + private static final long CONTACT_ALL_BIT = 1L << CONTACT_ALL_SHIFT; // TODO (desired): have separate contact all flags for recovery and waiting states private static final int SCHEDULED_TIMER_SHIFT = CONTACT_ALL_SHIFT - 1; private static final int INVALID_IF_UNCOMMITTED_SHIFT = SCHEDULED_TIMER_SHIFT - 1; private static final int PENDING_TIMER_BITS = 9; @@ -59,22 +62,25 @@ abstract class BaseTxnState extends LogGroupTimers.Timer implements Comparable<B public final TxnId txnId; /** - * bits [0..43) encode WaitingState + * bits [0..8) encode HomeState * 2 bits for Progress - * 3 bits for BlockedUntil target - * 3 bits for BlockedUntil that home shard can satisfy - * 32 bits for remote progress key counter [note: if we need to in future we can safely and easily reclaim bits here] + * 3 bits for CoordinatePhase * 3 bits for retry counter * <p> - * bits [43..51) encode HomeState + * bits [8..51) encode WaitingState * 2 bits for Progress - * 3 bits for CoordinatePhase + * 2 bits for BlockedUntil target + * 2 bits for BlockedUntil querying (<= target) + * 2 bits for BlockedUntil that home shard can satisfy + * 1 bit to query non-home shards (as home shard has been truncated) + * 24+5 bits for remote key progress tracking [note: if we need to in future we can safely and easily reclaim bits here] * 3 bits for retry counter * <p> - * bits [52..61) for pending timer delay - * bit 61 indicates if this transaction can be inferred invalid if a later quorum finds it not committed on any shard - * bit 62 for which kind of timer is scheduled - * bit 63 for whether we should contact all candidate replicas (rather than just our preferred group) + * bits [51..60) for pending timer delay + * bit 60 indicates if this transaction can be inferred invalid if a later quorum finds it not committed on any shard + * bit 61 for which kind of timer is scheduled + * bit 62 for whether we should contact all candidate replicas (rather than just our preferred group) + * bit 63 for whether the item was restored from a snapshot (so can modify certain invariants) */ long encodedState; @@ -89,9 +95,14 @@ abstract class BaseTxnState extends LogGroupTimers.Timer implements Comparable<B return this.txnId.compareTo(that.txnId); } + boolean isRestored() + { + return (encodedState & RESTORED_BIT) != 0L; + } + boolean contactEveryone() { - return ((encodedState >>> CONTACT_ALL_SHIFT) & 1L) == 1L; + return (encodedState & CONTACT_ALL_BIT) != 0L; } void setContactEveryone(boolean newContactEveryone) diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java index 33aa3021..fa0f9466 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java +++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java @@ -21,6 +21,7 @@ package accord.impl.progresslog; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -55,6 +56,8 @@ import accord.utils.LogGroupTimers; import accord.utils.TinyEnumSet; import accord.utils.btree.BTree; import accord.utils.btree.BTreeRemoval; +import accord.utils.btree.BulkIterator; +import accord.utils.btree.UpdateFunction; import org.agrona.collections.Long2ObjectHashMap; import org.agrona.collections.Object2ObjectHashMap; @@ -462,7 +465,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor if (command == null) command = uninitialised(blockedBy.txnId()); SaveStatus saveStatus = command.saveStatus(); - Invariants.require(saveStatus.compareTo(blockedUntil.unblockedFrom) < 0); + Invariants.expect(saveStatus.compareTo(blockedUntil.unblockedFrom) < 0); StoreParticipants blockedOnStoreParticipants2 = null; if (blockedOnParticipants != null || blockedOnRoute != null) @@ -883,7 +886,6 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor } } - @VisibleForImplementation public void requeue(SafeCommandStore safeStore, TxnStateKind kind, TxnId txnId) { clearPendingAndActive(kind, txnId); @@ -1072,7 +1074,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor public boolean isWaitingUninitialised() { - return current.isUninitialised(); + return current.isWaitingUninitialised(); } @Nonnull @@ -1122,4 +1124,36 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor return current.homeRunCounter(); } } + + public void restore(SafeCommandStore safeStore, List<TxnState> states) + { + if (!BTree.isEmpty(stateMap)) + throw new IllegalStateException("Restore only supported if uninitialised"); + + { + List<TxnState> snapshot = new ArrayList<>(states.size()); + for (TxnState state : states) + snapshot.add(state.snapshot()); + states = snapshot; + } + + states.sort(Comparator.naturalOrder()); + stateMap = BTree.build(BulkIterator.of(states.iterator()), states.size(), UpdateFunction.noOp()); + + for (TxnState state : states) + { + if (!state.isHomeDoneOrUninitialised() && state.homeProgress() != NoneExpected) + state.updateScheduling(safeStore, this, Home, null, Queued); + if (!state.isWaitingDoneOrUninitialised() && state.waitingProgress() != NoneExpected) + state.updateScheduling(safeStore, this, Waiting, null, Queued); + } + } + + public List<TxnState> snapshot() + { + List<TxnState> snapshot = new ArrayList<>(BTree.size(stateMap)); + for (TxnState state : BTree.<TxnState>iterable(stateMap)) + snapshot.add(state.snapshot()); + return snapshot; + } } diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java index 54dee971..8262dce5 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java @@ -67,6 +67,7 @@ abstract class HomeState extends BaseTxnState private static final long SET_MASK = ~((PROGRESS_MASK << PROGRESS_SHIFT) | (STATUS_MASK << STATUS_SHIFT)); static final int HOME_STATE_END_SHIFT = RUN_COUNTER_SHIFT + 3; + static final long SNAPSHOT_HOME_MASK = ~SET_MASK; static { @@ -80,13 +81,13 @@ abstract class HomeState extends BaseTxnState void set(SafeCommandStore safeStore, DefaultProgressLog owner, HomePhase newHomePhase, Progress newProgress) { - setNoScheduling(newHomePhase, newProgress); + setWithoutScheduling(newHomePhase, newProgress); if (newProgress == NoneExpected) owner.clearProgressToken(txnId); updateScheduling(safeStore, owner, Home, null, newProgress); } - void setNoScheduling(HomePhase newHomePhase, Progress newProgress) + void setWithoutScheduling(HomePhase newHomePhase, Progress newProgress) { encodedState &= SET_MASK; encodedState |= ((long) newHomePhase.ordinal() << STATUS_SHIFT) @@ -156,7 +157,7 @@ abstract class HomeState extends BaseTxnState HomePhase shouldUpdatePhase(DefaultProgressLog owner, Command command) { - if (command.saveStatus() == SaveStatus.Erased) + if (command.durability().isDurableOrInvalidated() || command.saveStatus() == SaveStatus.Erased) return Done; HomePhase phase = homePhase(); @@ -197,7 +198,7 @@ abstract class HomeState extends BaseTxnState set(safeStore, owner, updatePhase, NoneExpected); return; } - setNoScheduling(updatePhase, Queued); + setWithoutScheduling(updatePhase, Queued); } } diff --git a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java index 1292061e..800b53d3 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java @@ -34,6 +34,16 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; public final class TxnState extends WaitingState implements PreLoadContext { + public static class SerializationSupport + { + public static TxnState create(TxnId txnId, long encoded) + { + TxnState result = new TxnState(txnId); + result.encodedState = encoded; + return result; + } + } + TxnState(TxnId txnId) { super(txnId); @@ -185,4 +195,26 @@ public final class TxnState extends WaitingState implements PreLoadContext { return "Progress"; } + + public TxnState snapshot() + { + TxnState copy = new TxnState(txnId); + copy.encodedState = (encodedState & (SNAPSHOT_HOME_MASK | SNAPSHOT_WAITING_MASK)) | RESTORED_BIT; + return copy; + } + + public long encodedState() + { + return encodedState; + } + + public boolean equals(Object that) + { + return that instanceof TxnState && equals((TxnState) that); + } + + public boolean equals(TxnState that) + { + return this.txnId.equals(that.txnId) && this.encodedState == that.encodedState; + } } diff --git a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java index d6c9065c..8dfc765c 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java @@ -104,7 +104,7 @@ abstract class WaitingState extends HomeState private static final int AWAIT_STARTED_SHIFT = QUERY_SHARDS_NOT_HOME_SHIFT + 1; private static final int AWAIT_STARTED_BIT = 1 << AWAIT_STARTED_SHIFT; private static final int AWAIT_SHIFT = AWAIT_STARTED_SHIFT + 1; - private static final int AWAIT_BITS = 26; + private static final int AWAIT_BITS = 24; private static final long AWAIT_MASK = (1L << AWAIT_BITS) - 1; private static final int AWAIT_EPOCH_SHIFT = AWAIT_SHIFT + AWAIT_BITS; private static final int AWAIT_EPOCH_BITS = 4; @@ -114,6 +114,8 @@ abstract class WaitingState extends HomeState private static final int RETRY_COUNTER_SHIFT = AWAIT_EPOCH_SHIFT + AWAIT_EPOCH_BITS; private static final long RETRY_COUNTER_MASK = 0x7; static final int WAITING_STATE_END_SHIFT = RETRY_COUNTER_SHIFT + 3; + static long SNAPSHOT_WAITING_MASK = INITIALISED_MASK | ~SET_MASK | QUERY_SHARDS_NOT_HOME_BIT; + static { Invariants.require(BLOCKED_UNTIL_SHIFT == PROGRESS_SHIFT + Long.bitCount(PROGRESS_MASK)); @@ -153,7 +155,7 @@ abstract class WaitingState extends HomeState encodedState |= (long) homeStatus.ordinal() << HOME_SATISFIES_SHIFT; } - boolean isUninitialised() + boolean isWaitingUninitialised() { return 0 == (encodedState & INITIALISED_MASK); } @@ -450,13 +452,16 @@ abstract class WaitingState extends HomeState incrementWaitingRunCounter(); BlockedUntil blockedUntil = blockedUntil(); Command command = safeCommand.current(); - if (command.saveStatus().compareTo(SaveStatus.Erased) >= 0 // TODO (expected): improve progress log clearing to guarantee we don't encounter this status - || !Invariants.expect(command.saveStatus().compareTo(blockedUntil.unblockedFrom) < 0, - "Command has met desired criteria (%s) but progress log entry has not been cancelled: %s", blockedUntil.unblockedFrom, command)) + if (command.saveStatus().compareTo(blockedUntil.unblockedFrom) >= 0) { + // TODO (expected): improve progress log clearing to guarantee we don't encounter Erased + Invariants.expect(command.saveStatus() == SaveStatus.Erased || isRestored(), + "Command has met desired criteria (%s) but progress log entry has not been cancelled: %s", + blockedUntil.unblockedFrom, command); setWaitingDone(owner); return; } + TxnId txnId = safeCommand.txnId(); // first make sure we have enough information to obtain the command locally Timestamp executeAt = command.executeAtIfKnown(); @@ -832,6 +837,13 @@ abstract class WaitingState extends HomeState return; } + if (safeCommand != null && safeCommand.current().saveStatus().compareTo(querying.unblockedFrom) >= 0) + { + if (tracing != null) + tracing.trace(owner.commandStore, "Received async callback %d with %s; local command already exceeds wait status"); + return; + } + callbackId >>= 1; Invariants.nonNull(safeCommand); Route<?> route = Route.castToRoute(safeCommand.current().maxParticipants()); @@ -924,7 +936,7 @@ abstract class WaitingState extends HomeState void awaitSlice(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId txnId, Timestamp executeAt, Route<?> route, int callbackId, @Nullable Tracing tracing) { - Invariants.require(blockedUntil.waitsOn == SHARD); + Invariants.require(blockedUntil.waitsOn == SHARD || queryShardsNotHome()); // TODO (expected): special-case when this shard is home key to avoid remote messages await(owner, blockedUntil, txnId, executeAt, route, callbackId, WaitingState::synchronousAwaitSliceCallback, tracing); } @@ -982,6 +994,11 @@ abstract class WaitingState extends HomeState return waitingProgress() == NoneExpected && blockedUntil() == CanApply; } + boolean isWaitingDoneOrUninitialised() + { + return isWaitingDone() || isWaitingUninitialised(); + } + enum CallbackKind { Fetch, FetchRoute, AwaitHome, AwaitSlice diff --git a/accord-core/src/main/java/accord/local/Catchup.java b/accord-core/src/main/java/accord/local/Catchup.java index ea2b7611..658321d7 100644 --- a/accord-core/src/main/java/accord/local/Catchup.java +++ b/accord-core/src/main/java/accord/local/Catchup.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +32,7 @@ import accord.coordinate.FetchDurableBefore; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.SaveStatus; +import accord.primitives.Status; import accord.primitives.TxnId; import accord.utils.Reduce; import accord.utils.async.AsyncChain; @@ -58,7 +60,8 @@ public class Catchup boolean register(SafeCommandStore safeStore) { - waitingOn = safeStore.ranges().all().slice(durableBefore.ranges(Objects::nonNull), Minimal); + waitingOn = safeStore.ranges().all().slice(durableBefore.ranges(Objects::nonNull), Minimal).mergeTouching(); + logger.debug("{}: Registering listener on {}, filtering by {}", safeStore.commandStore(), waitingOn, safeStore.redundantBefore().map(b -> b == null ? null : b.maxBound(LOCALLY_APPLIED), TxnId[]::new)); updateWaitingOn(safeStore); if (!waitingOn.isEmpty()) @@ -94,8 +97,14 @@ public class Catchup //noinspection DataFlowIssue safeStore = safeStore; PreLoadContext ctx = PreLoadContext.contextFor(txnId, "Catchup"); - if (safeStore.canExecuteWith(ctx)) safeStore.progressLog().waiting(CanApply, safeStore, safeStore.get(txnId), null, Ranges.of(range), null); - else safeStore.commandStore().execute(ctx, safeStore0 -> safeStore0.progressLog().waiting(CanApply, safeStore0, safeStore0.get(txnId), null, Ranges.of(range), null)); + if (safeStore.canExecuteWith(ctx)) markWaiting(safeStore, safeStore.get(txnId), range); + else safeStore.commandStore().execute(ctx, (Consumer<? super SafeCommandStore>) safeStore0 -> markWaiting(safeStore0, safeStore0.get(txnId), range), safeStore.agent()); + } + + private static void markWaiting(SafeCommandStore safeStore, SafeCommand safeCommand, Range range) + { + if (!safeCommand.current().hasBeen(Status.PreApplied)) + safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null, Ranges.of(range), null); } private void done(SafeCommandStore safeStore) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 532f1ec8..ac9e7d98 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -35,6 +35,7 @@ import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.Nullable; +import accord.primitives.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSortedMap; import org.slf4j.Logger; @@ -55,14 +56,7 @@ import accord.local.Commands.NotifyWaitingOnPlus; import accord.local.PreLoadContext.Empty; import accord.local.RedundantBefore.Bounds; import accord.local.RedundantStatus.SomeStatus; -import accord.primitives.Ranges; -import accord.primitives.Routables; -import accord.primitives.SaveStatus; -import accord.primitives.Status; import accord.primitives.Status.Durability.HasOutcome; -import accord.primitives.Timestamp; -import accord.primitives.TxnId; -import accord.primitives.Unseekables; import accord.utils.DeterministicIdentitySet; import accord.utils.Invariants; import accord.utils.Reduce; @@ -190,7 +184,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA */ private NavigableMap<Timestamp, Ranges> safeToRead = emptySafeToRead(); private final Set<Bootstrap> bootstraps = Collections.synchronizedSet(new DeterministicIdentitySet<>()); - @Nullable private RejectBefore rejectBefore; + private RejectBefore rejectBefore = RejectBefore.EMPTY; static class WaitingOnVisibility { @@ -228,8 +222,6 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA return id; } - public void restore() {}; - public abstract Journal.Replayer replayer(); // expected to invoke safeStore.upsertRedundantBefore at some future point, when the commandStore state is durably persisted protected abstract void ensureDurable(Ranges ranges, RedundantBefore onCommandStoreDurable); @@ -441,7 +433,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA /** * To be overridden by implementations, to ensure the new state is persisted. */ - protected void setMaxConflicts(MaxConflicts maxConflicts) + protected void unsafeSetMaxConflicts(MaxConflicts maxConflicts) { this.maxConflicts = maxConflicts; } @@ -512,15 +504,14 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA maxConflictsUpdates = 0; } - setMaxConflicts(updatedMaxConflicts); + unsafeSetMaxConflicts(updatedMaxConflicts); } - final void markExclusiveSyncPoint(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) + final void upsertRejectBefore(SafeCommandStore safeStore, TxnId txnId, Ranges ranges) { // TODO (desired): narrow ranges to those that are owned Invariants.requireArgument(txnId.isSyncPoint()); - RejectBefore newRejectBefore = rejectBefore != null ? rejectBefore : new RejectBefore(); - newRejectBefore = RejectBefore.add(newRejectBefore, ranges, txnId); + RejectBefore newRejectBefore = RejectBefore.add(rejectBefore, ranges, txnId); unsafeSetRejectBefore(newRejectBefore); } @@ -529,7 +520,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA unsafeSetMaxDecidedRX(maxDecidedRX.update(ranges, txnId)); } - final void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges, SaveStatus prevStatus) + protected void markExclusiveSyncPointLocallyApplied(SafeCommandStore safeStore, TxnId txnId, Ranges ranges, SaveStatus prevStatus) { // TODO (desired): narrow ranges to those that are owned if (prevStatus.compareTo(SaveStatus.Applied) < 0) @@ -748,7 +739,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA // If rebootstrap can grab a later timestamp for subsequent attempts, but this timestamp is enough for us // to establish which transactions, for which ranges the node can safely participate in). TxnId unreadyBefore = bootstrap.start(safeStore); - safeStore.unsafeUpsertRedundantBefore(RedundantBefore.create(ranges, unreadyBefore, LOG_UNAVAILABLE_ONLY)); + safeStore.upsertRedundantBefore(RedundantBefore.create(ranges, unreadyBefore, LOG_UNAVAILABLE_ONLY)); updateMaxConflicts(ranges, unreadyBefore); // TODO (desired): we could start accepting non-dep requests here bootstrap.data.invoke((SettableByCallback<Void>)ready.data); @@ -756,7 +747,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA ready.coordinate.invokeIfSuccess(() -> { execute((Empty)() -> "Accept Dependency Requests", safeStore0 -> { unsafeAcceptRequests(remaining); - }); + }, agent); }); return null; })).begin(agent); @@ -811,7 +802,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA logger.info("Failed to close epoch {} for ranges {} on store {}, but some are retired; marking these as synced.", epoch, ranges, id, fail); execute((Empty)() -> "Mark Retired Ranges Synced", safeStore -> { markVisibleInternal(safeStore, epoch, retired, "(Retired)"); - }); + }, agent); } else if (remaining.isEmpty()) { @@ -819,7 +810,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA } if (!remaining.isEmpty()) { - logger.error("Failed to close epoch {} for ranges {} on store {}. Retrying.", epoch, remaining, id, fail); + logger.warn("Failed to close epoch {} for ranges {} on store {}. Retrying.", epoch, remaining, id, fail); node.someExecutor().execute(() -> ensureReadyToCoordinate(epoch, remaining)); } } @@ -891,7 +882,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA } } - protected void updatedRedundantBefore(SafeCommandStore safeStore, RedundantBefore added) + protected void upsertedRedundantBefore(SafeCommandStore safeStore, RedundantBefore added) { TxnId clearWaitingBefore = redundantBefore.minShardAndLocallyAppliedBefore(); TxnId clearAllBefore = TxnId.min(clearWaitingBefore, durableBefore().min.quorumBefore); @@ -1076,7 +1067,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA SettableResult<Void> done = new SettableResult<>(); execute((Empty)() -> "Try Execute Listening", safeStore -> { tryExecuteListening(safeStore, listeners.txnsWaitingOn(SaveStatus.Applied).iterator(), done); - }); + }, agent); return done; } @@ -1096,7 +1087,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA { //noinspection DataFlowIssue safeStore = safeStore; - execute(context, safeStore0 -> tryExecuteListening(safeStore0, waitingOn, iterator, done)); + execute(context, safeStore0 -> { tryExecuteListening(safeStore0, waitingOn, iterator, done); }, agent); } else { @@ -1140,9 +1131,6 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA public final boolean isRejectedIfNotPreAccepted(TxnId txnId, Unseekables<?> participants) { - if (rejectBefore == null) - return false; - return rejectBefore.rejects(txnId, participants); } @@ -1267,7 +1255,7 @@ public abstract class CommandStore implements AbstractAsyncExecutor, SequentialA { Timestamp timestamp = Timestamp.fromValues(rangesForEpoch.epochs[rangesForEpoch.epochs.length - 1], minHlc, 0, node.id()); MaxConflicts updated = maxConflicts.update(rangesForEpoch.all(), timestamp); - setMaxConflicts(updated); + unsafeSetMaxConflicts(updated); } public static NavigableMap<TxnId, Ranges> emptyBootstrapBeganAt() diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 9a6e2d29..29bc2867 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -567,6 +567,7 @@ public abstract class CommandStores implements AsyncExecutorFactory protected void loadSnapshot(Snapshot toLoad) { + Invariants.require(!shuttingDown); current = toLoad; } @@ -657,6 +658,7 @@ public abstract class CommandStores implements AsyncExecutorFactory final ShardDistributor shardDistributor; final Journal journal; volatile Snapshot current; + boolean shuttingDown; int nextId; private CommandStores(StoreSupplier supplier, ShardDistributor shardDistributor, Journal journal) @@ -962,10 +964,13 @@ public abstract class CommandStores implements AsyncExecutorFactory for (Map.Entry<Integer, RangesForEpoch> e : update.commandStores.entrySet()) { Invariants.require(e.getValue() != null); - EpochUpdateHolder holder = new EpochUpdateHolder(); - holder.add(1, e.getValue(), e.getValue().all()); - shards[i++] = new ShardHolder(supplier.create(e.getKey(), holder), e.getValue()); + EpochUpdateHolder epochUpdates = new EpochUpdateHolder(); + ShardHolder shard = new ShardHolder(supplier.create(e.getKey(), epochUpdates), e.getValue()); + // TODO (required): if the add is necessary (highly unlikely) it needs to be done once journal is writeable so we NEED to move this + if (!shard.ranges.equals(shard.store.rangesForEpoch)) + epochUpdates.add(1, e.getValue(), e.getValue().all()); maxId = Math.max(maxId, e.getKey()); + shards[i++] = shard; } Arrays.sort(shards, Comparator.comparingInt(shard -> shard.store.id)); @@ -975,7 +980,6 @@ public abstract class CommandStores implements AsyncExecutorFactory public synchronized void resetTopology(Journal.TopologyUpdate update) { - // TODO: assert Snapshot current = this.current; Invariants.require(update.global.epoch() == current.local.epoch()); ShardHolder[] shards = new ShardHolder[current.commandStores.size()]; @@ -1025,6 +1029,9 @@ public abstract class CommandStores implements AsyncExecutorFactory public synchronized Supplier<EpochReady> updateTopology(Node node, Topology newTopology) { + if (shuttingDown) + throw new IllegalStateException("CommandStores are shutting down"); + TopologyUpdate update = updateTopology(node, current, newTopology); if (update.snapshot != current) { @@ -1044,8 +1051,14 @@ public abstract class CommandStores implements AsyncExecutorFactory return update.bootstrap; } + protected synchronized void markShuttingDown() + { + shuttingDown = true; + } + public void shutdown() { + markShuttingDown(); for (ShardHolder shard : current.shards) shard.store.shutdown(); } diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 9e234578..e7be67e6 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -1128,10 +1128,8 @@ public class Commands return false; } - static int counter = 0; public static boolean maybeCleanup(SafeCommandStore safeStore, SafeCommand safeCommand, Command command, @Nonnull StoreParticipants newParticipants) { - ++counter; StoreParticipants cleanupParticipants = newParticipants.filter(LOAD, safeStore, command.txnId(), command.executeAtIfKnown()); Cleanup cleanup = shouldCleanup(FULL, safeStore, command, cleanupParticipants); if (cleanup == NO) @@ -1224,7 +1222,7 @@ public class Commands } else { - safeStore.commandStore().execute(this, this); + safeStore.commandStore().execute(this, this, safeStore.agent()); } } diff --git a/accord-core/src/main/java/accord/local/DurableBefore.java b/accord-core/src/main/java/accord/local/DurableBefore.java index 9e4368d9..e7856416 100644 --- a/accord-core/src/main/java/accord/local/DurableBefore.java +++ b/accord-core/src/main/java/accord/local/DurableBefore.java @@ -45,10 +45,7 @@ public class DurableBefore extends ReducingRangeMap<DurableBefore.Entry> public static DurableBefore create(RoutingKey[] ends, Entry[] values) { if (values.length == 0) - { - Invariants.require(ends.length == 1 && ends[0] == null); return DurableBefore.EMPTY; - } return new DurableBefore(ends, values); } } @@ -171,6 +168,14 @@ public class DurableBefore extends ReducingRangeMap<DurableBefore.Entry> return ReducingIntervalMap.merge(a, b, DurableBefore.Entry::max, Builder::new); } + public static DurableBefore mergeIfDifferent(DurableBefore prev, DurableBefore add) + { + DurableBefore next = DurableBefore.merge(prev, add); + if (next.equals(prev)) + return prev; + return next.equals(prev) ? prev : next; + } + public HasOutcome min(TxnId txnId, Unseekables<?> unseekables) { return notDurableIfNull(foldlWithDefault(unseekables, Entry::mergeMin, Entry.NONE, null, txnId, test -> test == None)); diff --git a/accord-core/src/main/java/accord/local/MaxConflicts.java b/accord-core/src/main/java/accord/local/MaxConflicts.java index 74ef5bb5..21dd9e05 100644 --- a/accord-core/src/main/java/accord/local/MaxConflicts.java +++ b/accord-core/src/main/java/accord/local/MaxConflicts.java @@ -51,9 +51,9 @@ public class MaxConflicts extends BTreeReducingRangeMap<Timestamp> return update(this, keysOrRanges, maxConflict, Timestamp::mergeMax, MaxConflicts::new, Builder::new); } - private static class Builder extends AbstractBoundariesBuilder<RoutingKey, Timestamp, MaxConflicts> + public static class Builder extends AbstractBoundariesBuilder<RoutingKey, Timestamp, MaxConflicts> { - protected Builder(int capacity) + public Builder(int capacity) { super(capacity); } diff --git a/accord-core/src/main/java/accord/local/MaxDecidedRX.java b/accord-core/src/main/java/accord/local/MaxDecidedRX.java index 1ff475c0..50bd3f3c 100644 --- a/accord-core/src/main/java/accord/local/MaxDecidedRX.java +++ b/accord-core/src/main/java/accord/local/MaxDecidedRX.java @@ -49,6 +49,8 @@ public class MaxDecidedRX extends ReducingRangeMap<MaxDecidedRX.DecidedRX> public DecidedRX(TxnId any, TxnId hlcBound) { + Invariants.nonNull(any); + Invariants.nonNull(hlcBound); this.any = any; this.hlcBound = hlcBound; } @@ -56,7 +58,7 @@ public class MaxDecidedRX extends ReducingRangeMap<MaxDecidedRX.DecidedRX> @Override public String toString() { - return "{any=" + any + ",hlcBound=" + hlcBound + "}"; + return "{any=" + any + ",hlcBound=" + hlcBound + '}'; } public boolean includeDecided(TxnId txnId) @@ -117,6 +119,31 @@ public class MaxDecidedRX extends ReducingRangeMap<MaxDecidedRX.DecidedRX> return b; return new DecidedRX(any, hlcBound); } + + @Override + public boolean equals(Object that) + { + return that instanceof DecidedRX && equals((DecidedRX) that); + } + + public boolean equals(DecidedRX that) + { + return this.any.equals(that.any) && this.hlcBound.equals(that.hlcBound); + } + + @Override + public int hashCode() + { + throw new UnsupportedOperationException(); + } + } + + public static final class SerializerSupport + { + public static MaxDecidedRX create(RoutingKey[] starts, DecidedRX[] values) + { + return new MaxDecidedRX(starts, values); + } } public static final MaxDecidedRX EMPTY = new MaxDecidedRX(); diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 9c4f1620..6b843698 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -205,14 +205,10 @@ public class Node implements NodeCommandStoreService this.agent = agent; this.random = random; this.persistDurableBefore = new PersistentField<>(() -> durableBefore, - (input, prev) -> { - DurableBefore next = DurableBefore.merge(input, prev); - if (next.equals(prev)) - return prev; - return next.equals(prev) ? prev : next; - }, + DurableBefore::merge, DurableBefore::mergeIfDifferent, safeDurableBeforePersister(durableBeforePersister), - this::setPersistedDurableBefore); + this::setPersistedDurableBefore, + run -> someExecutor().execute(run)); this.commandStores = factory.create(this, agent, dataSupplier.get(), random.fork(), journal, shardDistributor, progressLogFactory.apply(this), localListenersFactory.apply(this)); this.topology = new TopologyManager(topologySorter, this, topologyService, time, timeouts); this.durabilityService = new DurabilityService(this); @@ -319,7 +315,7 @@ public class Node implements NodeCommandStoreService public AsyncResult<?> markDurable(DurableBefore addDurableBefore) { - return withEpochExact(addDurableBefore.maxEpoch(), (AsyncExecutor)null, () -> persistDurableBefore.mergeAndUpdate(addDurableBefore).chain()) + return withEpochExact(addDurableBefore.maxEpoch(), (AsyncExecutor)null, () -> persistDurableBefore.save(addDurableBefore).chain()) .beginAsResult(); } diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index 9ea4239e..76920d13 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -639,7 +639,7 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> return notRetired.without(Ranges.of(bounds.range)); } - RedundantStatus get(TxnId txnId, @Nullable Timestamp applyAtIfKnown) + public RedundantStatus get(TxnId txnId, @Nullable Timestamp applyAtIfKnown) { if (wasOwned(txnId)) { @@ -1277,17 +1277,26 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> @Override public String toString() { - return "gc:" + toString(GC_BEFORE) - + "\nlocal:" + toString(LOCALLY_DURABLE_TO_DATA_STORE, LOCALLY_DURABLE_TO_COMMAND_STORE) - + "\nbootstrap:" + toString(UNREADY); + return toString(", "); } - private String toString(Property p1) + public String toString(String delimiter) { - return toString(p1, null); + StringBuilder sb = new StringBuilder(); + append(sb, delimiter, "gc:", GC_BEFORE); + append(sb, delimiter, "applied:", LOCALLY_APPLIED); + append(sb, delimiter, "command_store:", LOCALLY_DURABLE_TO_COMMAND_STORE); + append(sb, delimiter, "data_store:", LOCALLY_DURABLE_TO_DATA_STORE); + append(sb, delimiter, "unready:", UNREADY); + return sb.toString(); } - private String toString(Property p1, Property p2) + private void append(StringBuilder builder, String delimiter, String prefix, Property p1) + { + append(builder, delimiter, prefix, p1, null); + } + + private void append(StringBuilder builder, String delimiter, String prefix, Property p1, Property p2) { TreeMap<TxnId, List<Range>> map = new TreeMap<>(); foldl((e, m, pp1, pp2) -> { @@ -1297,8 +1306,14 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> return m; }, map, p1, p2, i -> false); - return map.descendingMap().entrySet().stream() - .map(e -> (e.getKey().equals(TxnId.NONE) ? "none" : e.getKey().toString()) + ":" + Ranges.ofSorted(e.getValue().toArray(new Range[0])).mergeTouching()) - .collect(Collectors.joining(", ", "{", "}")); + if (map.size() == 0 || map.size() == 1 && map.firstKey().equals(TxnId.NONE)) + return; + + if (builder.length() > 0) + builder.append(delimiter); + builder.append(prefix); + builder.append(map.descendingMap().entrySet().stream() + .map(e -> (e.getKey().equals(TxnId.NONE) ? "none" : e.getKey().toString()) + ':' + Ranges.ofSorted(e.getValue().toArray(new Range[0])).mergeTouching()) + .collect(Collectors.joining(", ", "{", "}"))); } } diff --git a/accord-core/src/main/java/accord/local/RedundantStatus.java b/accord-core/src/main/java/accord/local/RedundantStatus.java index 4ac9c8e2..24d30268 100644 --- a/accord-core/src/main/java/accord/local/RedundantStatus.java +++ b/accord-core/src/main/java/accord/local/RedundantStatus.java @@ -109,13 +109,13 @@ public class RedundantStatus * We have applied the preceding transactions durably to the store, so that we can safely truncate the Write * information as we will not need to replay it to the store */ - LOCALLY_DURABLE_TO_DATA_STORE (false, false, LE, LOCALLY_APPLIED), + LOCALLY_DURABLE_TO_DATA_STORE (false, true, LE, LOCALLY_APPLIED), /** * We have applied the preceding transactions durably to all summary structures, so that on restart we do * not need to replay the transaction to restore any internal state. */ - LOCALLY_DURABLE_TO_COMMAND_STORE (false, false, LE, LOCALLY_APPLIED), + LOCALLY_DURABLE_TO_COMMAND_STORE (false, true, LE, LOCALLY_APPLIED), /** * We have fully executed until across all a majority of replicas for the range in question, diff --git a/accord-core/src/main/java/accord/local/RejectBefore.java b/accord-core/src/main/java/accord/local/RejectBefore.java index 5be27785..e969a147 100644 --- a/accord-core/src/main/java/accord/local/RejectBefore.java +++ b/accord-core/src/main/java/accord/local/RejectBefore.java @@ -31,6 +31,8 @@ import accord.utils.ReducingRangeMap; public class RejectBefore extends ReducingRangeMap<Timestamp> { + public static final RejectBefore EMPTY = new RejectBefore(); + public static class SerializerSupport { public static RejectBefore create(RoutingKey[] ends, Timestamp[] values) @@ -76,7 +78,7 @@ public class RejectBefore extends ReducingRangeMap<Timestamp> throw new IllegalArgumentException("value is null"); if (ranges.isEmpty()) - return new RejectBefore(); + return EMPTY; return create(ranges, value, Builder::new); } diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 3785913e..06296c65 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -21,9 +21,13 @@ package accord.local; import java.util.ArrayList; import java.util.List; import java.util.NavigableMap; +import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import accord.api.Agent; import accord.api.DataStore; import accord.api.LocalListeners; @@ -80,6 +84,8 @@ import static accord.utils.Invariants.illegalState; */ public abstract class SafeCommandStore implements RangesForEpochSupplier, RedundantBeforeSupplier, CommandSummaries { + private static final Logger logger = LoggerFactory.getLogger(SafeCommandStore.class); + private static final int MAX_REENTRANCY = 50; private int reentrancyCounter; public boolean tryRecurse() @@ -293,20 +299,13 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund if (!updated.txnId().isSyncPoint() || updated.txnId().domain() != Range) return; if (updated.route() == null) return; - List<SyncPointListener> listeners = commandStore().syncPointListeners; - if (listeners != null) - { - for (SyncPointListener listener : listeners) - listener.update(this, updated); - } - SaveStatus prevSaveStatus = prev == null ? SaveStatus.Uninitialised : prev.saveStatus(); SaveStatus newSaveStatus = updated.saveStatus(); if (newSaveStatus.known.isDefinitionKnown() && (force || !prevSaveStatus.known.isDefinitionKnown())) { Ranges ranges = updated.participants().touches().toRanges(); - commandStore().markExclusiveSyncPoint(this, updated.txnId(), ranges); + commandStore().upsertRejectBefore(this, updated.txnId(), ranges); } if (newSaveStatus.compareTo(Committed) >= 0 && newSaveStatus.compareTo(TruncatedApply) <= 0 && (force || prevSaveStatus.compareTo(Committed) < 0)) @@ -343,6 +342,15 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund if (addRedundantBefore != RedundantBefore.EMPTY) upsertRedundantBefore(addRedundantBefore); } + + // invoke listeners only after updating redundantBefore + List<SyncPointListener> listeners = commandStore().syncPointListeners; + if (listeners != null) + { + logger.debug("Notifying SyncPoint listeners"); + for (SyncPointListener listener : listeners) + listener.update(this, updated); + } } public void updateMaxConflicts(Command prev, Command updated, boolean force) @@ -377,6 +385,7 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund // updateUnmanagedCommandsForKey(this, next, REGISTER_DEPS_ONLY); } + // TODO (expected): should these and related methods live in CommandStore for consistency? private static void updateManagedCommandsForKey(SafeCommandStore safeStore, Command prev, Command next, boolean forceNotify) { StoreParticipants participants = next.participants().supplement(prev.participants()); @@ -523,8 +532,8 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund AsyncChains.chain(() -> commandStore.markingVisible(syncId, waitingOn)) .flatMap(ignore -> AsyncChains.reduce(async, Reduce.toNull(), null)) .begin((success, fail) -> { - if (fail == null) commandStore.execute((PreLoadContext.Empty)() -> "Mark Synced", safeStore0 -> commandStore.markVisible(safeStore0, syncId, waitingOn)); - else commandStore.execute((PreLoadContext.Empty)() -> "Unmark Syncing", safeStore0 -> commandStore.cancelMarkingVisible(syncId, waitingOn)); + if (fail == null) commandStore.execute((PreLoadContext.Empty)() -> "Mark Synced", (Consumer<? super SafeCommandStore>) safeStore0 -> commandStore.markVisible(safeStore0, syncId, waitingOn), commandStore.agent()); + else commandStore.execute((PreLoadContext.Empty)() -> "Unmark Syncing", (Consumer<? super SafeCommandStore>) safeStore0 -> commandStore.cancelMarkingVisible(syncId, waitingOn), commandStore.agent); }); } @@ -561,7 +570,7 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund protected void unsafeUpsertRedundantBefore(RedundantBefore addRedundantBefore) { commandStore().unsafeUpsertRedundantBefore(addRedundantBefore); - commandStore().updatedRedundantBefore(this, addRedundantBefore); + commandStore().upsertedRedundantBefore(this, addRedundantBefore); } public void setBootstrapBeganAt(NavigableMap<TxnId, Ranges> newBootstrapBeganAt) diff --git a/accord-core/src/main/java/accord/messages/SetShardDurable.java b/accord-core/src/main/java/accord/messages/SetShardDurable.java index 3edc04b6..557fa76e 100644 --- a/accord-core/src/main/java/accord/messages/SetShardDurable.java +++ b/accord-core/src/main/java/accord/messages/SetShardDurable.java @@ -56,6 +56,7 @@ public class SetShardDurable extends NoWaitRequest<Route<?>, SimpleReply> { Invariants.require(durability.compareTo(Quorum) >= 0); TxnId syncIdWithFlags = syncIdWithFlags(); + // TODO (required): does this need to strictly precede updating RedundantBefore? Because updating the global map is more expensive. node.markDurable(syncPoint.route.toRanges(), syncIdWithFlags, durability.compareTo(Universal) >= 0 ? syncIdWithFlags : TxnId.NONE) .invoke((success, fail) -> { if (fail != null) node.reply(replyTo, replyContext, null, fail); diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java index 5fd98234..3f01fefa 100644 --- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java +++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java @@ -908,7 +908,7 @@ public abstract class AbstractRanges implements Iterable<Range>, Routables<Range Invariants.requireArgument(ranges.length == 0 || ranges[0] != null); for (int i = 1 ; i < ranges.length ; ++i) { - if (ranges[i - 1].end().compareTo(ranges[i].start()) > 0) + if (ranges[i] == null || ranges[i - 1].end().compareTo(ranges[i].start()) > 0) throw illegalArgument(Arrays.toString(ranges) + " is not correctly sorted or deoverlapped"); } diff --git a/accord-core/src/main/java/accord/primitives/Timestamp.java b/accord-core/src/main/java/accord/primitives/Timestamp.java index c5c6c9ca..841c4f38 100644 --- a/accord-core/src/main/java/accord/primitives/Timestamp.java +++ b/accord-core/src/main/java/accord/primitives/Timestamp.java @@ -96,8 +96,6 @@ public class Timestamp implements Comparable<Timestamp>, EpochSupplier /** * The set of flags we want to retain as we merge timestamps (e.g. when taking mergeMax). - * Today this is only the REJECTED_FLAG, but we may include additional flags in future (such as Committed, Applied..) - * which we may also want to retain when merging in other contexts (such as in Deps). */ static final int MERGE_FLAGS = REJECTED.bit | UNSTABLE.bit | HLC_BOUND.bit | SHARD_BOUND.bit; public static final long IDENTITY_LSB = 0xFFFFFFFF_FFFF00FFL; diff --git a/accord-core/src/main/java/accord/primitives/TxnId.java b/accord-core/src/main/java/accord/primitives/TxnId.java index 655267ed..994f9491 100644 --- a/accord-core/src/main/java/accord/primitives/TxnId.java +++ b/accord-core/src/main/java/accord/primitives/TxnId.java @@ -23,6 +23,7 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; +import accord.local.Node; import accord.local.Node.Id; import accord.primitives.Routable.Domain; import accord.primitives.Txn.Kind; diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index c70d7ddc..f57f339e 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -422,7 +422,7 @@ public class Topology for (int i = shards.firstSetBit() ; i >= 0 ; i = shards.nextSetBit(i + 1, -1)) { supersetIndexes[count] = this.supersetIndexes[i]; - ranges[count] = this.shards[this.supersetIndexes[i]].range; + ranges[count++] = this.shards[this.supersetIndexes[i]].range; } subsetOfRanges = Ranges.ofSortedAndDeoverlapped(ranges); } diff --git a/accord-core/src/main/java/accord/utils/PersistentField.java b/accord-core/src/main/java/accord/utils/PersistentField.java index cc63d899..79034fd9 100644 --- a/accord-core/src/main/java/accord/utils/PersistentField.java +++ b/accord-core/src/main/java/accord/utils/PersistentField.java @@ -19,13 +19,18 @@ package accord.utils; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.TreeSet; +import java.util.concurrent.Executor; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; @@ -40,41 +45,57 @@ public class PersistentField<Input, Saved> Saved load(); } - private static class Pending<Saved> + private static class PendingInput<Input> + { + final AsyncResult.Settable<Void> done = AsyncResults.settable(); + final Input input; + + private PendingInput(Input input) + { + this.input = input; + } + } + + private static class PendingSave<Saved> { final int id; final Saved saving; - private Pending(int id, Saved saving) + private PendingSave(int id, Saved saving) { this.id = id; this.saving = saving; } } - @Nonnull private final Supplier<Saved> currentValue; - @Nonnull - private final BiFunction<Input, Saved, Saved> merge; - @Nonnull + private final BiFunction<Input, Input, Input> mergeInputs; + private final BiFunction<Input, Saved, Saved> mergeToSave; private final Persister<Input, Saved> persister; - @Nonnull private final Consumer<Saved> set; + private final Executor mergeExecutor; - private Saved latestPending; + private Saved latestSave; private int nextId; - private final ArrayDeque<Pending<Saved>> pending = new ArrayDeque<>(); + private final List<PendingInput<Input>> inputBuffer = new ArrayList<>(); + private final ArrayDeque<PendingInput<Input>> inputs = new ArrayDeque<>(); + private final ArrayDeque<PendingSave<Saved>> saves = new ArrayDeque<>(); private final TreeSet<Integer> complete = new TreeSet<>(); + private final Lock mergeLock = new ReentrantLock(); - public PersistentField(@Nonnull Supplier<Saved> currentValue, @Nonnull BiFunction<Input, Saved, Saved> merge, @Nonnull Persister<Input, Saved> persister, @Nullable Consumer<Saved> set) + public PersistentField(@Nonnull Supplier<Saved> currentValue, @Nonnull BiFunction<Input, Input, Input> mergeInputs, @Nonnull BiFunction<Input, Saved, Saved> mergeToSave, @Nonnull Persister<Input, Saved> persister, Consumer<Saved> set, Executor mergeExecutor) { Invariants.nonNull(currentValue, "currentValue cannot be null"); + Invariants.nonNull(mergeInputs, "mergeInputs cannot be null"); + Invariants.nonNull(mergeToSave, "mergeToSave cannot be null"); Invariants.nonNull(persister, "persist cannot be null"); Invariants.nonNull(set, "set cannot be null"); this.currentValue = currentValue; - this.merge = merge; + this.mergeInputs = mergeInputs; + this.mergeToSave = mergeToSave; this.persister = persister; this.set = set; + this.mergeExecutor = mergeExecutor; } public void load() @@ -82,30 +103,79 @@ public class PersistentField<Input, Saved> set.accept(persister.load()); } - public synchronized AsyncResult<?> mergeAndUpdate(@Nonnull Input inputValue) + public AsyncResult<?> save(@Nonnull Input inputValue) { - Invariants.nonNull(merge, "merge cannot be null"); Invariants.nonNull(inputValue, "inputValue cannot be null"); - return mergeAndUpdate(inputValue, merge); + PendingInput<Input> submit = new PendingInput<>(inputValue); + synchronized (this) + { + inputs.add(submit); + } + trySave(); + return submit.done; + } + + private void trySave() + { + if (mergeLock.tryLock()) + { + try { save(); } + finally { mergeLock.unlock(); } + + synchronized (this) + { + if (!inputs.isEmpty()) + mergeExecutor.execute(this::trySave); + } + } } - private AsyncResult<?> mergeAndUpdate(@Nullable Input inputValue, @Nonnull BiFunction<Input, Saved, Saved> merge) + @GuardedBy("mergeLock") + private void save() { - Invariants.nonNull(merge, "merge cannot be null"); - Saved startingValue = latestPending; - if (startingValue == null) + Saved startingValue; + synchronized (this) { - Invariants.require(pending.isEmpty()); - startingValue = currentValue.get(); + if (inputs.isEmpty()) + return; + + inputBuffer.clear(); + inputBuffer.addAll(inputs); + inputs.clear(); + + startingValue = latestSave; + if (startingValue == null) + { + Invariants.require(saves.isEmpty()); + startingValue = currentValue.get(); + } } - Saved newValue = merge.apply(inputValue, startingValue); + + Input inputValue = inputBuffer.get(0).input; + for (int i = 1; i < inputBuffer.size() ; ++i) + inputValue = mergeInputs.apply(inputValue, inputBuffer.get(i).input); + + Saved newValue = mergeToSave.apply(inputValue, startingValue); if (newValue == startingValue) - return AsyncResults.success(null); - this.latestPending = newValue; - int id = ++nextId; - pending.add(new Pending<>(id, newValue)); + { + inputBuffer.forEach(i -> i.done.setSuccess(null)); + inputBuffer.clear(); + return; + } + + final List<AsyncResult.Settable<Void>> notifyOnDone = new ArrayList<>(inputBuffer.size()); + for (PendingInput<?> pending : inputBuffer) + notifyOnDone.add(pending.done); + inputBuffer.clear(); + + int id; + synchronized (this) + { + this.latestSave = newValue; + id = ++nextId; + saves.add(new PendingSave<>(id, newValue)); + } - AsyncResult.Settable<Void> result = AsyncResults.settable(); AsyncResult<?> pendingWrite = persister.persist(inputValue, newValue); pendingWrite.invoke((success, fail) -> { synchronized (this) @@ -113,17 +183,15 @@ public class PersistentField<Input, Saved> complete.add(id); boolean upd = false; Saved latest = null; - while (!complete.isEmpty() && pending.peek().id == complete.first()) + while (!complete.isEmpty() && saves.peek().id == complete.first()) { - latest = pending.poll().saving; + latest = saves.poll().saving; complete.pollFirst(); upd = true; } if (upd) set.accept(latest); - result.setSuccess(null); + notifyOnDone.forEach(i -> i.setSuccess(null)); } }); - - return result; } } diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java index ee0f6f79..972df635 100644 --- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java +++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java @@ -21,6 +21,7 @@ import accord.api.RoutingKey; import accord.primitives.*; import java.util.Arrays; +import java.util.Objects; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.IntFunction; @@ -494,9 +495,47 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> public <V2> ReducingRangeMap<V2> map(Function<V, V2> map, IntFunction<V2[]> allocator) { + RoutingKey[] starts = null; V2[] output = allocator.apply(values.length); + int count = 0; for (int i = 0 ; i < values.length ; ++i) - output[i] = map.apply(values[i]); + { + V2 next = map.apply(values[i]); + if (count == 0 ? next == null : (Objects.equals(next, output[i - 1]))) + { + if (starts == null) + { + starts = new RoutingKey[values.length]; + System.arraycopy(this.starts, 0, starts, 0, count); + } + continue; + } + if (starts != null) + starts[count] = this.starts[i]; + output[count++] = next; + } + + if (count > 0) + { + if (starts != null) + { + starts[count] = this.starts[this.starts.length - 1]; + if (output[count - 1] == null) + --count; + + starts = Arrays.copyOf(starts, count + 1); + output = Arrays.copyOf(output, count); + } + else + { + Invariants.require(count == values.length); + starts = this.starts; + } + } + + if (count == 0) + return new ReducingRangeMap<>(); + return new ReducingRangeMap<>(starts, output); } } diff --git a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java index aeea27c7..c7ca2a60 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncCallbacks.java @@ -30,6 +30,7 @@ public class AsyncCallbacks // a runnable interface that may be directly failed public interface RunOrFail extends Runnable { + // run should not throw any exception void run(); void fail(Throwable fail); } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index b1ad69ed..2f72ef1f 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -790,7 +790,7 @@ public class Cluster listStore.restore(); for (CommandStore store : stores.all()) ((ListAgent) store.agent()).restore((InMemoryCommandStore) store); - journal.replay(stores); + journal.replay(stores, null); Catchup.catchup(node); // Re-enable safety checks diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index 2a2a7e97..22a29e5b 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -91,11 +91,11 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread { Snapshot current = current(); RangesForEpoch ranges = e.getValue(); - CommandStore commandStore = null; + DelayedCommandStore commandStore = null; for (ShardHolder shard : current) { if (shard.ranges().equals(ranges)) - commandStore = shard.store; + commandStore = (DelayedCommandStore) shard.store; } Invariants.nonNull(commandStore, "Each set of ranges should have a corresponding command store, but %d did not:(%s)", ranges, Arrays.toString(shards)) diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index f079237a..c838c891 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -127,6 +127,11 @@ public class InMemoryJournal implements Journal this.partialCompactionChance = 1f - (random.nextFloat()/2); } + @Override + public void open(Node node) + { + } + public void start(Node node) { this.node = node; @@ -617,7 +622,7 @@ public class InMemoryJournal implements Journal } @Override - public boolean replay(CommandStores commandStores) + public boolean replay(CommandStores commandStores, Object param) { for (Map.Entry<Integer, NavigableMap<TxnId, Diffs>> diffEntry : diffsPerCommandStore.entrySet()) { diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java index 00f675fc..b55c6c2a 100644 --- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java @@ -75,7 +75,12 @@ public class LoggingJournal implements Journal } @Override + public void open(Node node) + { + delegate.open(node); + } + @Override public void start(Node node) { delegate.start(node); @@ -130,9 +135,9 @@ public class LoggingJournal implements Journal } @Override - public boolean replay(CommandStores commandStores) + public boolean replay(CommandStores commandStores, Object param) { - return delegate.replay(commandStores); + return delegate.replay(commandStores, null); } @Override diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index 9578254b..7e58289e 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -388,6 +388,7 @@ public class Cluster implements Scheduler public static class NoOpJournal implements Journal { + @Override public void open(Node node) { } @Override public void start(Node node) { } @Override public Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } @Override public Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } @@ -396,7 +397,7 @@ public class Cluster implements Scheduler @Override public List<TopologyUpdate> replayTopologies() { throw new IllegalStateException("Not impelemented"); } @Override public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } @Override public void purge(CommandStores commandStores, EpochSupplier minEpoch) { throw new IllegalStateException("Not impelemented"); } - @Override public boolean replay(CommandStores commandStores) { throw new IllegalStateException("Not impelemented"); } + @Override public boolean replay(CommandStores commandStores, Object param) { throw new IllegalStateException("Not impelemented"); } @Override public RedundantBefore loadRedundantBefore(int store) { throw new IllegalStateException("Not impelemented"); } @Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store) { throw new IllegalStateException("Not impelemented"); } @Override public NavigableMap<Timestamp, Ranges> loadSafeToRead(int store) { throw new IllegalStateException("Not impelemented"); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
