This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch startup-sequence-improvements in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit dca1fcf8ae70acc04fbf82dc456caf1b4c0be482 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Wed Jul 30 10:53:22 2025 +0100 Improve ProgressLog: - Schedule a fallback timeout for active requests to ensure progress with lost callbacks - Clear active task for a txnId when new RunInvoker is registered - Consult DurableBefore prior to invoking recovery - Support user invoked reset of a command, clearing any active work and requeueing it - (Testing): Treat progress log for RX as recurring tasks for burn test termination --- .../src/main/java/accord/coordinate/FetchData.java | 25 ++- .../main/java/accord/coordinate/FetchRoute.java | 10 +- .../main/java/accord/coordinate/MaybeRecover.java | 8 +- .../accord/impl/progresslog/CallbackInvoker.java | 49 +++-- .../impl/progresslog/DefaultProgressLog.java | 216 +++++++++++++-------- .../java/accord/impl/progresslog/HomeState.java | 20 +- .../java/accord/impl/progresslog/TxnState.java | 14 +- .../java/accord/impl/progresslog/WaitingState.java | 23 ++- accord-core/src/main/java/accord/local/Node.java | 10 +- .../accord/local/durability/DurabilityQueue.java | 5 +- .../src/test/java/accord/impl/basic/Cluster.java | 4 +- .../src/test/java/accord/impl/basic/Pending.java | 5 + .../java/accord/impl/basic/TestProgressLogs.java | 29 ++- 13 files changed, 260 insertions(+), 158 deletions(-) diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java b/accord-core/src/main/java/accord/coordinate/FetchData.java index dc75c4fe..9b3374a5 100644 --- a/accord-core/src/main/java/accord/coordinate/FetchData.java +++ b/accord-core/src/main/java/accord/coordinate/FetchData.java @@ -73,11 +73,11 @@ public class FetchData extends CheckShards<Route<?>> final long srcEpoch; // known participants, a subset of which we may fetch from final Participants<?> contactable; - final StoreSelector reportTo; + final LatentStoreSelector reportTo; final BiConsumer<? super FetchResult, Throwable> callback; final @Nullable Tracing tracing; - public FetchRequest(SequentialAsyncExecutor executor, Known fetch, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Participants<?> contactable, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback, @Nullable Tracing tracing) + public FetchRequest(SequentialAsyncExecutor executor, Known fetch, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback, @Nullable Tracing tracing) { this.executor = executor; this.fetch = fetch; @@ -95,29 +95,26 @@ public class FetchData extends CheckShards<Route<?>> /** * Do not make an attempt to discern what keys need to be contacted; fetch from only the specific remote keys that were requested. */ - public static void fetchSpecific(Known fetch, Node node, TxnId txnId, @Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) + public static Object fetchSpecific(Known fetch, Node node, TxnId txnId, @Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { - fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, executeAt, query, maxRoute, reportTo, callback); + return fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, executeAt, query, maxRoute, reportTo, callback); } /** * Do not make an attempt to discern what keys need to be contacted; fetch from only the specific remote keys that were requested. */ - public static void fetchSpecific(Known fetch, Node node, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) + public static Object fetchSpecific(Known fetch, Node node, TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { - fetchSpecific(node, query, maxRoute, new FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt, maxRoute, reportTo, callback, node.agent().trace(txnId, FETCH))); + return fetchSpecific(node, query, maxRoute, new FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt, maxRoute, reportTo, callback, node.agent().trace(txnId, FETCH))); } - public static void fetchSpecific(Node node, Route<?> query, Route<?> maxRoute, FetchRequest request) + public static Object fetchSpecific(Node node, Route<?> query, Route<?> maxRoute, FetchRequest request) { long srcEpoch = request.srcEpoch; if (!node.topology().hasAtLeastEpoch(srcEpoch)) - { - node.withEpochAtLeast(srcEpoch, request.executor, request.callback, () -> fetchSpecific(node, query, maxRoute, request)); - return; - } + return node.withEpochAtLeast(srcEpoch, request.executor, request.callback, () -> fetchSpecific(node, query, maxRoute, request)); - fetchData(node, query, maxRoute, request); + return fetchData(node, query, maxRoute, request); } final BiConsumer<? super FetchResult, Throwable> callback; @@ -131,12 +128,12 @@ public class FetchData extends CheckShards<Route<?>> // (i.e. if preaccept/accept contact a later epoch than execution is decided for) final LatentStoreSelector reportTo; - private FetchData(Node node, Known target, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> maxRoute, long sourceEpoch, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) + private FetchData(Node node, Known target, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> maxRoute, long sourceEpoch, LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { this(node, target, txnId, invalidIf, route, route.withHomeKey(), maxRoute, sourceEpoch, reportTo, callback); } - private FetchData(Node node, Known target, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, long sourceEpoch, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) + private FetchData(Node node, Known target, TxnId txnId, InvalidIf invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, long sourceEpoch, LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable> callback) { // TODO (desired, efficiency): restore behaviour of only collecting info if e.g. Committed or Executed super(node, node.someSequentialExecutor(), txnId, routeWithHomeKey, sourceEpoch, CheckStatus.IncludeInfo.All, null, invalidIf); diff --git a/accord-core/src/main/java/accord/coordinate/FetchRoute.java b/accord-core/src/main/java/accord/coordinate/FetchRoute.java index 7a4f149b..0f03b75e 100644 --- a/accord-core/src/main/java/accord/coordinate/FetchRoute.java +++ b/accord-core/src/main/java/accord/coordinate/FetchRoute.java @@ -59,23 +59,23 @@ public class FetchRoute extends CheckShards<Participants<?>> this.callback = callback; } - public static void fetchRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback, Tracing tracing) + public static Object fetchRoute(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback, Tracing tracing) { if (!node.topology().hasEpoch(txnId.epoch())) { if (tracing != null) tracing.trace(null, "Waiting for epoch %d", txnId.epoch()); - node.withEpochAtLeast(txnId.epoch(), null, callback, () -> fetchRoute(node, txnId, invalidIf, unseekables, reportTo, callback, tracing)); - return; + return node.withEpochAtLeast(txnId.epoch(), null, callback, () -> fetchRoute(node, txnId, invalidIf, unseekables, reportTo, callback, tracing)); } FetchRoute fetchRoute = new FetchRoute(node, txnId, invalidIf, unseekables, reportTo, callback, tracing); fetchRoute.start(); + return fetchRoute; } - public static void fetchRoute(Node node, TxnId txnId, Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback) + public static Object fetchRoute(Node node, TxnId txnId, Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable> callback) { - fetchRoute(node, txnId, NotKnownToBeInvalid, contactable, reportTo, callback, node.agent().trace(txnId, FETCH)); + return fetchRoute(node, txnId, NotKnownToBeInvalid, contactable, reportTo, callback, node.agent().trace(txnId, FETCH)); } @Override diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java index 5c1fc3bf..0a04519a 100644 --- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java +++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java @@ -21,6 +21,8 @@ package accord.coordinate; import java.util.function.BiConsumer; import accord.api.Tracing; +import accord.local.CommandStores; +import accord.local.CommandStores.LatentStoreSelector; import accord.local.CommandStores.StoreSelector; import accord.local.SequentialAsyncExecutor; import accord.messages.InformDurable; @@ -45,9 +47,9 @@ public class MaybeRecover extends CheckShards<Route<?>> { final ProgressToken prevProgress; final BiConsumer<Outcome, Throwable> callback; - final StoreSelector reportTo; + final LatentStoreSelector reportTo; - MaybeRecover(Node node, SequentialAsyncExecutor executor, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, StoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) + MaybeRecover(Node node, SequentialAsyncExecutor executor, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { // we only want to enquire with the home shard, but we prefer maximal route information for running Invalidation against, if necessary super(node, executor, txnId, someRoute.withHomeKey(), IncludeInfo.Route, null, invalidIf, node.agent().trace(txnId, RECOVER)); @@ -56,7 +58,7 @@ public class MaybeRecover extends CheckShards<Route<?>> this.reportTo = reportTo; } - public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, StoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) + public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress, LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback) { MaybeRecover maybeRecover = new MaybeRecover(node, node.someSequentialExecutor(), txnId, invalidIf, someRoute, prevProgress, reportTo, callback); maybeRecover.start(); diff --git a/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java b/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java index 4667846b..80d50fb9 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java +++ b/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java @@ -24,12 +24,13 @@ import javax.annotation.Nullable; import accord.local.PreLoadContext; import accord.local.SafeCommand; +import accord.local.SafeCommandStore; import accord.primitives.TxnId; import static accord.impl.progresslog.TxnStateKind.Home; import static accord.impl.progresslog.TxnStateKind.Waiting; -class CallbackInvoker<P, V> implements BiConsumer<V, Throwable>, PreLoadContext +final class CallbackInvoker<P, V> extends DefaultProgressLog.PendingTask implements BiConsumer<V, Throwable>, PreLoadContext { static <P, V> CallbackInvoker<P, V> invokeWaitingCallback(DefaultProgressLog instance, TxnId txnId, P param, Callback<P, V> callback) { @@ -43,12 +44,11 @@ class CallbackInvoker<P, V> implements BiConsumer<V, Throwable>, PreLoadContext static <P, V> CallbackInvoker<P, V> invokeCallback(TxnStateKind kind, DefaultProgressLog owner, TxnId txnId, P param, Callback<P, V> callback) { - CallbackInvoker<P, V> invoker = new CallbackInvoker<>(owner, kind, owner.nextInvokerId(), txnId, param, callback); + CallbackInvoker<P, V> invoker = new CallbackInvoker<>(owner, kind, owner.nextCallbackId(), txnId, param, callback); owner.registerPending(kind, txnId, invoker); return invoker; } - final DefaultProgressLog owner; final boolean isHome; final long id; final TxnId txnId; @@ -57,7 +57,7 @@ class CallbackInvoker<P, V> implements BiConsumer<V, Throwable>, PreLoadContext CallbackInvoker(DefaultProgressLog owner, TxnStateKind kind, long id, TxnId txnId, P param, Callback<P, V> callback) { - this.owner = owner; + super(owner); this.isHome = kind == Home; this.id = id; this.txnId = txnId; @@ -70,37 +70,32 @@ class CallbackInvoker<P, V> implements BiConsumer<V, Throwable>, PreLoadContext return isHome ? Home : Waiting; } - @Override - public void accept(V success, Throwable fail) + private boolean complete() { - owner.commandStore.execute(this, safeStore -> { - - // we load safeCommand first so that if it clears the progress log we abandon the callback - SafeCommand safeCommand = safeStore.ifInitialised(txnId); - if (!owner.complete(safeStore, kind(), id, this)) - return; - - if (safeCommand == null) - return; - - callback.callback(safeStore, safeCommand, owner, txnId, param, success, fail); - }, owner.commandStore.agent()); + return owner.complete(kind(), id, txnId, this); } @Override - public boolean equals(Object obj) + public void accept(V success, Throwable fail) { - if (obj == null) return false; - if (obj.getClass() == TxnId.class) return txnId.equals(obj); - if (obj.getClass() != getClass()) return false; - CallbackInvoker<?, ?> that = (CallbackInvoker<?, ?>) obj; - return id == that.id && callback == that.callback; + owner.commandStore.execute(this, safeStore -> { + try + { + // we load safeCommand first so that if it clears the progress log we abandon the callback + SafeCommand safeCommand = safeStore.ifInitialised(txnId); + if (complete() && safeCommand != null) + acceptInternal(safeStore, safeCommand, success, fail); + } + finally + { + postRun(safeStore); + } + }, owner.commandStore.agent()); } - @Override - public int hashCode() + private void acceptInternal(SafeCommandStore safeStore, SafeCommand safeCommand, V success, Throwable fail) { - return txnId.hashCode(); + callback.callback(safeStore, safeCommand, owner, txnId, param, success, fail); } @Override diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java index 450ff5fc..518a283f 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java +++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java @@ -18,6 +18,7 @@ package accord.impl.progresslog; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; @@ -28,6 +29,9 @@ import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import accord.api.ProgressLog; import accord.api.RoutingKey; import accord.local.Command; @@ -52,14 +56,12 @@ import accord.utils.btree.BTree; import accord.utils.btree.BTreeRemoval; import org.agrona.collections.Long2ObjectHashMap; import org.agrona.collections.Object2ObjectHashMap; -import org.agrona.collections.ObjectHashSet; import static accord.api.ProgressLog.BlockedUntil.CanApply; import static accord.api.ProgressLog.BlockedUntil.NotBlocked; import static accord.impl.progresslog.CoordinatePhase.Decided; import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute; import static accord.impl.progresslog.CoordinatePhase.Undecided; -import static accord.impl.progresslog.Progress.Awaiting; import static accord.impl.progresslog.Progress.NoneExpected; import static accord.impl.progresslog.Progress.Querying; import static accord.impl.progresslog.Progress.Queued; @@ -79,6 +81,29 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS; // TODO (desired): evict to disk public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStore> { + static abstract class PendingTask + { + final DefaultProgressLog owner; + + PendingTask(DefaultProgressLog owner) + { + this.owner = owner; + } + + void postRun(SafeCommandStore safeStore) + { + owner.acceptIfNonEmptyRunBuffer(safeStore); + } + } + + public static class Config + { + public int concurrency = 8; + public Duration maxActiveRunTime = Duration.ofMinutes(1); + } + + private static final Logger logger = LoggerFactory.getLogger(DefaultProgressLog.class); + final Node node; final CommandStore commandStore; @@ -94,8 +119,9 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor * These callbacks are required to have hashCode() == txnId.hashCode() and equals(txnId) == true, * so that we can manage overriding callbacks on the relevant TxnState. */ - private final ObjectHashSet<Object> pendingWaiting = new ObjectHashSet<>(); - private final ObjectHashSet<Object> pendingHome = new ObjectHashSet<>(); + // TODO (desired): replace this with a set that can lookup the matching item + private final Object2ObjectHashMap<TxnId, PendingTask> pendingWaiting = new Object2ObjectHashMap<>(); + private final Object2ObjectHashMap<TxnId, PendingTask> pendingHome = new Object2ObjectHashMap<>(); private final Long2ObjectHashMap<Object> active = new Long2ObjectHashMap<>(); private final Map<TxnId, StackTraceElement[]> debugDeleted = Invariants.debug() ? new Object2ObjectHashMap<>() : null; @@ -113,9 +139,9 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor private boolean processing; private volatile boolean stopped; - private int maxConcurrency = 128; + private Config config = new Config(); - private long nextInvokerId; + private long nextCallbackId; protected DefaultProgressLog(Node node, CommandStore commandStore) { @@ -234,7 +260,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor // fall-through to default handler, which simply postpones any scheduled coordination attempt if we witness another coordination attempt in the meantime if (state.homeProgress() == Queued && (before == null ? after.promised().compareTo(Ballot.ZERO) > 0 : (after.promised().compareTo(before.promised()) > 0) || after.acceptedOrCommitted().compareTo(before.acceptedOrCommitted()) > 0)) { - clearPending(Home, state.txnId); + clearPendingAndActive(Home, state.txnId); state.home().set(safeStore, this, state.phase(), Queued); } break; @@ -494,6 +520,12 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor state.setInvalidIfUncommitted(); } + void acceptIfNonEmptyRunBuffer(SafeCommandStore safeStore) + { + if (runBufferIndex < runBufferCount) + accept(safeStore); + } + @Override public void accept(@Nullable SafeCommandStore safeStore) { @@ -592,6 +624,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor run.clearPendingTimerDelay(); if (pendingTimerDeadline <= nowMicros) { + validatePreRunState(run, runKind.other()); invokeBoth = true; } else @@ -640,7 +673,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor { while (runBufferIndex < runBufferCount) { - if (active.size() >= maxConcurrency) + if (active.size() >= config.concurrency) { maybeShrinkRunBuffer(); return; @@ -668,77 +701,60 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor private void validatePreRunState(TxnState run, TxnStateKind kind) { Progress progress = kind == Waiting ? run.waiting().waitingProgress() : run.home().homeProgress(); - Invariants.require(progress != NoneExpected && progress != Querying); + Invariants.require(progress != NoneExpected); + if (progress == Querying) + { + // TODO (expected): add debug information about the active task + logger.warn("Interrupting query for {} ({}) as fallback timeout exceeded", run.txnId, kind); + clearPendingAndActive(kind, run.txnId); + } } RunInvoker invoker(TxnState run, TxnStateKind runKind) { - RunInvoker invoker = new RunInvoker(nextInvokerId(), run, runKind); + RunInvoker invoker = new RunInvoker(this, run, runKind); registerPending(runKind, run.txnId, invoker); return invoker; } - class RunInvoker implements Consumer<SafeCommandStore>, PreLoadContext + static final class RunInvoker extends PendingTask implements PreLoadContext, Consumer<SafeCommandStore> { - final long id; + final DefaultProgressLog owner; final TxnState run; final TxnStateKind runKind; - RunInvoker(long id, TxnState run, TxnStateKind runKind) + RunInvoker(DefaultProgressLog owner, TxnState run, TxnStateKind runKind) { - this.id = id; + super(owner); + this.owner = owner; this.run = run; this.runKind = runKind; } - @Override - public void accept(SafeCommandStore safeStore) + private boolean complete() { - // we have to read safeCommand first as it may become truncated on load, which may clear the progress log and invalidate us - SafeCommand safeCommand = safeStore.ifInitialised(run.txnId); - if (safeCommand == null) - return; - - if (!complete(safeStore, runKind, id, this)) - return; // we've been cancelled - - // check this after fetching SafeCommand, as doing so can erase the command (and invalidate our state) - if (run.isDone(runKind)) - return; - - Invariants.require(get(run.txnId) == run, "Transaction state for %s does not match expected one %s", run.txnId, run); - Invariants.require(run.scheduledTimer() != runKind, "We are actively executing %s, but we are also scheduled to run this same TxnState later. This should not happen.", runKind); - Invariants.require(run.pendingTimer() != runKind, "We are actively executing %s, but we also have a pending scheduled task to run this same TxnState later. This should not happen.", runKind); - - validatePreRunState(run, runKind); - if (runKind == Home) - { - boolean isRetry = run.homeProgress() == Awaiting; - if (isRetry) run.incrementHomeRetryCounter(); - run.home().runHome(DefaultProgressLog.this, safeStore, safeCommand); - } - else - { - boolean isRetry = run.waitingProgress() == Awaiting; - if (isRetry) run.incrementWaitingRetryCounter(); - run.runWaiting(safeStore, safeCommand, DefaultProgressLog.this); - } + return owner.complete(runKind, run.txnId, this); } - @Override - public boolean equals(Object obj) + private void acceptInternal(SafeCommandStore safeStore, SafeCommand safeCommand) { - if (obj == null) return false; - if (obj.getClass() == TxnId.class) return run.txnId.equals(obj); - if (obj.getClass() != getClass()) return false; - RunInvoker that = (RunInvoker) obj; - return id == that.id && run.txnId.equals(that.run.txnId) && runKind.equals(that.runKind); + owner.run(runKind, run, safeStore, safeCommand); } @Override - public int hashCode() + public void accept(SafeCommandStore safeStore) { - return run.txnId.hashCode(); + try + { + // we load safeCommand first so that if it clears the progress log we abandon the callback + SafeCommand safeCommand = safeStore.ifInitialised(run.txnId); + if (complete() && safeCommand != null) + acceptInternal(safeStore, safeCommand); + } + finally + { + postRun(safeStore); + } } @Override @@ -754,45 +770,86 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor } } - long nextInvokerId() + protected void run(TxnStateKind runKind, TxnState run, SafeCommandStore safeStore, SafeCommand safeCommand) + { + // check this after fetching SafeCommand, as doing so can erase the command (and invalidate our state) + if (run.isDone(runKind)) + return; + + Invariants.require(get(run.txnId) == run, "Transaction state for %s does not match expected one %s", run.txnId, run); + Invariants.require(run.scheduledTimer() != runKind, "We are actively executing %s, but we are also scheduled to run this same TxnState later. This should not happen.", runKind); + Invariants.require(run.pendingTimer() != runKind, "We are actively executing %s, but we also have a pending scheduled task to run this same TxnState later. This should not happen.", runKind); + + validatePreRunState(run, runKind); + if (runKind == Home) + { + boolean isRetry = run.homeProgress() != Queued; + if (isRetry) run.incrementHomeRetryCounter(); + run.runHome(DefaultProgressLog.this, safeStore, safeCommand); + } + else + { + boolean isRetry = run.waitingProgress() != Queued; + if (isRetry) run.incrementWaitingRetryCounter(); + run.runWaiting(DefaultProgressLog.this, safeStore, safeCommand); + } + } + + long nextCallbackId() { - return nextInvokerId++; + return ++nextCallbackId; } - ObjectHashSet<Object> pending(TxnStateKind kind) + Object2ObjectHashMap<TxnId, PendingTask> pending(TxnStateKind kind) { return kind == Waiting ? pendingWaiting : pendingHome; } - void registerPending(TxnStateKind kind, TxnId txnId, Object object) + void registerPending(TxnStateKind kind, TxnId txnId, PendingTask register) { - ObjectHashSet<Object> pending = pending(kind); - Invariants.require(!pending.contains(txnId)); - pending.add(object); + Object2ObjectHashMap<TxnId, PendingTask> collection = pending(kind); + PendingTask existing = collection.putIfAbsent(txnId, register); + Invariants.require(existing == null); } boolean hasPending(TxnStateKind kind, TxnId txnId) { - return pending(kind).contains(txnId); + return pending(kind).containsKey(txnId); } - void start(CallbackInvoker<?, ?> invoker, Object task) + void start(CallbackInvoker<?, ?> invoker, Object debug) { - active.put(invoker.id, task); + // task is an arbitrary object to help debug, but must be non-null + // TODO (expected): make active debuggable via virtual table or other mechanism + if (debug == null) + debug = invoker; + active.put(invoker.id, debug); } - boolean complete(SafeCommandStore safeStore, TxnStateKind kind, long id, Object active) + boolean complete(TxnStateKind kind, long id, TxnId txnId, PendingTask completing) { - this.active.remove(id); - boolean result = pending(kind).remove(active); - if (runBufferIndex < runBufferCount) - accept(safeStore); - return result; + boolean stillActive = active.remove(id) != null; + return complete(kind, txnId, completing) && stillActive; + } + + boolean complete(TxnStateKind kind, TxnId txnId, PendingTask completing) + { + return pending(kind).remove(txnId, completing); + } + + void clearPendingAndActive(TxnStateKind kind, TxnId txnId) + { + PendingTask pending = pending(kind).remove(txnId); + if (pending instanceof CallbackInvoker<?,?>) + active.remove(((CallbackInvoker<?, ?>) pending).id); } - void clearPending(TxnStateKind kind, TxnId txnId) + public void requeue(SafeCommandStore safeStore, TxnStateKind kind, TxnId txnId) { - pending(kind).remove(txnId); + clearPendingAndActive(kind, txnId); + TxnState state = get(txnId); + if (state != null && (kind == Home ? state.isHomeInitialised() : !state.isWaitingDone())) + state.updateScheduling(safeStore, this, kind, null, Queued); } void unschedule(TxnState state) @@ -826,15 +883,20 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor } } - public void setMaxConcurrency(int maxConcurrency) + public Config config() + { + return config; + } + + public void setConfig(SafeCommandStore safeStore, Config config) { - Invariants.requireArgument(maxConcurrency >= 1); - this.maxConcurrency = maxConcurrency; + Invariants.require(commandStore.inStore()); + this.config = config; } - public int maxConcurrency() + public void unsafeSetConfig(Config config) { - return maxConcurrency; + this.config = config; } public int size() diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java index 381207b2..9179dbaa 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java @@ -30,6 +30,8 @@ import accord.local.CommandStores.IncludingSpecificStoreSelector; import accord.local.SafeCommand; import accord.local.SafeCommandStore; import accord.primitives.ProgressToken; +import accord.primitives.Route; +import accord.primitives.Status; import accord.primitives.TxnId; import accord.utils.Invariants; @@ -37,7 +39,6 @@ import static accord.api.ProgressLog.BlockedUntil.CanCoordinateExecution; import static accord.api.TraceEventType.HOME_PROGRESS; import static accord.impl.progresslog.CallbackInvoker.invokeHomeCallback; import static accord.impl.progresslog.CoordinatePhase.Done; -import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute; import static accord.impl.progresslog.Progress.NoneExpected; import static accord.impl.progresslog.Progress.Querying; import static accord.impl.progresslog.Progress.Queued; @@ -129,7 +130,7 @@ abstract class HomeState extends WaitingState if (newPhase.compareTo(phase()) > 0) { - instance.clearPending(Home, txnId); + instance.clearPendingAndActive(Home, txnId); clearHomeRetryCounter(); set(safeStore, instance, newPhase, newProgress); } @@ -137,7 +138,7 @@ abstract class HomeState extends WaitingState final void runHome(DefaultProgressLog instance, SafeCommandStore safeStore, SafeCommand safeCommand) { - Tracing tracing = instance.node.agent().trace(safeCommand.txnId(), HOME_PROGRESS); + Tracing tracing = instance.node.agent().trace(txnId, HOME_PROGRESS); Invariants.require(!isHomeDoneOrUninitialised()); Command command = safeCommand.current(); // note: we may truncate locally based on shard-specific criteria, but this doesn't mean we're globally persisted @@ -146,6 +147,17 @@ abstract class HomeState extends WaitingState // TODO (expected): when invalidated, safer to maintain HomeState until known to be globally invalidated // TODO (expected): validate that we clear HomeState when we receive a Durable reply, to replace the token check logic Invariants.require(!command.durability().isDurableOrInvalidated(), "Command is durable or invalidated, but we have not cleared the ProgressLog"); + if (Route.isFullRoute(command.route())) + { + Status.Durability min = safeStore.durableBefore().min(txnId, command.route()); + if (min.isDurableOrInvalidated()) + { + if (tracing != null) + tracing.trace(safeStore.commandStore(), "DurableBefore records %s; terminating home state", min); + setHomeDone(instance); + return; + } + } ProgressToken maxProgressToken = instance.savedProgressToken(txnId).merge(command); CallbackInvoker<ProgressToken, Outcome> invoker = invokeHomeCallback(instance, txnId, maxProgressToken, HomeState::recoverCallback); @@ -225,7 +237,7 @@ abstract class HomeState extends WaitingState { set(null, instance, Done, NoneExpected); clearHomeRetryCounter(); - instance.clearPending(Home, txnId); + instance.clearPendingAndActive(Home, txnId); } void setHomeDoneAndMaybeRemove(DefaultProgressLog instance) diff --git a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java index 044a9327..97c0dc02 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java @@ -18,6 +18,7 @@ package accord.impl.progresslog; +import java.time.temporal.TemporalUnit; import javax.annotation.Nullable; import com.google.common.primitives.Ints; @@ -30,8 +31,9 @@ import accord.utils.Invariants; import accord.utils.UnhandledEnum; import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; -final class TxnState extends HomeState implements PreLoadContext +public final class TxnState extends HomeState implements PreLoadContext { TxnState(TxnId txnId) { @@ -46,9 +48,12 @@ final class TxnState extends HomeState implements PreLoadContext default: throw new UnhandledEnum(newProgress); case NoneExpected: - case Querying: newDelay = 0; break; + case Querying: + newDelay = NANOSECONDS.toMicros(instance.config().maxActiveRunTime.toNanos()); + Invariants.require(newDelay >= 0); + break; case Queued: switch (updated) { @@ -69,10 +74,7 @@ final class TxnState extends HomeState implements PreLoadContext } TxnStateKind scheduled = scheduledTimer(); - if (scheduled == null) - { - Invariants.require(pendingTimer() == null); - } + Invariants.require(scheduled != null || pendingTimer() == null); // previousDeadline is the previous deadline of <updated>; // otherDeadline is the active deadline (if any) of <updated.other()> diff --git a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java index e7eb0922..da23e910 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java +++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java @@ -19,7 +19,6 @@ package accord.impl.progresslog; import java.util.concurrent.Executor; -import java.util.function.BiConsumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -349,7 +348,7 @@ abstract class WaitingState extends BaseTxnState void setWaitingDone(DefaultProgressLog owner) { set(null, owner, CanApply, NoneExpected); - owner.clearPending(Waiting, txnId); + owner.clearPendingAndActive(Waiting, txnId); clearWaitingRetryCounter(); } @@ -360,7 +359,7 @@ abstract class WaitingState extends BaseTxnState { clearAwaitState(); clearWaitingRetryCounter(); - owner.clearPending(Waiting, txnId); + owner.clearPendingAndActive(Waiting, txnId); set(safeStore, owner, blockedUntil, Queued); } } @@ -374,11 +373,11 @@ abstract class WaitingState extends BaseTxnState set(null, owner, isDone ? CanApply : currentlyBlockedUntil, NoneExpected); if (isDone) maybeRemove(owner); - owner.clearPending(Waiting, txnId); + owner.clearPendingAndActive(Waiting, txnId); } } - final void runWaiting(SafeCommandStore safeStore, SafeCommand safeCommand, DefaultProgressLog owner) + final void runWaiting(DefaultProgressLog owner, SafeCommandStore safeStore, SafeCommand safeCommand) { runInternal(safeStore, safeCommand, owner, owner.node.agent().trace(txnId, WAIT_PROGRESS)); } @@ -774,16 +773,16 @@ abstract class WaitingState extends BaseTxnState { // TODO (desired): fetch only the route // we MUSt allocate before calling withEpoch to register cancellation, as async - BiConsumer<Route<?>, Throwable> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchRouteCallback); - FetchRoute.fetchRoute(owner.node(), txnId, contactable, new IncludingSpecificStoreSelector(owner.commandStore.id()), invoker); + CallbackInvoker<BlockedUntil, Route<?>> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchRouteCallback); + owner.start(invoker, FetchRoute.fetchRoute(owner.node(), txnId, contactable, new IncludingSpecificStoreSelector(owner.commandStore.id()), invoker)); } static void fetch(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId txnId, Timestamp executeAt, Route<?> slicedRoute, Route<?> fetchRoute, Route<?> maxRoute) { Invariants.require(!slicedRoute.isEmpty()); // we MUSt allocate before calling withEpoch to register cancellation, as async - BiConsumer<FetchData.FetchResult, Throwable> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchCallback); - FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, owner.node(), txnId, executeAt, fetchRoute, maxRoute, new IncludingSpecificStoreSelector(owner.commandStore.id()), invoker); + CallbackInvoker<BlockedUntil, FetchData.FetchResult> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchCallback); + owner.start(invoker, FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, owner.node(), txnId, executeAt, fetchRoute, maxRoute, new IncludingSpecificStoreSelector(owner.commandStore.id()), invoker)); } void awaitHomeKey(DefaultProgressLog owner, BlockedUntil blockedUntil, TxnId txnId, Timestamp executeAt, Route<?> route) @@ -803,10 +802,10 @@ abstract class WaitingState extends BaseTxnState { long epoch = blockedUntil.fetchEpoch(txnId, executeAt); // we MUST allocate the invoker before invoking withEpoch as this may be asynchronous and we must first register our callback for cancellation - BiConsumer<AsynchronousAwait.SynchronousResult, Throwable> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, callback); - owner.node().withEpochAtLeast(epoch, (Executor)null, invoker, () -> { + CallbackInvoker<BlockedUntil, AsynchronousAwait.SynchronousResult> invoker = invokeWaitingCallback(owner, txnId, blockedUntil, callback); + owner.start(invoker, owner.node().withEpochAtLeast(epoch, (Executor)null, invoker, () -> { AsynchronousAwait.awaitAny(owner.node(), contact(owner, route, epoch), txnId, route, blockedUntil, callbackId, invoker); - }); + })); } String toStateString() diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index dc19879d..971d76aa 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -64,6 +64,7 @@ import accord.coordinate.CoordinationAdapter.Factory.Kind; import accord.coordinate.Infer.InvalidIf; import accord.coordinate.Outcome; import accord.coordinate.RecoverWithRoute; +import accord.local.CommandStores.LatentStoreSelector; import accord.local.CommandStores.StoreSelector; import accord.local.durability.DurabilityService; import accord.messages.Callback; @@ -428,19 +429,20 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ } } - public void withEpochAtLeast(long epoch, @Nullable Executor ifAsync, BiConsumer<?, ? super Throwable> ifFailure, Runnable ifSuccess) + public Object withEpochAtLeast(long epoch, @Nullable Executor ifAsync, BiConsumer<?, ? super Throwable> ifFailure, Runnable ifSuccess) { if (topology.hasAtLeastEpoch(epoch)) { ifSuccess.run(); + return ifSuccess; } else { - topology.awaitEpoch(epoch, ifAsync).begin((success, fail) -> { + configService.fetchTopologyForEpoch(epoch); + return topology.awaitEpoch(epoch, ifAsync).begin((success, fail) -> { if (fail != null) ifFailure.accept(null, fail); else ifSuccess.run(); }); - configService.fetchTopologyForEpoch(epoch); } } @@ -891,7 +893,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ } } - public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf invalidIf, FullRoute<?> route, StoreSelector reportTo, @Nullable Tracing tracing) + public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf invalidIf, FullRoute<?> route, LatentStoreSelector reportTo, @Nullable Tracing tracing) { { AsyncResult<? extends Outcome> result = coordinating.get(txnId); diff --git a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java index 4e062cf0..31368832 100644 --- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java +++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java @@ -119,7 +119,7 @@ public class DurabilityQueue { ++pendingCounter; pending.add(new Pending(syncPoint, request, attempt)); - if (pending.size() >= PRUNE_SIZE_THRESHOLD && pendingCounter > prunedAt + pending.size()) + if (pending.size() >= PRUNE_SIZE_THRESHOLD && pendingCounter > prunedAt) prune(); } } @@ -191,7 +191,8 @@ public class DurabilityQueue private synchronized void prune() { - prunedAt = pendingCounter; + prunedAt = pending.size(); + pendingCounter = 0; List<SortForPruning> sorted = new ArrayList<>(); for (Pending p : pending) { diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index 3bc9ae49..e8bfcd1a 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -726,7 +726,9 @@ public class Cluster Runnable updateProgressLogConcurrency; { updateProgressLogConcurrency = () -> { - nodeMap.values().forEach(node -> node.commandStores().forEachCommandStore(cs -> ((TestProgressLog)cs.unsafeProgressLog()).setMaxConcurrency(random.nextInt(8, 32)))); + nodeMap.values().forEach(node -> node.commandStores().forEachCommandStore(cs -> cs.execute(() -> { + ((TestProgressLog)cs.unsafeProgressLog()).config().concurrency = random.nextInt(8, 32); + }))); }; } updateProgressLogConcurrency.run(); diff --git a/accord-core/src/test/java/accord/impl/basic/Pending.java b/accord-core/src/test/java/accord/impl/basic/Pending.java index 7214d5bc..6474af87 100644 --- a/accord-core/src/test/java/accord/impl/basic/Pending.java +++ b/accord-core/src/test/java/accord/impl/basic/Pending.java @@ -37,6 +37,11 @@ public interface Pending Invariants.require(activeOrigin != null); } + public static void unsafeSetActiveOrigin(Pending newActiveOrigin) + { + activeOrigin = newActiveOrigin; + } + public static void clearActiveOrigin() { activeOrigin = null; diff --git a/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java b/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java index 56e76f00..f03bebc3 100644 --- a/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java +++ b/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java @@ -18,18 +18,23 @@ package accord.impl.basic; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import accord.api.ProgressLog; import accord.impl.progresslog.DefaultProgressLog; +import accord.impl.progresslog.TxnState; +import accord.impl.progresslog.TxnStateKind; import accord.local.CommandStore; import accord.local.Node; +import accord.local.SafeCommand; import accord.local.SafeCommandStore; public class TestProgressLogs implements ProgressLog.Factory { static class TestProgressLog extends DefaultProgressLog { + private static final RecurringPendingRunnable RECURRING = new RecurringPendingRunnable(-1, null, null, null, null, true){}; protected TestProgressLog(Node node, CommandStore commandStore) { super(node, commandStore); @@ -46,12 +51,30 @@ public class TestProgressLogs implements ProgressLog.Factory } finally { - if (prev != Pending.Global.NONE) + Pending.Global.unsafeSetActiveOrigin(prev); + } + } + + @Override + protected void run(TxnStateKind runKind, TxnState run, SafeCommandStore safeStore, SafeCommand safeCommand) + { + if (run.txnId.isSystemTxn()) + { + Pending prev = Pending.Global.activeOrigin(); + Pending.Global.unsafeSetActiveOrigin(RECURRING); + try + { + super.run(runKind, run, safeStore, safeCommand); + } + finally { - Pending.Global.clearActiveOrigin(); - Pending.Global.setActiveOrigin(prev); + Pending.Global.unsafeSetActiveOrigin(prev); } } + else + { + super.run(runKind, run, safeStore, safeCommand); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
