This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new 05b2f241 Fix CFK restore after replay: - Remove from CFK any unapplied transactions we know cannot apply - Force notification of waiting commands in CFK on replay Also fix: - Don't truncateWithOutcome if pre bootstrap or stale - Fix RangeDeps.without(RangeDeps) - Fix InMemoryCommandStore replay bug with clearing DefaultLocalListeners - Ensure SaveStatus and executeAt are updated together to prevent corruption via expunge Improve: - Inform home shard that command is decided [...] 05b2f241 is described below commit 05b2f2415c3e7c109158208824fbc804dfe43f71 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Fri Jul 25 10:51:47 2025 +0100 Fix CFK restore after replay: - Remove from CFK any unapplied transactions we know cannot apply - Force notification of waiting commands in CFK on replay Also fix: - Don't truncateWithOutcome if pre bootstrap or stale - Fix RangeDeps.without(RangeDeps) - Fix InMemoryCommandStore replay bug with clearing DefaultLocalListeners - Ensure SaveStatus and executeAt are updated together to prevent corruption via expunge Improve: - Inform home shard that command is decided if we cannot execute, to avoid recovery contention - Don't recover sync points on the fast path - Don't calculate recovery info for RX (since we don't use it anymore, so no need to do the work) patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20797 --- .../src/main/java/accord/api/ProgressLog.java | 6 ++ .../main/java/accord/api/ProtocolModifiers.java | 2 - .../accord/coordinate/ExecuteEphemeralRead.java | 5 +- .../main/java/accord/coordinate/ExecuteTxn.java | 70 ++++++++++++++-- .../java/accord/coordinate/ReadCoordinator.java | 14 +++- .../src/main/java/accord/coordinate/Recover.java | 1 + .../coordinate/tracking/QuorumIdTracker.java | 11 ++- .../java/accord/impl/AbstractFetchCoordinator.java | 9 ++- .../src/main/java/accord/impl/CommandChange.java | 9 +++ .../java/accord/impl/DefaultLocalListeners.java | 12 ++- .../accord/impl/progresslog/CoordinatePhase.java | 5 ++ .../impl/progresslog/DefaultProgressLog.java | 56 ++++++++----- .../java/accord/impl/progresslog/HomeState.java | 2 +- .../src/main/java/accord/local/Cleanup.java | 12 ++- .../src/main/java/accord/local/Commands.java | 8 +- .../main/java/accord/local/RedundantBefore.java | 26 ++++-- .../src/main/java/accord/local/SafeCommand.java | 4 +- .../main/java/accord/local/SafeCommandStore.java | 12 +-- .../main/java/accord/local/cfk/CommandsForKey.java | 29 +++++-- .../accord/local/cfk/CommandsForKeyUpdate.java | 6 +- .../src/main/java/accord/local/cfk/NotifySink.java | 2 +- .../main/java/accord/local/cfk/PostProcess.java | 4 +- .../src/main/java/accord/local/cfk/Pruning.java | 59 +++++++++++++- .../java/accord/local/cfk/SafeCommandsForKey.java | 30 +++---- .../src/main/java/accord/local/cfk/Updating.java | 22 ++--- .../src/main/java/accord/local/cfk/Utils.java | 37 ++++++++- .../accord/local/durability/ShardDurability.java | 4 +- .../src/main/java/accord/messages/Accept.java | 2 +- .../accord/messages/ApplyThenWaitUntilApplied.java | 9 --- .../main/java/accord/messages/BeginRecovery.java | 6 +- .../main/java/accord/messages/InformDecided.java | 94 ++++++++++++++++++++++ .../main/java/accord/messages/InformDurable.java | 35 +++++--- .../src/main/java/accord/messages/MessageType.java | 1 + .../src/main/java/accord/messages/ReadData.java | 3 +- .../src/main/java/accord/primitives/RangeDeps.java | 45 ++++++++--- .../main/java/accord/utils/RelationMultiMap.java | 13 +++ .../src/test/java/accord/impl/basic/Cluster.java | 8 +- .../src/test/java/accord/impl/list/ListAgent.java | 6 +- .../java/accord/local/cfk/CommandsForKeyTest.java | 6 +- 39 files changed, 515 insertions(+), 170 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ProgressLog.java b/accord-core/src/main/java/accord/api/ProgressLog.java index 978315ed..3b4d8764 100644 --- a/accord-core/src/main/java/accord/api/ProgressLog.java +++ b/accord-core/src/main/java/accord/api/ProgressLog.java @@ -181,6 +181,11 @@ public interface ProgressLog */ void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force); + /** + * Record a remote notification that the command has been decided, so does not need to be recovered until ready to execute. + */ + void decided(SafeCommandStore safeStore, TxnId txnId); + /** * Process a remote asynchronous callback. */ @@ -237,6 +242,7 @@ public interface ProgressLog class NoOpProgressLog implements ProgressLog { @Override public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force) {} + @Override public void decided(SafeCommandStore safeStore, TxnId txnId) {} @Override public void remoteCallback(SafeCommandStore safeStore, SafeCommand safeCommand, SaveStatus remoteStatus, int callbackId, Node.Id from) {} @Override public void waiting(BlockedUntil blockedUntil, SafeCommandStore safeStore, SafeCommand blockedBy, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants, StoreParticipants participants) {} @Override public void invalidIfUncommitted(TxnId txnId) {} diff --git a/accord-core/src/main/java/accord/api/ProtocolModifiers.java b/accord-core/src/main/java/accord/api/ProtocolModifiers.java index 7cc815fc..92f61c4e 100644 --- a/accord-core/src/main/java/accord/api/ProtocolModifiers.java +++ b/accord-core/src/main/java/accord/api/ProtocolModifiers.java @@ -195,8 +195,6 @@ public class ProtocolModifiers public static class Toggles { - public static final boolean artificiallyConstrainResources = Invariants.debug(); - private static FastPaths permittedFastPaths = new FastPaths(FastPath.values()); public static boolean usePrivilegedCoordinator() { return permittedFastPaths.hasPrivilegedCoordinator(); } public static void setPermittedFastPaths(FastPaths newPermittedFastPaths) { permittedFastPaths = newPermittedFastPaths; } diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java b/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java index d42c548f..dfd39c69 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteEphemeralRead.java @@ -223,14 +223,13 @@ public class ExecuteEphemeralRead extends ReadCoordinator<ReadReply> } @Override - protected boolean cancel() + public void timeout() { if (!super.cancel()) - return false; + return; // TODO (desired): if we fail to commit locally we can submit a slow/medium path request callback.failure(node.id(), new Timeout(txnId, route.homeKey(), "Could not promptly read from local coordinator")); - return true; } @Override diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java index fe722a6d..dec14bd0 100644 --- a/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java +++ b/accord-core/src/main/java/accord/coordinate/ExecuteTxn.java @@ -25,6 +25,8 @@ import accord.api.Result; import accord.api.Timeouts; import accord.coordinate.ExecuteFlag.CoordinationFlags; import accord.coordinate.ExecuteFlag.ExecuteFlags; +import accord.coordinate.tracking.QuorumIdTracker; +import accord.coordinate.tracking.RequestStatus; import accord.local.Commands; import accord.local.Commands.CommitOutcome; import accord.local.Node; @@ -35,6 +37,7 @@ import accord.local.SequentialAsyncExecutor; import accord.local.StoreParticipants; import accord.messages.Accept; import accord.messages.Commit; +import accord.messages.InformDecided; import accord.messages.MessageType; import accord.messages.ReadData; import accord.messages.ReadData.CommitOrReadNack; @@ -90,10 +93,14 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> final Topologies allTopologies; final CoordinationFlags flags; final BiConsumer<? super Result, Throwable> callback; + private final QuorumIdTracker stable; private final Participants<?> readScope; + private final boolean sendInitialStable; private Data data; private long uniqueHlc; + private boolean isPrivilegedVoteCommitting; + private boolean hasInformedDecidedOrSucceeded; ExecuteTxn(Node node, SequentialAsyncExecutor executor, Topologies topologies, FullRoute<?> route, Ballot ballot, ExecutePath path, CoordinationFlags flags, TxnId txnId, Txn txn, Timestamp executeAt, Deps stableDeps, Deps sendDeps, BiConsumer<? super Result, Throwable> callback) { @@ -108,7 +115,9 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> this.sendDeps = sendDeps; this.flags = flags; this.callback = callback; + this.stable = new QuorumIdTracker(topologies); this.readScope = txn == null ? route : route.intersecting(txn.keys()); + this.sendInitialStable = sendOnlyReadStableMessages() && path != RECOVER; Invariants.require(!txnId.awaitsOnlyDeps()); Invariants.require(!txnId.awaitsPreviouslyOwned()); } @@ -120,6 +129,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> Node.Id self = node.id(); if (permitLocalExecution() && tryIfUniversal(self)) { + isPrivilegedVoteCommitting = true; new LocalExecute(txnId, flags.get(self)).process(node, node.agent().selfExpiresAt(txnId, Execute, MICROSECONDS)); } else if (path == FAST && txnId.hasPrivilegedCoordinator()) @@ -142,7 +152,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> IntHashSet readSet = new IntHashSet(); to.forEach(i -> readSet.add(i.id)); // TODO (desired): if READY_TO_EXECUTE send a simple read (skip setting Stable) - Commit.stableAndRead(node, executor, allTopologies, commitKind(), txnId, txn, route, readScope, executeAt, sendDeps, readSet, flags, sendOnlyReadStableMessages() && path != RECOVER, this); + Commit.stableAndRead(node, executor, allTopologies, commitKind(), txnId, txn, route, readScope, executeAt, sendDeps, readSet, flags, sendInitialStable, this); } private Commit.Kind commitKind() @@ -161,8 +171,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> public void contact(Id to) { ExecuteFlags flags = this.flags.get(to); - boolean alreadySentStable = !(sendOnlyReadStableMessages() && path != RECOVER); - Request request = Commit.requestTo(to, true, allTopologies, commitKind(), Ballot.ZERO, txnId, txn, route, readScope, executeAt, sendDeps, flags, alreadySentStable, false); + Request request = Commit.requestTo(to, true, allTopologies, commitKind(), Ballot.ZERO, txnId, txn, route, readScope, executeAt, sendDeps, flags, sendInitialStable, false); // we are always sending to a replica in the latest epoch and requesting a read, so onlyContactOldAndReadSet is a redundant parameter node.send(to, request, executor, this); } @@ -173,11 +182,22 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> return ((ReadOk)reply).unavailable; } + @Override + protected void onSuccessAfterDone(Id from, ReadReply reply) + { + if (!hasInformedDecidedOrSucceeded && (reply.isOk() || reply == Waiting)) + { + if (RequestStatus.Success == stable.recordSuccess(from)) + informDecided(); + } + } + @Override protected Action process(Id from, ReadReply reply) { if (reply.isOk()) { + stable.recordSuccess(from); ReadOk ok = ((ReadOk) reply); Data next = ok.data; if (next != null) @@ -196,6 +216,9 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> { default: throw new UnhandledEnum(nack); case Waiting: + if (from.id == node.id().id) + isPrivilegedVoteCommitting = false; + stable.recordSuccess(from); return Action.None; case Redundant: @@ -217,6 +240,7 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> // TODO (expected): if we fail on the fast path and we haven't sent any Stable messages, we should send them now to make recovery easier if (failure == null) { + hasInformedDecidedOrSucceeded = true; Timestamp executeAt = this.executeAt; if (txnId.is(Txn.Kind.Write) && uniqueHlc != 0) { @@ -233,10 +257,37 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> } else { + if (!hasInformedDecidedOrSucceeded && stable.hasReachedQuorum()) + informDecided(); callback.accept(null, failure); } } + @Override + public void onSlowResponse(Id from) + { + // send stable messages to everyone not yet contacted, and then inform decided, to avoid unnecessary recoveries + if (!hasInformedDecidedOrSucceeded && stable.hasReachedQuorum()) + informDecided(); + super.onSlowResponse(from); + } + + @Override + public void onFailure(Id from, Throwable failure) + { + super.onFailure(from, failure); + if (isPrivilegedVoteCommitting && from.id == node.id().id) + tryFinishOnFailure(); + } + + private void informDecided() + { + Invariants.require(stable.hasReachedQuorum()); + hasInformedDecidedOrSucceeded = true; + InformDecided.informHome(node, topologies, txnId, route); + } + + protected CoordinationAdapter<Result> adapter() { return node.coordinationAdapter(txnId, Standard); @@ -323,14 +374,17 @@ public class ExecuteTxn extends ReadCoordinator<ReadReply> } @Override - protected boolean cancel() + public void timeout() { if (!super.cancel()) - return false; + return; - // TODO (desired): if we fail to commit locally we can submit a slow/medium path request - callback.failure(node.id(), new Timeout(txnId, route.homeKey(), "Could not promptly " + (committed ? "commit to" : "read from") + " local coordinator")); - return true; + if (committed) reply(null, new Timeout(txnId, route.homeKey(), "Could not promptly read from local coordinator")); + else + { + // TODO (desired): if we fail to commit locally we can submit a slow/medium path request + callback.failure(node.id(), new Timeout(txnId, route.homeKey(), "Could not promptly commit to local coordinator")); + } } @Override diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java index 629f5041..ade7351e 100644 --- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java +++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java @@ -122,6 +122,8 @@ public abstract class ReadCoordinator<Reply extends accord.messages.Reply> exten // TODO (desired): this isn't very clean way of integrating these responses protected Ranges unavailable(Reply reply) { throw new UnsupportedOperationException(); } + protected void onSuccessAfterDone(Id from, Reply reply) {} + @Override public void onSuccess(Id from, Reply reply) { @@ -129,7 +131,10 @@ public abstract class ReadCoordinator<Reply extends accord.messages.Reply> exten debug.merge(from, reply, (a, b) -> a instanceof List<?> ? ((List<Object>) a).add(b) : Lists.newArrayList(a, b)); if (isDone) + { + onSuccessAfterDone(from, reply); return; + } Action action = process(from, reply); switch (action) @@ -182,8 +187,7 @@ public abstract class ReadCoordinator<Reply extends accord.messages.Reply> exten if (this.failure == null) this.failure = failure; else this.failure.addSuppressed(failure); - if (txnId.hasPrivilegedCoordinator() && from.id == node.id().id) finishOnFailure(); - else handle(recordFailure(from)); + handle(recordFailure(from)); } @Override @@ -215,6 +219,12 @@ public abstract class ReadCoordinator<Reply extends accord.messages.Reply> exten finishOnFailure(); } + protected void tryFinishOnFailure() + { + if (!isDone) + finishOnFailure(); + } + protected void finishOnFailure() { Invariants.require(!isDone); diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java index 0ea7d252..4783f8fa 100644 --- a/accord-core/src/main/java/accord/coordinate/Recover.java +++ b/accord-core/src/main/java/accord/coordinate/Recover.java @@ -399,6 +399,7 @@ public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throw .intersecting(route, id -> !recoverOks.containsKey(id.node)); InferredFastPath fastPath; if (txnId.hasPrivilegedCoordinator() && coordinatorInRecoveryQuorum) fastPath = Reject; + else if (txnId.is(Txn.Kind.ExclusiveSyncPoint)) fastPath = Reject; else fastPath = merge( supersedingRejects(recoverOkList) ? Reject : Unknown, tracker.inferFastPathDecision(txnId, extraCoordVotes, extraRejects) diff --git a/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java index e846f8cd..e9120202 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/QuorumIdTracker.java @@ -18,12 +18,10 @@ package accord.coordinate.tracking; -import java.util.Set; - import accord.local.Node; import accord.topology.Shard; import accord.topology.Topologies; -import org.agrona.collections.ObjectHashSet; +import accord.utils.SortedListSet; import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail; import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange; @@ -33,12 +31,13 @@ public class QuorumIdTracker extends SimpleTracker<QuorumIdTracker.QuorumIdShard { public static class QuorumIdShardTracker extends ShardTracker { - protected final Set<Node.Id> successes = new ObjectHashSet<>(); - protected Set<Node.Id> failures; + protected final SortedListSet<Node.Id> successes; + protected SortedListSet<Node.Id> failures; public QuorumIdShardTracker(Shard shard) { super(shard); + successes = SortedListSet.noneOf(shard.nodes); } public ShardOutcomes onSuccess(Node.Id from) @@ -50,7 +49,7 @@ public class QuorumIdTracker extends SimpleTracker<QuorumIdTracker.QuorumIdShard public ShardOutcomes onFailure(Node.Id from) { if (failures == null) - failures = new ObjectHashSet<>(); + failures = SortedListSet.noneOf(shard.nodes); return failures.add(from) && failures.size() == 1 + shard.maxFailures ? Fail : NoChange; } diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java index 3063e468..fa3d3310 100644 --- a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java +++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java @@ -60,6 +60,7 @@ import javax.annotation.Nullable; import static accord.messages.MessageType.StandardMessage.FETCH_DATA_REQ; import static accord.messages.MessageType.StandardMessage.FETCH_DATA_RSP; import static accord.messages.ReadData.CommitOrReadNack.Redundant; +import static accord.messages.ReadData.CommitOrReadNack.Waiting; import static accord.messages.ReadEphemeralTxnData.retryInLaterEpoch; import static accord.primitives.SaveStatus.Applied; import static accord.primitives.SaveStatus.TruncatedApply; @@ -154,15 +155,17 @@ public abstract class AbstractFetchCoordinator extends FetchCoordinator { CoordinateSyncPoint.sendApply(node, from, syncPoint); } - else + else if (reply == Redundant) { fail(to, new RuntimeException(reply.toString())); inflight.remove(key).cancel(); - if (reply != Redundant) - throw new UnhandledEnum((CommitOrReadNack)reply); // too late, sync point has been erased // TODO (desired): stop fetch sync points from garbage collecting too quickly } + else if (reply != Waiting) + { + throw new UnhandledEnum((CommitOrReadNack)reply); + } return; } diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java index 0d3cc73f..e0896b39 100644 --- a/accord-core/src/main/java/accord/impl/CommandChange.java +++ b/accord-core/src/main/java/accord/impl/CommandChange.java @@ -806,6 +806,15 @@ public class CommandChange } // make sure we have enough information to decide whether to expunge timestamps (for unique ApplyAt HLC guarantees) + if (isChanged(SAVE_STATUS, flags) || isChanged(EXECUTE_AT, flags)) + { + // to ensure we don't allow a later SaveStatus to be combined with a stale executeAt + // (which can be caused by expunging, and can briefly prevent further expunging if it was a higher timestamp), + // for now we ensure we always save the two together if either changes + flags = setChanged(SAVE_STATUS, flags); + if (after.executeAt() != null) flags = setChanged(EXECUTE_AT, flags); + else flags = setIsNullAndChanged(EXECUTE_AT, flags); + } if (saveStatus.known.is(ApplyAtKnown) && (before == null || !before.saveStatus().known.is(ApplyAtKnown))) { flags = setChanged(EXECUTE_AT, flags); diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java index cacc703c..ad166bed 100644 --- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java +++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java @@ -61,7 +61,7 @@ public class DefaultLocalListeners implements LocalListeners } @Override - public LocalListeners create(CommandStore store) + public LocalListeners create(CommandStore commandStore) { return new DefaultLocalListeners(remoteListeners, notifySink); } @@ -520,6 +520,14 @@ public class DefaultLocalListeners implements LocalListeners public void clear() { txnListeners = BTree.empty(); - complexListeners.clear(); + complexListeners.forEach((key, value) -> { + // the listener registration needs to be invalidated so that a caller does not try to cancel it + RegisteredComplexListeners listeners = complexListeners.remove(key); + for (int i = 0 ; i < listeners.length ; i++) + { + if (listeners.listeners[i] != null) + listeners.listeners[i].index = -1; + } + }); } } diff --git a/accord-core/src/main/java/accord/impl/progresslog/CoordinatePhase.java b/accord-core/src/main/java/accord/impl/progresslog/CoordinatePhase.java index 9e74eb82..1eff3f7d 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/CoordinatePhase.java +++ b/accord-core/src/main/java/accord/impl/progresslog/CoordinatePhase.java @@ -32,6 +32,11 @@ public enum CoordinatePhase */ Undecided, + /** + * durably decided but not ready to execute locally; no progress expected + */ + Decided, + /** * durably decided, but replicas may not be ready to execute; should wait until we can expect to successfully * execute the transaction before attempting recovery diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java index be43381c..450ff5fc 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java +++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java @@ -36,6 +36,7 @@ import accord.local.Node; import accord.local.PreLoadContext; import accord.local.SafeCommand; import accord.local.SafeCommandStore; +import accord.primitives.Ballot; import accord.primitives.SaveStatus; import accord.local.StoreParticipants; import accord.primitives.Participants; @@ -55,7 +56,7 @@ import org.agrona.collections.ObjectHashSet; import static accord.api.ProgressLog.BlockedUntil.CanApply; import static accord.api.ProgressLog.BlockedUntil.NotBlocked; -import static accord.impl.progresslog.CoordinatePhase.AwaitReadyToExecute; +import static accord.impl.progresslog.CoordinatePhase.Decided; import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute; import static accord.impl.progresslog.CoordinatePhase.Undecided; import static accord.impl.progresslog.Progress.Awaiting; @@ -196,7 +197,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor Route<?> beforeRoute = before.route(); Route<?> afterRoute = after.route(); if (force || (afterRoute != null && beforeRoute == null) || (after.durability().isDurableOrInvalidated() && !before.durability().isDurableOrInvalidated())) - state = updateHomeState(safeStore, after, get(txnId)); + state = updateOrInitialiseHomeState(safeStore, after, get(txnId)); SaveStatus beforeSaveStatus = before.saveStatus(); SaveStatus afterSaveStatus = after.saveStatus(); @@ -211,23 +212,40 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor state.waiting().record(this, afterSaveStatus); if (state.isHomeInitialised()) + updateHomeState(safeStore, state, before, after); + } + + @Override + public void decided(SafeCommandStore safeStore, TxnId txnId) + { + TxnState state = get(txnId); + if (state != null && state.isHomeInitialised()) + state.home().atLeast(safeStore, this, Decided, NoneExpected); + } + + private void updateHomeState(SafeCommandStore safeStore, TxnState state, Command before, Command after) + { + switch (after.saveStatus()) { - switch (afterSaveStatus) - { - case Stable: - state.home().atLeast(safeStore, this, Undecided, NoneExpected); - break; - case ReadyToExecute: - state.home().atLeast(safeStore, this, AwaitReadyToExecute, Queued); - break; - case PreApplied: - state.home().atLeast(safeStore, this, ReadyToExecute, Queued); - break; - } + case Stable: + if (!after.acceptedOrCommitted().equals(Ballot.ZERO) || (before != null && before.saveStatus() == SaveStatus.Committed)) + state.home().atLeast(safeStore, this, Decided, NoneExpected); + default: + // fall-through to default handler, which simply postpones any scheduled coordination attempt if we witness another coordination attempt in the meantime + if (state.homeProgress() == Queued && (before == null ? after.promised().compareTo(Ballot.ZERO) > 0 : (after.promised().compareTo(before.promised()) > 0) || after.acceptedOrCommitted().compareTo(before.acceptedOrCommitted()) > 0)) + { + clearPending(Home, state.txnId); + state.home().set(safeStore, this, state.phase(), Queued); + } + break; + case ReadyToExecute: + case PreApplied: + state.home().atLeast(safeStore, this, ReadyToExecute, Queued); + break; } } - private TxnState updateHomeState(SafeCommandStore safeStore, Command after, @Nullable TxnState state) + private TxnState updateOrInitialiseHomeState(SafeCommandStore safeStore, Command after, @Nullable TxnState state) { Route<?> route = after.route(); if (after.durability().isDurableOrInvalidated()) @@ -465,7 +483,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor state.waiting().setBlockedUntil(safeStore, this, blockedUntil); // in case progress log hasn't been updated (e.g. bug on replay), force an update to the command's state since we're about to wait on it if (!state.isHomeInitialised() && command.route() != null) - updateHomeState(safeStore, command, state); + updateOrInitialiseHomeState(safeStore, command, state); } @Override @@ -676,14 +694,14 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor @Override public void accept(SafeCommandStore safeStore) { - if (!complete(safeStore, runKind, id, this)) - return; // we've been cancelled - // we have to read safeCommand first as it may become truncated on load, which may clear the progress log and invalidate us SafeCommand safeCommand = safeStore.ifInitialised(run.txnId); if (safeCommand == null) return; + if (!complete(safeStore, runKind, id, this)) + return; // we've been cancelled + // check this after fetching SafeCommand, as doing so can erase the command (and invalidate our state) if (run.isDone(runKind)) return; diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java index 28190c25..381207b2 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java @@ -155,7 +155,7 @@ abstract class HomeState extends WaitingState tracing.trace(safeStore.commandStore(), "Invoking MaybeRecover with progress token %s", maxProgressToken); instance.start(invoker, MaybeRecover.maybeRecover(instance.node(), txnId, invalidIf(), command.route(), maxProgressToken, reportTo, invoker)); - set(safeStore, instance, ReadyToExecute, Querying); + set(safeStore, instance, phase(), Querying); } static void recoverCallback(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog instance, TxnId txnId, @Nullable ProgressToken prevProgressToken, Outcome success, Throwable fail) diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index abfd175f..6d1c3be4 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -40,6 +40,7 @@ import static accord.local.RedundantStatus.Property.LOCALLY_DEFUNCT; import static accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STORE; import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT; import static accord.local.RedundantStatus.Property.NOT_OWNED; +import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP_OR_STALE; import static accord.local.RedundantStatus.Property.SHARD_APPLIED; import static accord.local.RedundantStatus.Property.TRUNCATE_BEFORE; import static accord.primitives.Known.KnownExecuteAt.ApplyAtKnown; @@ -198,7 +199,7 @@ public enum Cleanup Invariants.paranoid(redundant.all(SHARD_APPLIED)); if (!redundant.all(LOCALLY_DURABLE_TO_DATA_STORE)) - return truncateWithOutcome(txnId, participants, min); + return truncateWithOutcome(txnId, redundant, participants, min); if (saveStatus.compareTo(Vestigial) >= 0) { @@ -220,12 +221,9 @@ public enum Cleanup case ShardUniversal: // TODO (required): consider how we guarantee not to break recovery of other shards if a majority on this shard are PRE_BOOTSTRAP // (if the condition is false and we fall through to removing Outcome) - if (input != FULL) - return truncateWithOutcome(txnId, participants, min); - case MajorityOrInvalidated: case Majority: - return truncateWithOutcome(txnId, participants, min); + return truncateWithOutcome(txnId, redundant, participants, min); case UniversalOrInvalidated: case Universal: @@ -309,9 +307,9 @@ public enum Cleanup return INVALIDATE; } - private static Cleanup truncateWithOutcome(TxnId txnId, StoreParticipants participants, Cleanup atLeast) + private static Cleanup truncateWithOutcome(TxnId txnId, RedundantStatus status, StoreParticipants participants, Cleanup atLeast) { - return atLeast.compareTo(TRUNCATE_WITH_OUTCOME) > 0 ? atLeast : participants.executes() == null || !participants.stillExecutes().isEmpty() + return atLeast.compareTo(TRUNCATE_WITH_OUTCOME) > 0 ? atLeast : (participants.executes() == null || !participants.stillExecutes().isEmpty()) && !status.all(PRE_BOOTSTRAP_OR_STALE) ? TRUNCATE_WITH_OUTCOME : TRUNCATE; } diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 2f843fd7..8b621aa9 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -885,7 +885,7 @@ public class Commands Update waitingOn = new Update(command); if (updateWaitingOn(safeStore, command, command.executeAt(), waitingOn, predecessor)) { - safeCommand.updateWaitingOn(waitingOn); + safeCommand.updateWaitingOn(safeStore, waitingOn); // don't bother invoking maybeExecute if we weren't already blocked on the updated command if (waitingOn.hasUpdatedDirectDependency(command.waitingOn())) maybeExecute(safeStore, safeCommand, false, notifyWaitingOn); @@ -926,7 +926,7 @@ public class Commands waitingOn.removeWaitingOnKey(keyIndex); if (uniqueHlc > 0) waitingOn.updateUniqueHlc(committed.executeAt(), uniqueHlc); - safeCommand.updateWaitingOn(waitingOn); + safeCommand.updateWaitingOn(safeStore, waitingOn); if (!waitingOn.isWaiting()) maybeExecute(safeStore, safeCommand, false, true); } @@ -1355,7 +1355,7 @@ public class Commands // if we are a range transaction, being redundant for this transaction does not imply we are redundant for all transactions if (redundant != null) update.removeWaitingOn(redundant); - return safeCommand.updateWaitingOn(update); + return safeCommand.updateWaitingOn(safeStore, update); } static Command removeNoLongerOwnedDependency(SafeCommandStore safeStore, SafeCommand safeCommand, @Nonnull TxnId wasOwned) @@ -1366,7 +1366,7 @@ public class Commands Update update = new Update(current.waitingOn); update.removeWaitingOn(wasOwned); - return safeCommand.updateWaitingOn(update); + return safeCommand.updateWaitingOn(safeStore, update); } public static Command supplementParticipants(Command command, StoreParticipants participants) diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index a5bd0b57..07653fd7 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -108,28 +108,43 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> public final long startEpoch, endEpoch; public final TxnId bootstrappedAt; public final TxnId gcBefore; + public final TxnId locallyAppliedBefore; - public QuickBounds(long startEpoch, long endEpoch, TxnId bootstrappedAt, TxnId gcBefore) + public QuickBounds(long startEpoch, long endEpoch, TxnId bootstrappedAt, TxnId gcBefore, TxnId locallyAppliedBefore) { this.startEpoch = startEpoch; this.endEpoch = endEpoch; this.bootstrappedAt = bootstrappedAt; this.gcBefore = gcBefore; + this.locallyAppliedBefore = locallyAppliedBefore; } public QuickBounds withEpochs(long startEpoch, long endEpoch) { - return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, gcBefore); + if (startEpoch == this.startEpoch && endEpoch == this.endEpoch) + return this; + return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, gcBefore, locallyAppliedBefore); } public QuickBounds withGcBeforeBeforeAtLeast(TxnId newGcBefore) { - return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, newGcBefore); + if (newGcBefore.equals(this.gcBefore)) + return this; + return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, newGcBefore, locallyAppliedBefore); } public QuickBounds withBootstrappedAtLeast(TxnId newBootstrappedAt) { - return new QuickBounds(startEpoch, endEpoch, newBootstrappedAt, gcBefore); + if (newBootstrappedAt.equals(bootstrappedAt)) + return this; + return new QuickBounds(startEpoch, endEpoch, newBootstrappedAt, gcBefore, locallyAppliedBefore); + } + + public QuickBounds withLocallyAppliedAtLeast(TxnId newLocallyAppliedBefore) + { + if (newLocallyAppliedBefore.equals(locallyAppliedBefore)) + return this; + return new QuickBounds(startEpoch, endEpoch, bootstrappedAt, gcBefore, newLocallyAppliedBefore); } } @@ -162,7 +177,8 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Bounds> public Bounds(Range range, long startEpoch, long endEpoch, TxnId[] bounds, short[] statuses, @Nullable Timestamp staleUntilAtLeast) { - super(startEpoch, endEpoch, maxBound(bounds, statuses, PRE_BOOTSTRAP), maxBound(bounds, statuses, GC_BEFORE)); + super(startEpoch, endEpoch, maxBound(bounds, statuses, PRE_BOOTSTRAP), maxBound(bounds, statuses, GC_BEFORE), + maxBound(bounds, statuses, LOCALLY_APPLIED)); this.range = range; this.bounds = bounds; this.statuses = statuses; diff --git a/accord-core/src/main/java/accord/local/SafeCommand.java b/accord-core/src/main/java/accord/local/SafeCommand.java index d0cb5dae..aea21803 100644 --- a/accord-core/src/main/java/accord/local/SafeCommand.java +++ b/accord-core/src/main/java/accord/local/SafeCommand.java @@ -86,9 +86,9 @@ public abstract class SafeCommand return update; } - public Command.Committed updateWaitingOn(Command.WaitingOn.Update waitingOn) + public Command.Committed updateWaitingOn(SafeCommandStore safeStore, Command.WaitingOn.Update waitingOn) { - return incidentalUpdate(Command.updateWaitingOn(current().asCommitted(), waitingOn)); + return update(safeStore, Command.updateWaitingOn(current().asCommitted(), waitingOn)); } public Command updateParticipants(SafeCommandStore safeStore, StoreParticipants participants) diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index da892a7a..626cb918 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -364,7 +364,7 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund return; TxnId txnId = next.txnId(); - if (CommandsForKey.manages(txnId)) updateManagedCommandsForKey(this, prev, next); + if (CommandsForKey.manages(txnId)) updateManagedCommandsForKey(this, prev, next, force); if (!CommandsForKey.managesExecution(txnId) && next.hasBeen(Status.Stable) && !next.hasBeen(Status.Truncated) && (force || !prev.hasBeen(Status.Stable))) updateUnmanagedCommandsForKey(this, next, REGISTER); // TODO (expected): register deps during Accept phase to more quickly sync epochs @@ -374,7 +374,7 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund abstract protected void persistFieldUpdates(); - private static void updateManagedCommandsForKey(SafeCommandStore safeStore, Command prev, Command next) + private static void updateManagedCommandsForKey(SafeCommandStore safeStore, Command prev, Command next, boolean forceNotify) { StoreParticipants participants = next.participants().supplement(prev.participants()); Participants<?> update = next.hasBeen(Status.Committed) ? participants.hasTouched() : participants.stillTouches(); @@ -386,7 +386,7 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund PreLoadContext execute = safeStore.canExecute(context); if (execute != null) { - updateManagedCommandsForKey(safeStore, execute.keys(), next.txnId()); + updateManagedCommandsForKey(safeStore, execute.keys(), next.txnId(), forceNotify); } if (execute != context) { @@ -399,12 +399,12 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund PreLoadContext ctx = safeStore0.context(); TxnId txnId = ctx.primaryTxnId(); Unseekables<?> keys = ctx.keys(); - updateManagedCommandsForKey(safeStore0, keys, txnId); + updateManagedCommandsForKey(safeStore0, keys, txnId, forceNotify); }, safeStore.commandStore().agent); } } - private static void updateManagedCommandsForKey(SafeCommandStore safeStore, Unseekables<?> update, TxnId txnId) + private static void updateManagedCommandsForKey(SafeCommandStore safeStore, Unseekables<?> update, TxnId txnId, boolean forceNotify) { // TODO (expected): avoid reentrancy / recursion SafeCommand safeCommand = safeStore.get(txnId); @@ -412,7 +412,7 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund { // we use callback and re-fetch current to guard against reentrancy causing // us to interact with "future" or stale information (respectively) - safeStore.get(key).callback(safeStore, safeCommand.current()); + safeStore.get(key).callback(safeStore, safeCommand.current(), forceNotify); } } diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java index 869c51f6..552a22aa 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java @@ -235,7 +235,7 @@ public class CommandsForKey extends CommandsForKeyUpdate private static boolean reportLinearizabilityViolations = true; - public static final QuickBounds NO_BOUNDS_INFO = new QuickBounds(0, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE); + public static final QuickBounds NO_BOUNDS_INFO = new QuickBounds(0, Long.MAX_VALUE, TxnId.NONE, TxnId.NONE, TxnId.NONE); public static final TxnInfo NO_INFO = TxnInfo.create(TxnId.NONE, TRANSITIVE, false, TxnId.NONE, Ballot.ZERO); public static final TxnInfo[] NO_INFOS = new TxnInfo[0]; static final TxnId[] NOT_LOADING_PRUNED = new TxnId[0]; @@ -1202,6 +1202,17 @@ public class CommandsForKey extends CommandsForKeyUpdate return bounds.gcBefore; } + public TxnId appliedBefore() + { + return appliedBefore(bounds); + } + + static TxnId appliedBefore(QuickBounds bounds) + { + // TODO (expected): this can be weakened to shardAppliedOrInvalidatedBefore + return bounds.locallyAppliedBefore; + } + public boolean isPostBootstrapAndOwned(TxnId txnId) { return isPostBootstrapAndOwned(txnId, bounds); @@ -1833,19 +1844,19 @@ public class CommandsForKey extends CommandsForKeyUpdate return new CommandsForKey(key, bounds, byId, minUndecidedById, maxAppliedPreBootstrapWriteById, committedByExecuteAt, maxAppliedWriteByExecuteAt, maxUniqueHlc, newLoadingPruned, prunedBeforeById, unmanageds); } - CommandsForKeyUpdate registerUnmanaged(SafeCommand safeCommand, UpdateUnmanagedMode mode) + CommandsForKeyUpdate registerUnmanaged(SafeCommandStore safeStore, SafeCommand safeCommand, UpdateUnmanagedMode mode) { Invariants.require(mode != UPDATE); - return Updating.updateUnmanaged(this, safeCommand, mode, null); + return Updating.updateUnmanaged(this, safeStore, safeCommand, mode, null); } - void postProcess(SafeCommandStore safeStore, CommandsForKey prevCfk, @Nullable Command updated, NotifySink notifySink) + void postProcess(SafeCommandStore safeStore, CommandsForKey prevCfk, @Nullable Command updated, NotifySink notifySink, boolean forceNotify) { TxnInfo minUndecided = minUndecided(); - if (minUndecided != null && !minUndecided.equals(prevCfk.minUndecided())) + if (minUndecided != null && (forceNotify || !minUndecided.equals(prevCfk.minUndecided()))) notifySink.waitingOn(safeStore, minUndecided, key, SaveStatus.Stable, HasStableDeps, true); - if (updated == null) + if (updated == null || forceNotify) { notifyManaged(safeStore, AnyGloballyVisible, 0, committedByExecuteAt.length, -1, notifySink); return; @@ -2031,10 +2042,11 @@ public class CommandsForKey extends CommandsForKeyUpdate // we can't let HLC epoch go backwards as this breaks assumptions around maxUniqueHlc tracking if (newBounds.gcBefore.hlc() < bounds.gcBefore.hlc()) { - if (newBounds.endEpoch != bounds.endEpoch || !newBounds.bootstrappedAt.equals(bounds.bootstrappedAt)) + if (newBounds.endEpoch != bounds.endEpoch || !newBounds.bootstrappedAt.equals(bounds.bootstrappedAt) || !newBounds.locallyAppliedBefore.equals(bounds.locallyAppliedBefore)) { newBounds = bounds.withEpochs(bounds.startEpoch, newBounds.endEpoch) - .withBootstrappedAtLeast(newBounds.bootstrappedAt); + .withBootstrappedAtLeast(newBounds.bootstrappedAt) + .withLocallyAppliedAtLeast(bounds.locallyAppliedBefore); } else { @@ -2043,6 +2055,7 @@ public class CommandsForKey extends CommandsForKeyUpdate } else if (newBounds.gcBefore.equals(bounds.gcBefore) && newBounds.bootstrappedAt.equals(bounds.bootstrappedAt) + && newBounds.locallyAppliedBefore.equals(bounds.locallyAppliedBefore) && newBounds.endEpoch == bounds.endEpoch) { return this; diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKeyUpdate.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKeyUpdate.java index 9f9fa9b8..eba5d1e8 100644 --- a/accord-core/src/main/java/accord/local/cfk/CommandsForKeyUpdate.java +++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKeyUpdate.java @@ -36,7 +36,7 @@ public abstract class CommandsForKeyUpdate @VisibleForTesting public abstract CommandsForKey cfk(); abstract PostProcess postProcess(); - abstract void postProcess(SafeCommandStore safeStore, @Nullable CommandsForKey prevCfk, @Nullable Command command, NotifySink notifySink); + abstract void postProcess(SafeCommandStore safeStore, @Nullable CommandsForKey prevCfk, @Nullable Command command, NotifySink notifySink, boolean forceNotify); static class CommandsForKeyUpdateWithPostProcess extends CommandsForKeyUpdate { @@ -62,9 +62,9 @@ public abstract class CommandsForKeyUpdate } @Override - void postProcess(SafeCommandStore safeStore, @Nullable CommandsForKey prevCfk, @Nullable Command command, NotifySink notifySink) + void postProcess(SafeCommandStore safeStore, @Nullable CommandsForKey prevCfk, @Nullable Command command, NotifySink notifySink, boolean forceNotify) { - cfk.postProcess(safeStore, prevCfk, command, notifySink); + cfk.postProcess(safeStore, prevCfk, command, notifySink, forceNotify); postProcess.postProcess(safeStore, cfk.key(), notifySink); } } diff --git a/accord-core/src/main/java/accord/local/cfk/NotifySink.java b/accord-core/src/main/java/accord/local/cfk/NotifySink.java index cfd0f9e0..38b417f6 100644 --- a/accord-core/src/main/java/accord/local/cfk/NotifySink.java +++ b/accord-core/src/main/java/accord/local/cfk/NotifySink.java @@ -125,7 +125,7 @@ interface NotifySink SafeCommandsForKey update = safeStore.ifLoadedAndInitialised(key); if (update != null && safeStore.tryRecurse()) { - try { update.callback(safeStore, safeStore.unsafeGet(txnId).current()); } + try { update.callback(safeStore, safeStore.unsafeGet(txnId).current(), false); } finally { safeStore.unrecurse(); } } else diff --git a/accord-core/src/main/java/accord/local/cfk/PostProcess.java b/accord-core/src/main/java/accord/local/cfk/PostProcess.java index 5c08f8b4..b37ffb56 100644 --- a/accord-core/src/main/java/accord/local/cfk/PostProcess.java +++ b/accord-core/src/main/java/accord/local/cfk/PostProcess.java @@ -115,7 +115,7 @@ abstract class PostProcess StoreParticipants participants = command.participants(); if (participants.owns().domain() == Routable.Domain.Key && !participants.hasTouched(safeCfk.key())) command = safeCommand.updateParticipants(safeStore, participants.supplementHasTouched(RoutingKeys.of(safeCfk.key()))); - safeCfk.callback(safeStore, command, notifySink); + safeCfk.callback(safeStore, command, notifySink, false); } static CommandsForKeyUpdate load(TxnId[] txnIds, CommandsForKeyUpdate result) @@ -214,7 +214,7 @@ abstract class PostProcess { try { - CommandsForKeyUpdate update = updateUnmanaged(cfk, safeCommand, UPDATE, addUnmanageds); + CommandsForKeyUpdate update = updateUnmanaged(cfk, safeStore, safeCommand, UPDATE, addUnmanageds); if (update != cfk) { Invariants.require(update.cfk() == cfk); diff --git a/accord-core/src/main/java/accord/local/cfk/Pruning.java b/accord-core/src/main/java/accord/local/cfk/Pruning.java index 5d6ccf8e..68b4ad1d 100644 --- a/accord-core/src/main/java/accord/local/cfk/Pruning.java +++ b/accord-core/src/main/java/accord/local/cfk/Pruning.java @@ -39,6 +39,8 @@ import static accord.api.ProtocolModifiers.Toggles.isTransitiveDependencyVisible import static accord.local.CommandSummaries.SummaryStatus.APPLIED; import static accord.local.cfk.CommandsForKey.InternalStatus.COMMITTED; import static accord.local.cfk.CommandsForKey.InternalStatus.PRUNED; +import static accord.local.cfk.CommandsForKey.InternalStatus.STABLE; +import static accord.local.cfk.CommandsForKey.appliedBefore; import static accord.local.cfk.CommandsForKey.bootstrappedAt; import static accord.local.cfk.CommandsForKey.insertPos; import static accord.local.cfk.CommandsForKey.managesExecution; @@ -530,6 +532,7 @@ public class Pruning static TxnInfo[] removeRedundantById(TxnInfo[] byId, boolean hasRedundantLoadingPruned, QuickBounds prevBounds, QuickBounds newBounds) { + TxnId newAppliedBefore = appliedBefore(newBounds); TxnId newRedundantBefore = redundantBefore(newBounds); TxnId newBootstrappedAt = bootstrappedAt(newBounds); TxnId prevRedundantBefore = redundantBefore(prevBounds); @@ -539,22 +542,70 @@ public class Pruning TxnInfo[] newById = byId; int pos = insertPos(byId, newRedundantBefore); - if (pos != 0 || hasRedundantLoadingPruned) + int appliedPos = Arrays.binarySearch(byId, pos, byId.length, newAppliedBefore); + if (appliedPos < 0) appliedPos = -1 - appliedPos; + if (pos != 0 || appliedPos != 0 || hasRedundantLoadingPruned) { if (Invariants.isParanoid() && testParanoia(LINEAR, NONE, LOW)) { int startPos = prevBootstrappedAt == null ? 0 : insertPos(byId, prevBootstrappedAt); for (int i = startPos ; i < pos ; ++i) - Invariants.require(byId[i].isNot(COMMITTED) || !byId[i].mayExecute() || !reportLinearizabilityViolations(), "%s redundant; expected to be applied, undecided or to execute in a future epoch", byId[i]); + Invariants.require((byId[i].isNot(COMMITTED) && byId[i].isNot(STABLE)) || !byId[i].mayExecute() || !reportLinearizabilityViolations(), "%s redundant; expected to be applied, undecided or to execute in a future epoch", byId[i]); + } + + int removeUnappliedCount = 0; + if (appliedPos > pos) + { + // we apply additional filtering to remove any transactions we know would apply locally, but haven't executed + // so we know they will not execute. note: we cannot do this safely for any transactions we don't execute locally! + // this is used to handle consistent restore on replay, where we may have some transaction that is logically + // invalidated, but we may only record the invalidation after the snapshot is created because the RX doesn't + // record it as a dependency (and so it doesn't have to be decided for the RX to execute). + // We may then later invalidate and update the CFK, but since this is not reflected in the snapshot, + // after restoring from snapshot we may have an inconsistency between the command state and the CFK state, + // and won't know that we should update the CFK because the command is already invalidated. + // So, to avoid having to read all CFK on startup and process any potentially invalidated transactions, + // we instead apply this filtering aggressively whenever we know the transaction cannot apply, + // and rely on the applied RX to correctly reject recovery of the transaction. + for (int i = pos ; i < appliedPos ; ++i) + { + if (byId[i].compareTo(APPLIED) < 0) + { + if (byId[i].mayExecute()) + { + Invariants.require((byId[i].isNot(COMMITTED) && byId[i].isNot(STABLE)) || !reportLinearizabilityViolations(), "%s redundant; expected to be applied, undecided or to execute in a future epoch", byId[i]); + // we only filter those that would apply locally + removeUnappliedCount++; + } + } + } + } + + int newAppliedBeforeIndex; + if (removeUnappliedCount > 0) + { + newAppliedBeforeIndex = (appliedPos - pos) - removeUnappliedCount; + newById = new TxnInfo[(byId.length - appliedPos) + newAppliedBeforeIndex]; + removeUnappliedCount = 0; + for (int i = pos ; i < appliedPos ; ++i) + { + if (byId[i].compareTo(APPLIED) < 0 && byId[i].mayExecute()) ++removeUnappliedCount; + else newById[i - (pos + removeUnappliedCount)] = byId[i]; + } + System.arraycopy(byId, appliedPos, newById, newAppliedBeforeIndex, newById.length - newAppliedBeforeIndex); + } + else + { + newAppliedBeforeIndex = -1; + newById = Arrays.copyOfRange(byId, pos, byId.length); } - newById = Arrays.copyOfRange(byId, pos, byId.length); for (int i = 0 ; i < newById.length ; ++i) { TxnInfo txn = newById[i]; TxnId[] missing = txn.missing(); if (missing == NO_TXNIDS) continue; - missing = removeRedundantMissing(missing, newRedundantBefore); + missing = removeRedundantMissing(missing, newRedundantBefore, newById, newAppliedBeforeIndex); newById[i] = txn.withMissing(missing); } } diff --git a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java index 90367c2d..9c7158a5 100644 --- a/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java +++ b/accord-core/src/main/java/accord/local/cfk/SafeCommandsForKey.java @@ -47,38 +47,32 @@ public abstract class SafeCommandsForKey implements SafeState<CommandsForKey> return key; } - public void update(SafeCommandStore safeStore, Command nextCommand) - { - CommandsForKey prevCfk = current(); - update(safeStore, nextCommand, prevCfk, prevCfk.update(safeStore, nextCommand)); - } - public void updateUniqueHlc(SafeCommandStore safeStore, long uniqueHlc) { CommandsForKey prevCfk = current(); - update(safeStore, null, prevCfk, prevCfk.updateUniqueHlc(uniqueHlc)); + update(safeStore, null, prevCfk, prevCfk.updateUniqueHlc(uniqueHlc), false); } // equivalent to update, but for async callbacks with additional validation around pruning - public void callback(SafeCommandStore safeStore, Command nextCommand) + public void callback(SafeCommandStore safeStore, Command nextCommand, boolean forceNotify) { - callback(safeStore, nextCommand, DefaultNotifySink.INSTANCE); + callback(safeStore, nextCommand, DefaultNotifySink.INSTANCE, forceNotify); } - public void callback(SafeCommandStore safeStore, Command nextCommand, NotifySink notifySink) + public void callback(SafeCommandStore safeStore, Command nextCommand, NotifySink notifySink, boolean forceNotify) { CommandsForKey prevCfk = current(); - update(safeStore, nextCommand, prevCfk, prevCfk.callback(safeStore, nextCommand), notifySink); + update(safeStore, nextCommand, prevCfk, prevCfk.callback(safeStore, nextCommand), notifySink, forceNotify); } - private void update(SafeCommandStore safeStore, @Nullable Command command, CommandsForKey prevCfk, CommandsForKeyUpdate updateCfk) + private void update(SafeCommandStore safeStore, @Nullable Command command, CommandsForKey prevCfk, CommandsForKeyUpdate updateCfk, boolean forceNotify) { - update(safeStore, command, prevCfk, updateCfk, DefaultNotifySink.INSTANCE); + update(safeStore, command, prevCfk, updateCfk, DefaultNotifySink.INSTANCE, forceNotify); } - private void update(SafeCommandStore safeStore, @Nullable Command command, CommandsForKey prevCfk, CommandsForKeyUpdate updateCfk, NotifySink notifySink) + private void update(SafeCommandStore safeStore, @Nullable Command command, CommandsForKey prevCfk, CommandsForKeyUpdate updateCfk, NotifySink notifySink, boolean forceNotify) { - if (updateCfk == prevCfk) + if (updateCfk == prevCfk && !forceNotify) return; CommandsForKey nextCfk = updateCfk.cfk(); @@ -92,19 +86,19 @@ public abstract class SafeCommandsForKey implements SafeState<CommandsForKey> set(nextCfk); } - updateCfk.postProcess(safeStore, prevCfk, command, notifySink); + updateCfk.postProcess(safeStore, prevCfk, command, notifySink, forceNotify); } public void registerUnmanaged(SafeCommandStore safeStore, SafeCommand unmanaged, UpdateUnmanagedMode mode) { CommandsForKey prevCfk = current(); - update(safeStore, null, prevCfk, prevCfk.registerUnmanaged(unmanaged, mode)); + update(safeStore, null, prevCfk, prevCfk.registerUnmanaged(safeStore, unmanaged, mode), false); } public void updateRedundantBefore(SafeCommandStore safeStore, RedundantBefore.Bounds redundantBefore) { CommandsForKey prevCfk = current(); - update(safeStore, null, prevCfk, prevCfk.withRedundantBeforeAtLeast(redundantBefore)); + update(safeStore, null, prevCfk, prevCfk.withRedundantBeforeAtLeast(redundantBefore), false); } public void initialize() diff --git a/accord-core/src/main/java/accord/local/cfk/Updating.java b/accord-core/src/main/java/accord/local/cfk/Updating.java index 4bddc48e..c38c86a2 100644 --- a/accord-core/src/main/java/accord/local/cfk/Updating.java +++ b/accord-core/src/main/java/accord/local/cfk/Updating.java @@ -35,6 +35,7 @@ import accord.local.CommandStore; import accord.local.PreLoadContext; import accord.local.RedundantBefore.QuickBounds; import accord.local.SafeCommand; +import accord.local.SafeCommandStore; import accord.local.cfk.CommandsForKey.InternalStatus; import accord.local.cfk.CommandsForKeyUpdate.CommandsForKeyUpdateWithPostProcess; import accord.local.cfk.PostProcess.LoadPruned; @@ -850,7 +851,7 @@ class Updating commandStore.execute(context, safeStore -> { SafeCommandsForKey safeCommandsForKey = safeStore.get(key); CommandsForKey cur = safeCommandsForKey.current(); - CommandsForKeyUpdate next = Updating.updateUnmanaged(cur, safeStore.unsafeGet(txnId)); + CommandsForKeyUpdate next = Updating.updateUnmanaged(cur, safeStore, safeStore.unsafeGet(txnId)); if (cur != next) { if (cur != next.cfk()) @@ -863,14 +864,14 @@ class Updating }, commandStore.agent()); } - static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommand safeCommand) + static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommandStore safeStore, SafeCommand safeCommand) { - return Updating.updateUnmanaged(cfk, safeCommand, UPDATE, null); + return Updating.updateUnmanaged(cfk, safeStore, safeCommand, UPDATE, null); } - static CommandsForKeyUpdate registerDependencies(CommandsForKey cfk, SafeCommand safeCommand) + static CommandsForKeyUpdate registerDependencies(CommandsForKey cfk, SafeCommandStore safeStore, SafeCommand safeCommand) { - return Updating.updateUnmanaged(cfk, safeCommand, REGISTER_DEPS_ONLY, null); + return Updating.updateUnmanaged(cfk, safeStore, safeCommand, REGISTER_DEPS_ONLY, null); } /** @@ -880,7 +881,7 @@ class Updating * - {@code UPDATE, update == null}: fails if any dependencies are missing; always returns a CommandsForKey * - {@code UPDATE && update != null}: fails if any dependencies are missing; always returns the original CommandsForKey, and maybe adds a new Unmanaged to {@code update} */ - static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommand safeCommand, UpdateUnmanagedMode mode, @Nullable List<CommandsForKey.Unmanaged> update) + static CommandsForKeyUpdate updateUnmanaged(CommandsForKey cfk, SafeCommandStore safeStore, SafeCommand safeCommand, UpdateUnmanagedMode mode, @Nullable List<CommandsForKey.Unmanaged> update) { boolean register = mode != UPDATE; Invariants.requireArgument(mode == UPDATE || update == null); @@ -1017,7 +1018,8 @@ class Updating } else { - Invariants.require(txnIds.get(i++).compareTo(cfk.prunedBefore()) < 0); + Invariants.require(txnIds.get(i).compareTo(TxnId.max(cfk.bounds.locallyAppliedBefore, cfk.prunedBefore())) < 0); + ++i; } } @@ -1052,7 +1054,7 @@ class Updating } } - waitingToExecuteAt = updateExecuteAtLeast(waitingToExecuteAt, effectiveExecutesAt, safeCommand); + waitingToExecuteAt = updateExecuteAtLeast(waitingToExecuteAt, effectiveExecutesAt, safeStore, safeCommand); if (!readyToApply || missingCount > 0 || newById != null) { if (newById == null) newById = byId; @@ -1137,7 +1139,7 @@ class Updating return new CommandsForKeyUpdateWithPostProcess(cfk, new NotifyNotWaiting(null, new TxnId[] { safeCommand.txnId() })); } - private static Timestamp updateExecuteAtLeast(Timestamp waitingToExecuteAt, Timestamp effectiveExecutesAt, SafeCommand safeCommand) + private static Timestamp updateExecuteAtLeast(Timestamp waitingToExecuteAt, Timestamp effectiveExecutesAt, SafeCommandStore safeStore, SafeCommand safeCommand) { if (waitingToExecuteAt instanceof TxnInfo) waitingToExecuteAt = ((TxnInfo) waitingToExecuteAt).plainExecuteAt(); @@ -1152,7 +1154,7 @@ class Updating if (effectiveExecutesAt instanceof TxnInfo) effectiveExecutesAt = ((TxnInfo) effectiveExecutesAt).plainExecuteAt(); waitingOn.updateExecuteAtLeast(txnId, effectiveExecutesAt); - safeCommand.updateWaitingOn(waitingOn); + safeCommand.updateWaitingOn(safeStore, waitingOn); } } diff --git a/accord-core/src/main/java/accord/local/cfk/Utils.java b/accord-core/src/main/java/accord/local/cfk/Utils.java index 695e8f88..c55ae13c 100644 --- a/accord-core/src/main/java/accord/local/cfk/Utils.java +++ b/accord-core/src/main/java/accord/local/cfk/Utils.java @@ -350,16 +350,45 @@ class Utils return newMissing; } - static TxnId[] removeRedundantMissing(TxnId[] missing, TxnId removeBefore) + static TxnId[] removeRedundantMissing(TxnId[] missing, TxnId removeBefore, TxnInfo[] newById, int appliedBeforeIndex) { if (missing == NO_TXNIDS) return NO_TXNIDS; int j = Arrays.binarySearch(missing, removeBefore); if (j < 0) j = -1 - j; - if (j <= 0) return missing; - if (j == missing.length) return NO_TXNIDS; - return Arrays.copyOfRange(missing, j, missing.length); + if (j > 0) + { + if (j == missing.length) return NO_TXNIDS; + missing = Arrays.copyOfRange(missing, j, missing.length); + } + if (appliedBeforeIndex < 0) + return missing; + + int removed = 0; + j = SortedArrays.binarySearch(newById, 0, appliedBeforeIndex, missing[0], TxnId::compareTo, FAST); + if (j >= 0) ++j; + else + { + ++removed; + j = -1 - j; + } + for (int i = 1 ; i < missing.length ; ++i) + { + j = SortedArrays.exponentialSearch(newById, j, appliedBeforeIndex, missing[i], TxnId::compareTo, FAST); + if (j < 0) + { + ++removed; + j = -1 - j; + } + else if (removed > 0) + { + missing[i - removed] = missing[i]; + } + } + if (removed == 0) return missing; + else if (removed == missing.length) return NO_TXNIDS; + else return Arrays.copyOf(missing, missing.length - removed); } static TxnId[] ensureOneMissing(TxnId txnId, TxnId[] oneMissing) diff --git a/accord-core/src/main/java/accord/local/durability/ShardDurability.java b/accord-core/src/main/java/accord/local/durability/ShardDurability.java index 01f90a53..d49fb129 100644 --- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java +++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java @@ -423,8 +423,8 @@ public class ShardDurability /* * In each cycle, attempt to split the range into this many pieces; if we fail, we increase the number of pieces */ - private int targetShardSplits = 64; - private int maxShardSplits = 1 << 10; + private int targetShardSplits = 8; + private int maxShardSplits = 64; /* * Target for how often the entire ring should be processed in microseconds. Every node will start at an offset in the current round that is based diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java index fa8753aa..c0e8013f 100644 --- a/accord-core/src/main/java/accord/messages/Accept.java +++ b/accord-core/src/main/java/accord/messages/Accept.java @@ -182,7 +182,7 @@ public class Accept extends TxnRequest.WithUnsynced<Accept.AcceptReply> Invariants.require(deps.maxTxnId(txnId).epoch() <= executeAt.epoch()); if (filterDuplicateDependenciesFromAcceptReply()) - deps = deps.without(this.partialDeps); + deps = deps.without(partialDeps); Participants<?> successful = isPartialAccept ? participants.touches() : null; return new AcceptReply(successful, deps, flags); diff --git a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java index 8a582291..64465149 100644 --- a/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java +++ b/accord-core/src/main/java/accord/messages/ApplyThenWaitUntilApplied.java @@ -141,15 +141,6 @@ public class ApplyThenWaitUntilApplied extends WaitUntilApplied public void accept(CommitOrReadNack reply, Throwable failure) { super.accept(reply, failure); - - boolean waiting; - synchronized (this) - { - waiting = waitingOnCount >= 0; - } - if (waiting && reply == null && failure == null) - node.reply(replyTo, replyContext, CommitOrReadNack.Waiting, null); - txn = null; deps = null; writes = null; diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java index 15e3d883..233cd542 100644 --- a/accord-core/src/main/java/accord/messages/BeginRecovery.java +++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java @@ -146,9 +146,9 @@ public class BeginRecovery extends TxnRequest.WithUnsynced<BeginRecovery.Recover boolean supersedingRejects; Deps earlierNoWait, earlierWait; Deps laterCoordRejects; - if (command.hasBeen(AcceptedMedium)) + if (command.hasBeen(AcceptedMedium) || txnId.is(ExclusiveSyncPoint)) { - supersedingRejects = false; + supersedingRejects = !command.hasBeen(AcceptedMedium); earlierNoWait = earlierWait = Deps.NONE; laterCoordRejects = Deps.NONE; } @@ -230,6 +230,8 @@ public class BeginRecovery extends TxnRequest.WithUnsynced<BeginRecovery.Recover @Override public LoadKeysFor loadKeysFor() { + if (txnId.is(ExclusiveSyncPoint)) + return LoadKeysFor.READ_WRITE; return LoadKeysFor.RECOVERY; } diff --git a/accord-core/src/main/java/accord/messages/InformDecided.java b/accord-core/src/main/java/accord/messages/InformDecided.java new file mode 100644 index 00000000..6bc6d136 --- /dev/null +++ b/accord-core/src/main/java/accord/messages/InformDecided.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package accord.messages; + +import accord.api.RoutingKey; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.local.SafeCommandStore; +import accord.primitives.Route; +import accord.primitives.TxnId; +import accord.topology.Shard; +import accord.topology.Topologies; +import accord.utils.async.Cancellable; + +import static accord.messages.MessageType.StandardMessage.INFORM_DECIDED_REQ; + +public class InformDecided extends AbstractRequest<Reply> +{ + public static class SerializationSupport + { + public static InformDecided create(TxnId txnId, RoutingKey homeKey) + { + return new InformDecided(txnId, homeKey); + } + } + + public final RoutingKey homeKey; + public InformDecided(TxnId txnId, RoutingKey homeKey) + { + super(txnId); + this.homeKey = homeKey; + } + + public static void informHome(Node node, Topologies any, TxnId txnId, Route<?> route) + { + Shard homeShard = InformDurable.homeShard(node, any, txnId, route.homeKey()); + node.send(homeShard.nodes, to -> new InformDecided(txnId, route.homeKey())); + } + + @Override + public Cancellable submit() + { + // TODO (expected): do not load from disk to perform this update, just write a delta to any journal + return node.mapReduceConsumeLocal(this, homeKey, txnId.epoch(), this); + } + + @Override + public Reply apply(SafeCommandStore safeStore) + { + safeStore.progressLog().decided(safeStore, txnId); + return null; + } + + @Override + public TxnId primaryTxnId() + { + // we don't need the transaction loaded to update the progress log + return null; + } + + @Override + protected void acceptInternal(Reply reply, Throwable failure) + { + } + + @Override + public String toString() + { + return "InformDecided{" + + "txnId:" + txnId + + '}'; + } + + @Override + public MessageType type() + { + return INFORM_DECIDED_REQ; + } +} diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java index 773c2dcc..1a4df25f 100644 --- a/accord-core/src/main/java/accord/messages/InformDurable.java +++ b/accord-core/src/main/java/accord/messages/InformDurable.java @@ -19,6 +19,7 @@ package accord.messages; import javax.annotation.Nullable; +import accord.api.RoutingKey; import accord.local.Commands; import accord.local.LoadKeys; import accord.local.Node; @@ -89,17 +90,8 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext public static void informHome(Node node, Topologies any, TxnId txnId, Route<?> route, Timestamp executeAt, Durability durability) { - long homeEpoch = txnId.epoch(); - Topology homeEpochTopology = any.getEpoch(homeEpoch); - int homeShardIndex = homeEpochTopology.indexForKey(route.homeKey()); - if (homeShardIndex < 0) - { - homeEpochTopology = node.topology().globalForEpoch(homeEpoch); - homeShardIndex = homeEpochTopology.indexForKey(route.homeKey()); - } - - Shard homeShard = homeEpochTopology.get(homeShardIndex); - Topologies homeTopology = new Topologies.Single(any, new Topology(homeEpoch, homeShard)); + Shard homeShard = homeShard(node, any, txnId, route.homeKey()); + Topologies homeTopology = new Topologies.Single(any, new Topology(txnId.epoch(), homeShard)); node.send(homeShard.nodes, to -> new InformDurable(to, homeTopology, route.homeKeyOnlyRoute(), txnId, executeAt, txnId.epoch(), txnId.epoch(), durability)); } @@ -108,6 +100,25 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext node.send(inform.nodes(), to -> new InformDurable(to, inform, route, txnId, executeAt, inform.oldestEpoch(), inform.currentEpoch(), durability)); } + static Shard homeShard(Node node, Topologies any, TxnId txnId, RoutingKey homeKey) + { + long homeEpoch = txnId.epoch(); + int homeShardIndex = -1; + Topology homeEpochTopology = null; + if (any.containsEpoch(homeEpoch)) + { + homeEpochTopology = any.getEpoch(homeEpoch); + homeShardIndex = homeEpochTopology.indexForKey(homeKey); + } + if (homeShardIndex < 0) + { + homeEpochTopology = node.topology().globalForEpoch(homeEpoch); + homeShardIndex = homeEpochTopology.indexForKey(homeKey); + } + + return homeEpochTopology.get(homeShardIndex); + } + @Override public Cancellable submit() { @@ -142,7 +153,7 @@ public class InformDurable extends TxnRequest<Reply> implements PreLoadContext @Override public String toString() { - return "InformOfPersistence{" + + return "InformDurable{" + "txnId:" + txnId + '}'; } diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java index a75cb966..8c1d3566 100644 --- a/accord-core/src/main/java/accord/messages/MessageType.java +++ b/accord-core/src/main/java/accord/messages/MessageType.java @@ -42,6 +42,7 @@ public interface MessageType CHECK_STATUS_REQ, CHECK_STATUS_RSP, FETCH_DATA_REQ, FETCH_DATA_RSP, INFORM_DURABLE_REQ, + INFORM_DECIDED_REQ, GET_LATEST_DEPS_REQ, GET_LATEST_DEPS_RSP, GET_MAX_CONFLICT_REQ, GET_MAX_CONFLICT_RSP, GET_DURABLE_BEFORE_REQ, GET_DURABLE_BEFORE_RSP, diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index e021d527..c89ce71b 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -68,6 +68,7 @@ import static accord.coordinate.ExecuteFlag.READY_TO_EXECUTE; import static accord.messages.MessageType.StandardMessage.READ_RSP; import static accord.messages.ReadData.CommitOrReadNack.Insufficient; import static accord.messages.ReadData.CommitOrReadNack.Redundant; +import static accord.messages.ReadData.CommitOrReadNack.Waiting; import static accord.messages.TxnRequest.latestRelevantEpochIndex; import static accord.primitives.Routables.Slice.Minimal; import static accord.primitives.Txn.Kind.EphemeralRead; @@ -313,7 +314,7 @@ public abstract class ReadData implements PreLoadContext, Request, MapReduceCons if (c < 0) safeStore.progressLog().waiting(HasStableDeps, safeStore, safeCommand, null, null, participants); else if (c > 0 && status.compareTo(executeOn().min) >= 0 && status.compareTo(SaveStatus.PreApplied) < 0) safeStore.progressLog().waiting(CanApply, safeStore, safeCommand, null, scope, null); node.agent().localEvents().onReadWaiting(safeStore, command); - return status.compareTo(SaveStatus.Stable) >= 0 ? null : Insufficient; + return status.compareTo(SaveStatus.Stable) >= 0 ? Waiting : Insufficient; case OBSOLETE: state = State.PENDING_OBSOLETE; diff --git a/accord-core/src/main/java/accord/primitives/RangeDeps.java b/accord-core/src/main/java/accord/primitives/RangeDeps.java index e60ee1ad..34eecda6 100644 --- a/accord-core/src/main/java/accord/primitives/RangeDeps.java +++ b/accord-core/src/main/java/accord/primitives/RangeDeps.java @@ -720,18 +720,22 @@ public class RangeDeps implements Iterable<Map.Entry<Range, TxnId>>, KeyOrRangeD if (!RelationMultiMap.removeWithPartialMatches(txnIds, ranges, txnIdsToRanges, remove.txnIds, remove.ranges, remove.txnIdsToRanges, TxnId::compareTo, Range::compareIntersecting, builder, (b, id, kr, rr) -> { - Range remainder = null; - if (rr != null) + if (rr == null) { - int compareStarts = rr.start().compareTo(kr.start()); - if (rr.end().compareTo(kr.end()) < 0) - remainder = rr.newRange(compareStarts >= 0 ? rr.start() : kr.start(), rr.end()); - if (compareStarts <= 0) - return remainder; - kr = kr.newRange(kr.start(), rr.start()); + b.add(id, kr); + return null; + } + int compareStarts = rr.start().compareTo(kr.start()); + int compareEnds = rr.end().compareTo(kr.end()); + if (compareStarts <= 0 && compareEnds >= 0) return null; + else if (compareStarts <= 0) return rr.newRange(rr.end(), kr.end()); + else + { + b.add(rr.newRange(kr.start(), rr.start()), id); + if (compareEnds >= 0) + return null; + return rr.newRange(rr.end(), kr.end()); } - b.add(id, kr); - return remainder; })) { return this; @@ -1160,10 +1164,25 @@ public class RangeDeps implements Iterable<Map.Entry<Range, TxnId>>, KeyOrRangeD if (range.compareIntersecting(last) == 0) { RoutingKey rstart = range.start(), lstart = last.start(); - Invariants.require(rstart.compareTo(lstart) >= 0); + RoutingKey start = rstart.compareTo(lstart) < 0 ? rstart : lstart; RoutingKey rend = range.end(), lend = last.end(); - if (rend.compareTo(lend) > 0) - updateLast(last.newRange(lstart, rend)); + RoutingKey end = rend.compareTo(lend) > 0 ? rend : lend; + if (start != lstart || end != lend) + { + Range newRange = last.newRange(start, end); + if (start != lstart) + { + Range prev = penultimateKeyValue(); + while (prev != null && prev.compareIntersecting(newRange) == 0) + { + removeLastKeyValue(); + if (prev.start().compareTo(start) < 0) + newRange = newRange.newRange(prev.start(), end); + prev = penultimateKeyValue(); + } + } + updateLast(newRange); + } return; } } diff --git a/accord-core/src/main/java/accord/utils/RelationMultiMap.java b/accord-core/src/main/java/accord/utils/RelationMultiMap.java index 5e09ee1a..1c549908 100644 --- a/accord-core/src/main/java/accord/utils/RelationMultiMap.java +++ b/accord-core/src/main/java/accord/utils/RelationMultiMap.java @@ -212,6 +212,19 @@ public class RelationMultiMap keysToValues[totalCount - 1] = value; } + protected V penultimateKeyValue() + { + if (totalCount - keyOffset < 2) + return null; + return keysToValues[totalCount - 2]; + } + + protected void removeLastKeyValue() + { + Invariants.require(totalCount - keyOffset > 0); + keysToValues[--totalCount] = null; + } + /** * Add this command as a dependency for each intersecting key */ diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index a0acf419..3bc9ae49 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -700,11 +700,7 @@ public class Cluster TestProgressLogs::new, DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, cacheLoading), new CoordinationAdapter.DefaultFactory(), journal.durableBeforePersister(), journal); journal.start(node); - DurabilityService durability = node.durability(); - // TODO (desired): randomise - durability.shards().setShardCycleTime(30, SECONDS); - durability.global().setGlobalCycleTime(180, SECONDS); - durabilityServices.add(durability); + durabilityServices.add(node.durability()); nodeMap.put(id, node); durabilityServices.add(new DurabilityService(node)); } @@ -730,7 +726,7 @@ public class Cluster Runnable updateProgressLogConcurrency; { updateProgressLogConcurrency = () -> { - nodeMap.values().forEach(node -> node.commandStores().forEachCommandStore(cs -> ((TestProgressLog)cs.unsafeProgressLog()).setMaxConcurrency(random.nextInt(1, 16)))); + nodeMap.values().forEach(node -> node.commandStores().forEachCommandStore(cs -> ((TestProgressLog)cs.unsafeProgressLog()).setMaxConcurrency(random.nextInt(8, 32)))); }; } updateProgressLogConcurrency.run(); diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java index 4c20684e..515d8104 100644 --- a/accord-core/src/test/java/accord/impl/list/ListAgent.java +++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java @@ -25,6 +25,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.LongSupplier; @@ -47,6 +48,7 @@ import accord.impl.basic.SimulatedFault; import accord.impl.mock.Network; import accord.local.Command; import accord.local.Node; +import accord.local.PreLoadContext; import accord.local.SafeCommandStore; import accord.local.TimeService; import accord.messages.ReplyContext; @@ -290,7 +292,9 @@ public class ListAgent implements InMemoryAgent, CoordinatorEventListener public AsyncResult<Void> snapshot(InMemoryCommandStore commandStore) { Snapshotter<Snapshot> snapshotter = snapshotters.computeIfAbsent(commandStore.id(), ignore -> new Snapshotter<>(scheduler, rnd)); - return snapshotter.snapshot(false, Snapshot.snapshot(commandStore)); + return commandStore.submit((PreLoadContext.Empty)() -> "Snapshot", safeStore -> snapshotter.snapshot(false, Snapshot.snapshot(commandStore))) + .flatMap(Function.identity()) + .beginAsResult(); } public void restore(InMemoryCommandStore commandStore) diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 6d1a84d3..e90c205f 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -678,15 +678,15 @@ public class CommandsForKeyTest safeCfk.set(result.cfk()); if (rnd.decide(pruneChance)) safeCfk.set(safeCfk.current.maybePrune(pruneInterval, pruneHlcDelta)); - result.postProcess(safeStore, prev, update.next, canon); + result.postProcess(safeStore, prev, update.next, canon, false); } if (!CommandsForKey.managesExecution(update.next.txnId()) && update.next.hasBeen(Status.Stable) && !update.next.hasBeen(Status.Truncated)) { CommandsForKey prev = safeCfk.current(); - result = prev.registerUnmanaged(safeCommand, REGISTER); + result = prev.registerUnmanaged(safeStore, safeCommand, REGISTER); safeCfk.set(result.cfk()); - result.postProcess(safeStore, prev, null, canon); + result.postProcess(safeStore, prev, null, canon, false); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org