This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk-tmp2
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk-tmp2 by this push:
new 98ad4f0e Improve ProgressLog: - Schedule a fallback timeout for
active requests to ensure progress with lost callbacks - Clear active task for
a txnId when new RunInvoker is registered - Consult DurableBefore prior to
invoking recovery - Support user invoked reset of a command, clearing any
active work and requeueing it - (Testing): Treat progress log for RX as
recurring tasks for burn test termination Also fix: - Handle another
truncateWithOutcome edge case
98ad4f0e is described below
commit 98ad4f0e588040e9a1932476258331569f6d35d9
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Jul 28 17:18:33 2025 +0100
Improve ProgressLog:
- Schedule a fallback timeout for active requests to ensure progress with
lost callbacks
- Clear active task for a txnId when new RunInvoker is registered
- Consult DurableBefore prior to invoking recovery
- Support user invoked reset of a command, clearing any active work and
requeueing it
- (Testing): Treat progress log for RX as recurring tasks for burn test
termination
Also fix:
- Handle another truncateWithOutcome edge case
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20806
---
.../src/main/java/accord/coordinate/FetchData.java | 25 ++-
.../main/java/accord/coordinate/FetchRoute.java | 10 +-
.../main/java/accord/coordinate/MaybeRecover.java | 8 +-
.../accord/impl/progresslog/CallbackInvoker.java | 49 +++--
.../impl/progresslog/DefaultProgressLog.java | 216 +++++++++++++--------
.../java/accord/impl/progresslog/HomeState.java | 20 +-
.../java/accord/impl/progresslog/TxnState.java | 14 +-
.../java/accord/impl/progresslog/WaitingState.java | 23 ++-
.../src/main/java/accord/local/Cleanup.java | 9 +-
accord-core/src/main/java/accord/local/Node.java | 10 +-
.../accord/local/durability/DurabilityQueue.java | 5 +-
.../src/test/java/accord/impl/basic/Cluster.java | 4 +-
.../src/test/java/accord/impl/basic/Pending.java | 5 +
.../java/accord/impl/basic/TestProgressLogs.java | 29 ++-
14 files changed, 264 insertions(+), 163 deletions(-)
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index dc75c4fe..9b3374a5 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -73,11 +73,11 @@ public class FetchData extends CheckShards<Route<?>>
final long srcEpoch;
// known participants, a subset of which we may fetch from
final Participants<?> contactable;
- final StoreSelector reportTo;
+ final LatentStoreSelector reportTo;
final BiConsumer<? super FetchResult, Throwable> callback;
final @Nullable Tracing tracing;
- public FetchRequest(SequentialAsyncExecutor executor, Known fetch,
TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt,
Participants<?> contactable, StoreSelector reportTo, BiConsumer<? super
FetchResult, Throwable> callback, @Nullable Tracing tracing)
+ public FetchRequest(SequentialAsyncExecutor executor, Known fetch,
TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt,
Participants<?> contactable, LatentStoreSelector reportTo, BiConsumer<? super
FetchResult, Throwable> callback, @Nullable Tracing tracing)
{
this.executor = executor;
this.fetch = fetch;
@@ -95,29 +95,26 @@ public class FetchData extends CheckShards<Route<?>>
/**
* Do not make an attempt to discern what keys need to be contacted; fetch
from only the specific remote keys that were requested.
*/
- public static void fetchSpecific(Known fetch, Node node, TxnId txnId,
@Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute, StoreSelector
reportTo, BiConsumer<? super FetchResult, Throwable> callback)
+ public static Object fetchSpecific(Known fetch, Node node, TxnId txnId,
@Nullable Timestamp executeAt, Route<?> query, Route<?> maxRoute,
LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable>
callback)
{
- fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid, executeAt,
query, maxRoute, reportTo, callback);
+ return fetchSpecific(fetch, node, txnId, NotKnownToBeInvalid,
executeAt, query, maxRoute, reportTo, callback);
}
/**
* Do not make an attempt to discern what keys need to be contacted; fetch
from only the specific remote keys that were requested.
*/
- public static void fetchSpecific(Known fetch, Node node, TxnId txnId,
InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?>
maxRoute, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable>
callback)
+ public static Object fetchSpecific(Known fetch, Node node, TxnId txnId,
InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?>
maxRoute, LatentStoreSelector reportTo, BiConsumer<? super FetchResult,
Throwable> callback)
{
- fetchSpecific(node, query, maxRoute, new
FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt,
maxRoute, reportTo, callback, node.agent().trace(txnId, FETCH)));
+ return fetchSpecific(node, query, maxRoute, new
FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt,
maxRoute, reportTo, callback, node.agent().trace(txnId, FETCH)));
}
- public static void fetchSpecific(Node node, Route<?> query, Route<?>
maxRoute, FetchRequest request)
+ public static Object fetchSpecific(Node node, Route<?> query, Route<?>
maxRoute, FetchRequest request)
{
long srcEpoch = request.srcEpoch;
if (!node.topology().hasAtLeastEpoch(srcEpoch))
- {
- node.withEpochAtLeast(srcEpoch, request.executor,
request.callback, () -> fetchSpecific(node, query, maxRoute, request));
- return;
- }
+ return node.withEpochAtLeast(srcEpoch, request.executor,
request.callback, () -> fetchSpecific(node, query, maxRoute, request));
- fetchData(node, query, maxRoute, request);
+ return fetchData(node, query, maxRoute, request);
}
final BiConsumer<? super FetchResult, Throwable> callback;
@@ -131,12 +128,12 @@ public class FetchData extends CheckShards<Route<?>>
// (i.e. if preaccept/accept contact a later epoch than execution is
decided for)
final LatentStoreSelector reportTo;
- private FetchData(Node node, Known target, TxnId txnId, InvalidIf
invalidIf, Route<?> route, Route<?> maxRoute, long sourceEpoch, StoreSelector
reportTo, BiConsumer<? super FetchResult, Throwable> callback)
+ private FetchData(Node node, Known target, TxnId txnId, InvalidIf
invalidIf, Route<?> route, Route<?> maxRoute, long sourceEpoch,
LatentStoreSelector reportTo, BiConsumer<? super FetchResult, Throwable>
callback)
{
this(node, target, txnId, invalidIf, route, route.withHomeKey(),
maxRoute, sourceEpoch, reportTo, callback);
}
- private FetchData(Node node, Known target, TxnId txnId, InvalidIf
invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, long
sourceEpoch, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable>
callback)
+ private FetchData(Node node, Known target, TxnId txnId, InvalidIf
invalidIf, Route<?> route, Route<?> routeWithHomeKey, Route<?> maxRoute, long
sourceEpoch, LatentStoreSelector reportTo, BiConsumer<? super FetchResult,
Throwable> callback)
{
// TODO (desired, efficiency): restore behaviour of only collecting
info if e.g. Committed or Executed
super(node, node.someSequentialExecutor(), txnId, routeWithHomeKey,
sourceEpoch, CheckStatus.IncludeInfo.All, null, invalidIf);
diff --git a/accord-core/src/main/java/accord/coordinate/FetchRoute.java
b/accord-core/src/main/java/accord/coordinate/FetchRoute.java
index 7a4f149b..0f03b75e 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchRoute.java
@@ -59,23 +59,23 @@ public class FetchRoute extends CheckShards<Participants<?>>
this.callback = callback;
}
- public static void fetchRoute(Node node, TxnId txnId, Infer.InvalidIf
invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo,
BiConsumer<Route<?>, Throwable> callback, Tracing tracing)
+ public static Object fetchRoute(Node node, TxnId txnId, Infer.InvalidIf
invalidIf, Participants<?> unseekables, LatentStoreSelector reportTo,
BiConsumer<Route<?>, Throwable> callback, Tracing tracing)
{
if (!node.topology().hasEpoch(txnId.epoch()))
{
if (tracing != null)
tracing.trace(null, "Waiting for epoch %d", txnId.epoch());
- node.withEpochAtLeast(txnId.epoch(), null, callback, () ->
fetchRoute(node, txnId, invalidIf, unseekables, reportTo, callback, tracing));
- return;
+ return node.withEpochAtLeast(txnId.epoch(), null, callback, () ->
fetchRoute(node, txnId, invalidIf, unseekables, reportTo, callback, tracing));
}
FetchRoute fetchRoute = new FetchRoute(node, txnId, invalidIf,
unseekables, reportTo, callback, tracing);
fetchRoute.start();
+ return fetchRoute;
}
- public static void fetchRoute(Node node, TxnId txnId, Participants<?>
contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable>
callback)
+ public static Object fetchRoute(Node node, TxnId txnId, Participants<?>
contactable, LatentStoreSelector reportTo, BiConsumer<Route<?>, Throwable>
callback)
{
- fetchRoute(node, txnId, NotKnownToBeInvalid, contactable, reportTo,
callback, node.agent().trace(txnId, FETCH));
+ return fetchRoute(node, txnId, NotKnownToBeInvalid, contactable,
reportTo, callback, node.agent().trace(txnId, FETCH));
}
@Override
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 5c1fc3bf..0a04519a 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -21,6 +21,8 @@ package accord.coordinate;
import java.util.function.BiConsumer;
import accord.api.Tracing;
+import accord.local.CommandStores;
+import accord.local.CommandStores.LatentStoreSelector;
import accord.local.CommandStores.StoreSelector;
import accord.local.SequentialAsyncExecutor;
import accord.messages.InformDurable;
@@ -45,9 +47,9 @@ public class MaybeRecover extends CheckShards<Route<?>>
{
final ProgressToken prevProgress;
final BiConsumer<Outcome, Throwable> callback;
- final StoreSelector reportTo;
+ final LatentStoreSelector reportTo;
- MaybeRecover(Node node, SequentialAsyncExecutor executor, TxnId txnId,
Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress,
StoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
+ MaybeRecover(Node node, SequentialAsyncExecutor executor, TxnId txnId,
Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress,
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
{
// we only want to enquire with the home shard, but we prefer maximal
route information for running Invalidation against, if necessary
super(node, executor, txnId, someRoute.withHomeKey(),
IncludeInfo.Route, null, invalidIf, node.agent().trace(txnId, RECOVER));
@@ -56,7 +58,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
this.reportTo = reportTo;
}
- public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf
invalidIf, Route<?> someRoute, ProgressToken prevProgress, StoreSelector
reportTo, BiConsumer<Outcome, Throwable> callback)
+ public static Object maybeRecover(Node node, TxnId txnId, Infer.InvalidIf
invalidIf, Route<?> someRoute, ProgressToken prevProgress, LatentStoreSelector
reportTo, BiConsumer<Outcome, Throwable> callback)
{
MaybeRecover maybeRecover = new MaybeRecover(node,
node.someSequentialExecutor(), txnId, invalidIf, someRoute, prevProgress,
reportTo, callback);
maybeRecover.start();
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java
b/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java
index 4667846b..80d50fb9 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/CallbackInvoker.java
@@ -24,12 +24,13 @@ import javax.annotation.Nullable;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
import accord.primitives.TxnId;
import static accord.impl.progresslog.TxnStateKind.Home;
import static accord.impl.progresslog.TxnStateKind.Waiting;
-class CallbackInvoker<P, V> implements BiConsumer<V, Throwable>, PreLoadContext
+final class CallbackInvoker<P, V> extends DefaultProgressLog.PendingTask
implements BiConsumer<V, Throwable>, PreLoadContext
{
static <P, V> CallbackInvoker<P, V>
invokeWaitingCallback(DefaultProgressLog instance, TxnId txnId, P param,
Callback<P, V> callback)
{
@@ -43,12 +44,11 @@ class CallbackInvoker<P, V> implements BiConsumer<V,
Throwable>, PreLoadContext
static <P, V> CallbackInvoker<P, V> invokeCallback(TxnStateKind kind,
DefaultProgressLog owner, TxnId txnId, P param, Callback<P, V> callback)
{
- CallbackInvoker<P, V> invoker = new CallbackInvoker<>(owner, kind,
owner.nextInvokerId(), txnId, param, callback);
+ CallbackInvoker<P, V> invoker = new CallbackInvoker<>(owner, kind,
owner.nextCallbackId(), txnId, param, callback);
owner.registerPending(kind, txnId, invoker);
return invoker;
}
- final DefaultProgressLog owner;
final boolean isHome;
final long id;
final TxnId txnId;
@@ -57,7 +57,7 @@ class CallbackInvoker<P, V> implements BiConsumer<V,
Throwable>, PreLoadContext
CallbackInvoker(DefaultProgressLog owner, TxnStateKind kind, long id,
TxnId txnId, P param, Callback<P, V> callback)
{
- this.owner = owner;
+ super(owner);
this.isHome = kind == Home;
this.id = id;
this.txnId = txnId;
@@ -70,37 +70,32 @@ class CallbackInvoker<P, V> implements BiConsumer<V,
Throwable>, PreLoadContext
return isHome ? Home : Waiting;
}
- @Override
- public void accept(V success, Throwable fail)
+ private boolean complete()
{
- owner.commandStore.execute(this, safeStore -> {
-
- // we load safeCommand first so that if it clears the progress log
we abandon the callback
- SafeCommand safeCommand = safeStore.ifInitialised(txnId);
- if (!owner.complete(safeStore, kind(), id, this))
- return;
-
- if (safeCommand == null)
- return;
-
- callback.callback(safeStore, safeCommand, owner, txnId, param,
success, fail);
- }, owner.commandStore.agent());
+ return owner.complete(kind(), id, txnId, this);
}
@Override
- public boolean equals(Object obj)
+ public void accept(V success, Throwable fail)
{
- if (obj == null) return false;
- if (obj.getClass() == TxnId.class) return txnId.equals(obj);
- if (obj.getClass() != getClass()) return false;
- CallbackInvoker<?, ?> that = (CallbackInvoker<?, ?>) obj;
- return id == that.id && callback == that.callback;
+ owner.commandStore.execute(this, safeStore -> {
+ try
+ {
+ // we load safeCommand first so that if it clears the progress
log we abandon the callback
+ SafeCommand safeCommand = safeStore.ifInitialised(txnId);
+ if (complete() && safeCommand != null)
+ acceptInternal(safeStore, safeCommand, success, fail);
+ }
+ finally
+ {
+ postRun(safeStore);
+ }
+ }, owner.commandStore.agent());
}
- @Override
- public int hashCode()
+ private void acceptInternal(SafeCommandStore safeStore, SafeCommand
safeCommand, V success, Throwable fail)
{
- return txnId.hashCode();
+ callback.callback(safeStore, safeCommand, owner, txnId, param,
success, fail);
}
@Override
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 450ff5fc..518a283f 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -18,6 +18,7 @@
package accord.impl.progresslog;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -28,6 +29,9 @@ import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import accord.api.ProgressLog;
import accord.api.RoutingKey;
import accord.local.Command;
@@ -52,14 +56,12 @@ import accord.utils.btree.BTree;
import accord.utils.btree.BTreeRemoval;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.Object2ObjectHashMap;
-import org.agrona.collections.ObjectHashSet;
import static accord.api.ProgressLog.BlockedUntil.CanApply;
import static accord.api.ProgressLog.BlockedUntil.NotBlocked;
import static accord.impl.progresslog.CoordinatePhase.Decided;
import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute;
import static accord.impl.progresslog.CoordinatePhase.Undecided;
-import static accord.impl.progresslog.Progress.Awaiting;
import static accord.impl.progresslog.Progress.NoneExpected;
import static accord.impl.progresslog.Progress.Querying;
import static accord.impl.progresslog.Progress.Queued;
@@ -79,6 +81,29 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
// TODO (desired): evict to disk
public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStore>
{
+ static abstract class PendingTask
+ {
+ final DefaultProgressLog owner;
+
+ PendingTask(DefaultProgressLog owner)
+ {
+ this.owner = owner;
+ }
+
+ void postRun(SafeCommandStore safeStore)
+ {
+ owner.acceptIfNonEmptyRunBuffer(safeStore);
+ }
+ }
+
+ public static class Config
+ {
+ public int concurrency = 8;
+ public Duration maxActiveRunTime = Duration.ofMinutes(1);
+ }
+
+ private static final Logger logger =
LoggerFactory.getLogger(DefaultProgressLog.class);
+
final Node node;
final CommandStore commandStore;
@@ -94,8 +119,9 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
* These callbacks are required to have hashCode() == txnId.hashCode() and
equals(txnId) == true,
* so that we can manage overriding callbacks on the relevant TxnState.
*/
- private final ObjectHashSet<Object> pendingWaiting = new ObjectHashSet<>();
- private final ObjectHashSet<Object> pendingHome = new ObjectHashSet<>();
+ // TODO (desired): replace this with a set that can lookup the matching
item
+ private final Object2ObjectHashMap<TxnId, PendingTask> pendingWaiting =
new Object2ObjectHashMap<>();
+ private final Object2ObjectHashMap<TxnId, PendingTask> pendingHome = new
Object2ObjectHashMap<>();
private final Long2ObjectHashMap<Object> active = new
Long2ObjectHashMap<>();
private final Map<TxnId, StackTraceElement[]> debugDeleted =
Invariants.debug() ? new Object2ObjectHashMap<>() : null;
@@ -113,9 +139,9 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
private boolean processing;
private volatile boolean stopped;
- private int maxConcurrency = 128;
+ private Config config = new Config();
- private long nextInvokerId;
+ private long nextCallbackId;
protected DefaultProgressLog(Node node, CommandStore commandStore)
{
@@ -234,7 +260,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
// fall-through to default handler, which simply postpones any
scheduled coordination attempt if we witness another coordination attempt in
the meantime
if (state.homeProgress() == Queued && (before == null ?
after.promised().compareTo(Ballot.ZERO) > 0 :
(after.promised().compareTo(before.promised()) > 0) ||
after.acceptedOrCommitted().compareTo(before.acceptedOrCommitted()) > 0))
{
- clearPending(Home, state.txnId);
+ clearPendingAndActive(Home, state.txnId);
state.home().set(safeStore, this, state.phase(), Queued);
}
break;
@@ -494,6 +520,12 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
state.setInvalidIfUncommitted();
}
+ void acceptIfNonEmptyRunBuffer(SafeCommandStore safeStore)
+ {
+ if (runBufferIndex < runBufferCount)
+ accept(safeStore);
+ }
+
@Override
public void accept(@Nullable SafeCommandStore safeStore)
{
@@ -592,6 +624,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
run.clearPendingTimerDelay();
if (pendingTimerDeadline <= nowMicros)
{
+ validatePreRunState(run, runKind.other());
invokeBoth = true;
}
else
@@ -640,7 +673,7 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
{
while (runBufferIndex < runBufferCount)
{
- if (active.size() >= maxConcurrency)
+ if (active.size() >= config.concurrency)
{
maybeShrinkRunBuffer();
return;
@@ -668,77 +701,60 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
private void validatePreRunState(TxnState run, TxnStateKind kind)
{
Progress progress = kind == Waiting ? run.waiting().waitingProgress()
: run.home().homeProgress();
- Invariants.require(progress != NoneExpected && progress != Querying);
+ Invariants.require(progress != NoneExpected);
+ if (progress == Querying)
+ {
+ // TODO (expected): add debug information about the active task
+ logger.warn("Interrupting query for {} ({}) as fallback timeout
exceeded", run.txnId, kind);
+ clearPendingAndActive(kind, run.txnId);
+ }
}
RunInvoker invoker(TxnState run, TxnStateKind runKind)
{
- RunInvoker invoker = new RunInvoker(nextInvokerId(), run, runKind);
+ RunInvoker invoker = new RunInvoker(this, run, runKind);
registerPending(runKind, run.txnId, invoker);
return invoker;
}
- class RunInvoker implements Consumer<SafeCommandStore>, PreLoadContext
+ static final class RunInvoker extends PendingTask implements
PreLoadContext, Consumer<SafeCommandStore>
{
- final long id;
+ final DefaultProgressLog owner;
final TxnState run;
final TxnStateKind runKind;
- RunInvoker(long id, TxnState run, TxnStateKind runKind)
+ RunInvoker(DefaultProgressLog owner, TxnState run, TxnStateKind
runKind)
{
- this.id = id;
+ super(owner);
+ this.owner = owner;
this.run = run;
this.runKind = runKind;
}
- @Override
- public void accept(SafeCommandStore safeStore)
+ private boolean complete()
{
- // we have to read safeCommand first as it may become truncated on
load, which may clear the progress log and invalidate us
- SafeCommand safeCommand = safeStore.ifInitialised(run.txnId);
- if (safeCommand == null)
- return;
-
- if (!complete(safeStore, runKind, id, this))
- return; // we've been cancelled
-
- // check this after fetching SafeCommand, as doing so can erase
the command (and invalidate our state)
- if (run.isDone(runKind))
- return;
-
- Invariants.require(get(run.txnId) == run, "Transaction state for
%s does not match expected one %s", run.txnId, run);
- Invariants.require(run.scheduledTimer() != runKind, "We are
actively executing %s, but we are also scheduled to run this same TxnState
later. This should not happen.", runKind);
- Invariants.require(run.pendingTimer() != runKind, "We are actively
executing %s, but we also have a pending scheduled task to run this same
TxnState later. This should not happen.", runKind);
-
- validatePreRunState(run, runKind);
- if (runKind == Home)
- {
- boolean isRetry = run.homeProgress() == Awaiting;
- if (isRetry) run.incrementHomeRetryCounter();
- run.home().runHome(DefaultProgressLog.this, safeStore,
safeCommand);
- }
- else
- {
- boolean isRetry = run.waitingProgress() == Awaiting;
- if (isRetry) run.incrementWaitingRetryCounter();
- run.runWaiting(safeStore, safeCommand,
DefaultProgressLog.this);
- }
+ return owner.complete(runKind, run.txnId, this);
}
- @Override
- public boolean equals(Object obj)
+ private void acceptInternal(SafeCommandStore safeStore, SafeCommand
safeCommand)
{
- if (obj == null) return false;
- if (obj.getClass() == TxnId.class) return run.txnId.equals(obj);
- if (obj.getClass() != getClass()) return false;
- RunInvoker that = (RunInvoker) obj;
- return id == that.id && run.txnId.equals(that.run.txnId) &&
runKind.equals(that.runKind);
+ owner.run(runKind, run, safeStore, safeCommand);
}
@Override
- public int hashCode()
+ public void accept(SafeCommandStore safeStore)
{
- return run.txnId.hashCode();
+ try
+ {
+ // we load safeCommand first so that if it clears the progress
log we abandon the callback
+ SafeCommand safeCommand = safeStore.ifInitialised(run.txnId);
+ if (complete() && safeCommand != null)
+ acceptInternal(safeStore, safeCommand);
+ }
+ finally
+ {
+ postRun(safeStore);
+ }
}
@Override
@@ -754,45 +770,86 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
}
}
- long nextInvokerId()
+ protected void run(TxnStateKind runKind, TxnState run, SafeCommandStore
safeStore, SafeCommand safeCommand)
+ {
+ // check this after fetching SafeCommand, as doing so can erase the
command (and invalidate our state)
+ if (run.isDone(runKind))
+ return;
+
+ Invariants.require(get(run.txnId) == run, "Transaction state for %s
does not match expected one %s", run.txnId, run);
+ Invariants.require(run.scheduledTimer() != runKind, "We are actively
executing %s, but we are also scheduled to run this same TxnState later. This
should not happen.", runKind);
+ Invariants.require(run.pendingTimer() != runKind, "We are actively
executing %s, but we also have a pending scheduled task to run this same
TxnState later. This should not happen.", runKind);
+
+ validatePreRunState(run, runKind);
+ if (runKind == Home)
+ {
+ boolean isRetry = run.homeProgress() != Queued;
+ if (isRetry) run.incrementHomeRetryCounter();
+ run.runHome(DefaultProgressLog.this, safeStore, safeCommand);
+ }
+ else
+ {
+ boolean isRetry = run.waitingProgress() != Queued;
+ if (isRetry) run.incrementWaitingRetryCounter();
+ run.runWaiting(DefaultProgressLog.this, safeStore, safeCommand);
+ }
+ }
+
+ long nextCallbackId()
{
- return nextInvokerId++;
+ return ++nextCallbackId;
}
- ObjectHashSet<Object> pending(TxnStateKind kind)
+ Object2ObjectHashMap<TxnId, PendingTask> pending(TxnStateKind kind)
{
return kind == Waiting ? pendingWaiting : pendingHome;
}
- void registerPending(TxnStateKind kind, TxnId txnId, Object object)
+ void registerPending(TxnStateKind kind, TxnId txnId, PendingTask register)
{
- ObjectHashSet<Object> pending = pending(kind);
- Invariants.require(!pending.contains(txnId));
- pending.add(object);
+ Object2ObjectHashMap<TxnId, PendingTask> collection = pending(kind);
+ PendingTask existing = collection.putIfAbsent(txnId, register);
+ Invariants.require(existing == null);
}
boolean hasPending(TxnStateKind kind, TxnId txnId)
{
- return pending(kind).contains(txnId);
+ return pending(kind).containsKey(txnId);
}
- void start(CallbackInvoker<?, ?> invoker, Object task)
+ void start(CallbackInvoker<?, ?> invoker, Object debug)
{
- active.put(invoker.id, task);
+ // task is an arbitrary object to help debug, but must be non-null
+ // TODO (expected): make active debuggable via virtual table or other
mechanism
+ if (debug == null)
+ debug = invoker;
+ active.put(invoker.id, debug);
}
- boolean complete(SafeCommandStore safeStore, TxnStateKind kind, long id,
Object active)
+ boolean complete(TxnStateKind kind, long id, TxnId txnId, PendingTask
completing)
{
- this.active.remove(id);
- boolean result = pending(kind).remove(active);
- if (runBufferIndex < runBufferCount)
- accept(safeStore);
- return result;
+ boolean stillActive = active.remove(id) != null;
+ return complete(kind, txnId, completing) && stillActive;
+ }
+
+ boolean complete(TxnStateKind kind, TxnId txnId, PendingTask completing)
+ {
+ return pending(kind).remove(txnId, completing);
+ }
+
+ void clearPendingAndActive(TxnStateKind kind, TxnId txnId)
+ {
+ PendingTask pending = pending(kind).remove(txnId);
+ if (pending instanceof CallbackInvoker<?,?>)
+ active.remove(((CallbackInvoker<?, ?>) pending).id);
}
- void clearPending(TxnStateKind kind, TxnId txnId)
+ public void requeue(SafeCommandStore safeStore, TxnStateKind kind, TxnId
txnId)
{
- pending(kind).remove(txnId);
+ clearPendingAndActive(kind, txnId);
+ TxnState state = get(txnId);
+ if (state != null && (kind == Home ? state.isHomeInitialised() :
!state.isWaitingDone()))
+ state.updateScheduling(safeStore, this, kind, null, Queued);
}
void unschedule(TxnState state)
@@ -826,15 +883,20 @@ public class DefaultProgressLog implements ProgressLog,
Consumer<SafeCommandStor
}
}
- public void setMaxConcurrency(int maxConcurrency)
+ public Config config()
+ {
+ return config;
+ }
+
+ public void setConfig(SafeCommandStore safeStore, Config config)
{
- Invariants.requireArgument(maxConcurrency >= 1);
- this.maxConcurrency = maxConcurrency;
+ Invariants.require(commandStore.inStore());
+ this.config = config;
}
- public int maxConcurrency()
+ public void unsafeSetConfig(Config config)
{
- return maxConcurrency;
+ this.config = config;
}
public int size()
diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
index 381207b2..9179dbaa 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
@@ -30,6 +30,8 @@ import
accord.local.CommandStores.IncludingSpecificStoreSelector;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
import accord.primitives.ProgressToken;
+import accord.primitives.Route;
+import accord.primitives.Status;
import accord.primitives.TxnId;
import accord.utils.Invariants;
@@ -37,7 +39,6 @@ import static
accord.api.ProgressLog.BlockedUntil.CanCoordinateExecution;
import static accord.api.TraceEventType.HOME_PROGRESS;
import static accord.impl.progresslog.CallbackInvoker.invokeHomeCallback;
import static accord.impl.progresslog.CoordinatePhase.Done;
-import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute;
import static accord.impl.progresslog.Progress.NoneExpected;
import static accord.impl.progresslog.Progress.Querying;
import static accord.impl.progresslog.Progress.Queued;
@@ -129,7 +130,7 @@ abstract class HomeState extends WaitingState
if (newPhase.compareTo(phase()) > 0)
{
- instance.clearPending(Home, txnId);
+ instance.clearPendingAndActive(Home, txnId);
clearHomeRetryCounter();
set(safeStore, instance, newPhase, newProgress);
}
@@ -137,7 +138,7 @@ abstract class HomeState extends WaitingState
final void runHome(DefaultProgressLog instance, SafeCommandStore
safeStore, SafeCommand safeCommand)
{
- Tracing tracing = instance.node.agent().trace(safeCommand.txnId(),
HOME_PROGRESS);
+ Tracing tracing = instance.node.agent().trace(txnId, HOME_PROGRESS);
Invariants.require(!isHomeDoneOrUninitialised());
Command command = safeCommand.current();
// note: we may truncate locally based on shard-specific criteria, but
this doesn't mean we're globally persisted
@@ -146,6 +147,17 @@ abstract class HomeState extends WaitingState
// TODO (expected): when invalidated, safer to maintain HomeState
until known to be globally invalidated
// TODO (expected): validate that we clear HomeState when we receive a
Durable reply, to replace the token check logic
Invariants.require(!command.durability().isDurableOrInvalidated(),
"Command is durable or invalidated, but we have not cleared the ProgressLog");
+ if (Route.isFullRoute(command.route()))
+ {
+ Status.Durability min = safeStore.durableBefore().min(txnId,
command.route());
+ if (min.isDurableOrInvalidated())
+ {
+ if (tracing != null)
+ tracing.trace(safeStore.commandStore(), "DurableBefore
records %s; terminating home state", min);
+ setHomeDone(instance);
+ return;
+ }
+ }
ProgressToken maxProgressToken =
instance.savedProgressToken(txnId).merge(command);
CallbackInvoker<ProgressToken, Outcome> invoker =
invokeHomeCallback(instance, txnId, maxProgressToken,
HomeState::recoverCallback);
@@ -225,7 +237,7 @@ abstract class HomeState extends WaitingState
{
set(null, instance, Done, NoneExpected);
clearHomeRetryCounter();
- instance.clearPending(Home, txnId);
+ instance.clearPendingAndActive(Home, txnId);
}
void setHomeDoneAndMaybeRemove(DefaultProgressLog instance)
diff --git a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
index 044a9327..97c0dc02 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/TxnState.java
@@ -18,6 +18,7 @@
package accord.impl.progresslog;
+import java.time.temporal.TemporalUnit;
import javax.annotation.Nullable;
import com.google.common.primitives.Ints;
@@ -30,8 +31,9 @@ import accord.utils.Invariants;
import accord.utils.UnhandledEnum;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
-final class TxnState extends HomeState implements PreLoadContext
+public final class TxnState extends HomeState implements PreLoadContext
{
TxnState(TxnId txnId)
{
@@ -46,9 +48,12 @@ final class TxnState extends HomeState implements
PreLoadContext
default:
throw new UnhandledEnum(newProgress);
case NoneExpected:
- case Querying:
newDelay = 0;
break;
+ case Querying:
+ newDelay =
NANOSECONDS.toMicros(instance.config().maxActiveRunTime.toNanos());
+ Invariants.require(newDelay >= 0);
+ break;
case Queued:
switch (updated)
{
@@ -69,10 +74,7 @@ final class TxnState extends HomeState implements
PreLoadContext
}
TxnStateKind scheduled = scheduledTimer();
- if (scheduled == null)
- {
- Invariants.require(pendingTimer() == null);
- }
+ Invariants.require(scheduled != null || pendingTimer() == null);
// previousDeadline is the previous deadline of <updated>;
// otherDeadline is the active deadline (if any) of <updated.other()>
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index e7eb0922..da23e910 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -19,7 +19,6 @@
package accord.impl.progresslog;
import java.util.concurrent.Executor;
-import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -349,7 +348,7 @@ abstract class WaitingState extends BaseTxnState
void setWaitingDone(DefaultProgressLog owner)
{
set(null, owner, CanApply, NoneExpected);
- owner.clearPending(Waiting, txnId);
+ owner.clearPendingAndActive(Waiting, txnId);
clearWaitingRetryCounter();
}
@@ -360,7 +359,7 @@ abstract class WaitingState extends BaseTxnState
{
clearAwaitState();
clearWaitingRetryCounter();
- owner.clearPending(Waiting, txnId);
+ owner.clearPendingAndActive(Waiting, txnId);
set(safeStore, owner, blockedUntil, Queued);
}
}
@@ -374,11 +373,11 @@ abstract class WaitingState extends BaseTxnState
set(null, owner, isDone ? CanApply : currentlyBlockedUntil,
NoneExpected);
if (isDone)
maybeRemove(owner);
- owner.clearPending(Waiting, txnId);
+ owner.clearPendingAndActive(Waiting, txnId);
}
}
- final void runWaiting(SafeCommandStore safeStore, SafeCommand safeCommand,
DefaultProgressLog owner)
+ final void runWaiting(DefaultProgressLog owner, SafeCommandStore
safeStore, SafeCommand safeCommand)
{
runInternal(safeStore, safeCommand, owner,
owner.node.agent().trace(txnId, WAIT_PROGRESS));
}
@@ -774,16 +773,16 @@ abstract class WaitingState extends BaseTxnState
{
// TODO (desired): fetch only the route
// we MUSt allocate before calling withEpoch to register cancellation,
as async
- BiConsumer<Route<?>, Throwable> invoker = invokeWaitingCallback(owner,
txnId, blockedUntil, WaitingState::fetchRouteCallback);
- FetchRoute.fetchRoute(owner.node(), txnId, contactable, new
IncludingSpecificStoreSelector(owner.commandStore.id()), invoker);
+ CallbackInvoker<BlockedUntil, Route<?>> invoker =
invokeWaitingCallback(owner, txnId, blockedUntil,
WaitingState::fetchRouteCallback);
+ owner.start(invoker, FetchRoute.fetchRoute(owner.node(), txnId,
contactable, new IncludingSpecificStoreSelector(owner.commandStore.id()),
invoker));
}
static void fetch(DefaultProgressLog owner, BlockedUntil blockedUntil,
TxnId txnId, Timestamp executeAt, Route<?> slicedRoute, Route<?> fetchRoute,
Route<?> maxRoute)
{
Invariants.require(!slicedRoute.isEmpty());
// we MUSt allocate before calling withEpoch to register cancellation,
as async
- BiConsumer<FetchData.FetchResult, Throwable> invoker =
invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchCallback);
- FetchData.fetchSpecific(blockedUntil.unblockedFrom.known,
owner.node(), txnId, executeAt, fetchRoute, maxRoute, new
IncludingSpecificStoreSelector(owner.commandStore.id()), invoker);
+ CallbackInvoker<BlockedUntil, FetchData.FetchResult> invoker =
invokeWaitingCallback(owner, txnId, blockedUntil, WaitingState::fetchCallback);
+ owner.start(invoker,
FetchData.fetchSpecific(blockedUntil.unblockedFrom.known, owner.node(), txnId,
executeAt, fetchRoute, maxRoute, new
IncludingSpecificStoreSelector(owner.commandStore.id()), invoker));
}
void awaitHomeKey(DefaultProgressLog owner, BlockedUntil blockedUntil,
TxnId txnId, Timestamp executeAt, Route<?> route)
@@ -803,10 +802,10 @@ abstract class WaitingState extends BaseTxnState
{
long epoch = blockedUntil.fetchEpoch(txnId, executeAt);
// we MUST allocate the invoker before invoking withEpoch as this may
be asynchronous and we must first register our callback for cancellation
- BiConsumer<AsynchronousAwait.SynchronousResult, Throwable> invoker =
invokeWaitingCallback(owner, txnId, blockedUntil, callback);
- owner.node().withEpochAtLeast(epoch, (Executor)null, invoker, () -> {
+ CallbackInvoker<BlockedUntil, AsynchronousAwait.SynchronousResult>
invoker = invokeWaitingCallback(owner, txnId, blockedUntil, callback);
+ owner.start(invoker, owner.node().withEpochAtLeast(epoch,
(Executor)null, invoker, () -> {
AsynchronousAwait.awaitAny(owner.node(), contact(owner, route,
epoch), txnId, route, blockedUntil, callbackId, invoker);
- });
+ }));
}
String toStateString()
diff --git a/accord-core/src/main/java/accord/local/Cleanup.java
b/accord-core/src/main/java/accord/local/Cleanup.java
index 6d1c3be4..b8cfb582 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -40,7 +40,6 @@ import static
accord.local.RedundantStatus.Property.LOCALLY_DEFUNCT;
import static
accord.local.RedundantStatus.Property.LOCALLY_DURABLE_TO_DATA_STORE;
import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
import static accord.local.RedundantStatus.Property.NOT_OWNED;
-import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP_OR_STALE;
import static accord.local.RedundantStatus.Property.SHARD_APPLIED;
import static accord.local.RedundantStatus.Property.TRUNCATE_BEFORE;
import static accord.primitives.Known.KnownExecuteAt.ApplyAtKnown;
@@ -199,7 +198,7 @@ public enum Cleanup
Invariants.paranoid(redundant.all(SHARD_APPLIED));
if (!redundant.all(LOCALLY_DURABLE_TO_DATA_STORE))
- return truncateWithOutcome(txnId, redundant, participants, min);
+ return truncateWithOutcome(txnId, input, redundant, min);
if (saveStatus.compareTo(Vestigial) >= 0)
{
@@ -223,7 +222,7 @@ public enum Cleanup
// (if the condition is false and we fall through to
removing Outcome)
case MajorityOrInvalidated:
case Majority:
- return truncateWithOutcome(txnId, redundant, participants,
min);
+ return truncateWithOutcome(txnId, input, redundant, min);
case UniversalOrInvalidated:
case Universal:
@@ -307,9 +306,9 @@ public enum Cleanup
return INVALIDATE;
}
- private static Cleanup truncateWithOutcome(TxnId txnId, RedundantStatus
status, StoreParticipants participants, Cleanup atLeast)
+ private static Cleanup truncateWithOutcome(TxnId txnId, Input input,
RedundantStatus status, Cleanup atLeast)
{
- return atLeast.compareTo(TRUNCATE_WITH_OUTCOME) > 0 ? atLeast :
(participants.executes() == null || !participants.stillExecutes().isEmpty()) &&
!status.all(PRE_BOOTSTRAP_OR_STALE)
+ return atLeast.compareTo(TRUNCATE_WITH_OUTCOME) > 0 ? atLeast : input
== PARTIAL || !status.all(LOCALLY_DEFUNCT)
?
TRUNCATE_WITH_OUTCOME : TRUNCATE;
}
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index dc19879d..971d76aa 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -64,6 +64,7 @@ import accord.coordinate.CoordinationAdapter.Factory.Kind;
import accord.coordinate.Infer.InvalidIf;
import accord.coordinate.Outcome;
import accord.coordinate.RecoverWithRoute;
+import accord.local.CommandStores.LatentStoreSelector;
import accord.local.CommandStores.StoreSelector;
import accord.local.durability.DurabilityService;
import accord.messages.Callback;
@@ -428,19 +429,20 @@ public class Node implements
ConfigurationService.Listener, NodeCommandStoreServ
}
}
- public void withEpochAtLeast(long epoch, @Nullable Executor ifAsync,
BiConsumer<?, ? super Throwable> ifFailure, Runnable ifSuccess)
+ public Object withEpochAtLeast(long epoch, @Nullable Executor ifAsync,
BiConsumer<?, ? super Throwable> ifFailure, Runnable ifSuccess)
{
if (topology.hasAtLeastEpoch(epoch))
{
ifSuccess.run();
+ return ifSuccess;
}
else
{
- topology.awaitEpoch(epoch, ifAsync).begin((success, fail) -> {
+ configService.fetchTopologyForEpoch(epoch);
+ return topology.awaitEpoch(epoch, ifAsync).begin((success, fail)
-> {
if (fail != null) ifFailure.accept(null, fail);
else ifSuccess.run();
});
- configService.fetchTopologyForEpoch(epoch);
}
}
@@ -891,7 +893,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
}
}
- public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf
invalidIf, FullRoute<?> route, StoreSelector reportTo, @Nullable Tracing
tracing)
+ public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf
invalidIf, FullRoute<?> route, LatentStoreSelector reportTo, @Nullable Tracing
tracing)
{
{
AsyncResult<? extends Outcome> result = coordinating.get(txnId);
diff --git
a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
index 4e062cf0..31368832 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
@@ -119,7 +119,7 @@ public class DurabilityQueue
{
++pendingCounter;
pending.add(new Pending(syncPoint, request, attempt));
- if (pending.size() >= PRUNE_SIZE_THRESHOLD && pendingCounter >
prunedAt + pending.size())
+ if (pending.size() >= PRUNE_SIZE_THRESHOLD && pendingCounter >
prunedAt)
prune();
}
}
@@ -191,7 +191,8 @@ public class DurabilityQueue
private synchronized void prune()
{
- prunedAt = pendingCounter;
+ prunedAt = pending.size();
+ pendingCounter = 0;
List<SortForPruning> sorted = new ArrayList<>();
for (Pending p : pending)
{
diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java
b/accord-core/src/test/java/accord/impl/basic/Cluster.java
index 3bc9ae49..e8bfcd1a 100644
--- a/accord-core/src/test/java/accord/impl/basic/Cluster.java
+++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java
@@ -726,7 +726,9 @@ public class Cluster
Runnable updateProgressLogConcurrency;
{
updateProgressLogConcurrency = () -> {
- nodeMap.values().forEach(node ->
node.commandStores().forEachCommandStore(cs ->
((TestProgressLog)cs.unsafeProgressLog()).setMaxConcurrency(random.nextInt(8,
32))));
+ nodeMap.values().forEach(node ->
node.commandStores().forEachCommandStore(cs -> cs.execute(() -> {
+
((TestProgressLog)cs.unsafeProgressLog()).config().concurrency =
random.nextInt(8, 32);
+ })));
};
}
updateProgressLogConcurrency.run();
diff --git a/accord-core/src/test/java/accord/impl/basic/Pending.java
b/accord-core/src/test/java/accord/impl/basic/Pending.java
index 7214d5bc..6474af87 100644
--- a/accord-core/src/test/java/accord/impl/basic/Pending.java
+++ b/accord-core/src/test/java/accord/impl/basic/Pending.java
@@ -37,6 +37,11 @@ public interface Pending
Invariants.require(activeOrigin != null);
}
+ public static void unsafeSetActiveOrigin(Pending newActiveOrigin)
+ {
+ activeOrigin = newActiveOrigin;
+ }
+
public static void clearActiveOrigin()
{
activeOrigin = null;
diff --git a/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java
b/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java
index 56e76f00..f03bebc3 100644
--- a/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java
+++ b/accord-core/src/test/java/accord/impl/basic/TestProgressLogs.java
@@ -18,18 +18,23 @@
package accord.impl.basic;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import accord.api.ProgressLog;
import accord.impl.progresslog.DefaultProgressLog;
+import accord.impl.progresslog.TxnState;
+import accord.impl.progresslog.TxnStateKind;
import accord.local.CommandStore;
import accord.local.Node;
+import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
public class TestProgressLogs implements ProgressLog.Factory
{
static class TestProgressLog extends DefaultProgressLog
{
+ private static final RecurringPendingRunnable RECURRING = new
RecurringPendingRunnable(-1, null, null, null, null, true){};
protected TestProgressLog(Node node, CommandStore commandStore)
{
super(node, commandStore);
@@ -46,12 +51,30 @@ public class TestProgressLogs implements ProgressLog.Factory
}
finally
{
- if (prev != Pending.Global.NONE)
+ Pending.Global.unsafeSetActiveOrigin(prev);
+ }
+ }
+
+ @Override
+ protected void run(TxnStateKind runKind, TxnState run,
SafeCommandStore safeStore, SafeCommand safeCommand)
+ {
+ if (run.txnId.isSystemTxn())
+ {
+ Pending prev = Pending.Global.activeOrigin();
+ Pending.Global.unsafeSetActiveOrigin(RECURRING);
+ try
+ {
+ super.run(runKind, run, safeStore, safeCommand);
+ }
+ finally
{
- Pending.Global.clearActiveOrigin();
- Pending.Global.setActiveOrigin(prev);
+ Pending.Global.unsafeSetActiveOrigin(prev);
}
}
+ else
+ {
+ super.run(runKind, run, safeStore, safeCommand);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]