This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c203efd Fix minDecidedId calculation. Also fix: (1) bad cast of
Truncated during replay; (2) don't throw IllegalStateException in
Invariants.expect if accord.testing == false. Also improve: support tracing
of recovery and home progress.
1c203efd is described below
commit 1c203efdd463f2c452e8072267370f9e8dfdb8f3
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Jul 15 20:29:54 2025 +0100
Fix minDecidedId calculation
Also fix:
- Bad cast of Truncated during replay
- Don't throw IllegalStateException in Invariants.expect if accord.testing == false
Also improve:
- Support tracing of recovery and home progress
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20771
---
.../src/main/java/accord/api/TraceEventType.java | 2 +-
accord-core/src/main/java/accord/api/Tracing.java | 7 ++
.../main/java/accord/coordinate/CheckShards.java | 7 +-
.../src/main/java/accord/coordinate/FetchData.java | 2 +-
.../main/java/accord/coordinate/Invalidate.java | 5 +-
.../main/java/accord/coordinate/MaybeRecover.java | 41 ++++++--
.../src/main/java/accord/coordinate/Recover.java | 103 +++++++++++++++++----
.../java/accord/coordinate/RecoverWithRoute.java | 78 +++++++++++++---
.../src/main/java/accord/impl/AbstractLoader.java | 3 +-
.../java/accord/impl/progresslog/HomeState.java | 34 ++++++-
.../java/accord/impl/progresslog/WaitingState.java | 8 +-
.../src/main/java/accord/local/MaxDecidedRX.java | 95 ++++++++++++++++---
accord-core/src/main/java/accord/local/Node.java | 5 +-
.../src/main/java/accord/messages/Propagate.java | 4 +-
.../main/java/accord/primitives/AbstractKeys.java | 1 -
.../src/main/java/accord/primitives/KeyRoute.java | 2 -
.../src/main/java/accord/primitives/TxnId.java | 10 ++
.../java/accord/utils/BTreeReducingRangeMap.java | 31 -------
.../src/main/java/accord/utils/Invariants.java | 30 +++---
.../src/test/java/accord/impl/list/ListRead.java | 3 +
20 files changed, 351 insertions(+), 120 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/TraceEventType.java
b/accord-core/src/main/java/accord/api/TraceEventType.java
index c47453db..22afbfac 100644
--- a/accord-core/src/main/java/accord/api/TraceEventType.java
+++ b/accord-core/src/main/java/accord/api/TraceEventType.java
@@ -19,5 +19,5 @@ package accord.api;
public enum TraceEventType
{
- FETCH, PROGRESS
+ FETCH, HOME_PROGRESS, WAIT_PROGRESS, RECOVER
}
diff --git a/accord-core/src/main/java/accord/api/Tracing.java
b/accord-core/src/main/java/accord/api/Tracing.java
index 06d01120..718bec59 100644
--- a/accord-core/src/main/java/accord/api/Tracing.java
+++ b/accord-core/src/main/java/accord/api/Tracing.java
@@ -32,4 +32,11 @@ public interface Tracing
catch (Throwable t) { message = "Could not format \"" + fmt + "\" with
" + Arrays.toString(args) + " (" + t.getLocalizedMessage() + ")"; }
trace(store, message);
}
+
+ static String format(Throwable failure)
+ {
+ StackTraceElement[] ste = failure.getStackTrace();
+ return failure.getClass().getSimpleName() + ":" +
failure.getLocalizedMessage()
+ + (ste.length > 0 ? " (@" + ste[0].getClassName() + "." +
ste[0].getMethodName() + ":" + ste[0].getLineNumber() + ")" : "");
+ }
}
diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java
b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index 91d743c9..3e096e2f 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -60,7 +60,11 @@ public abstract class CheckShards<U extends Participants<?>>
extends ReadCoordin
protected CheckShards(Node node, SequentialAsyncExecutor executor, TxnId
txnId, U query, IncludeInfo includeInfo, @Nullable Ballot bumpBallot,
Infer.InvalidIf previouslyKnownToBeInvalidIf)
{
this(node, executor, txnId, query, txnId.epoch(), includeInfo,
bumpBallot, previouslyKnownToBeInvalidIf);
- Invariants.require(txnId.isVisible());
+ }
+
+ protected CheckShards(Node node, SequentialAsyncExecutor executor, TxnId
txnId, U query, IncludeInfo includeInfo, @Nullable Ballot bumpBallot,
Infer.InvalidIf previouslyKnownToBeInvalidIf, @Nullable Tracing tracing)
+ {
+ this(node, executor, txnId, query, txnId.epoch(), includeInfo,
bumpBallot, previouslyKnownToBeInvalidIf, tracing);
}
protected CheckShards(Node node, SequentialAsyncExecutor executor, TxnId
txnId, U query, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot
bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf)
@@ -77,6 +81,7 @@ public abstract class CheckShards<U extends Participants<?>>
extends ReadCoordin
this.bumpBallot = bumpBallot;
this.previouslyKnownToBeInvalidIf = previouslyKnownToBeInvalidIf;
this.tracing = tracing;
+ Invariants.require(txnId.isVisible());
}
private static Topologies topologyFor(Node node, TxnId txnId,
Unseekables<?> contact, long epoch)
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index de2762bb..dc75c4fe 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -193,7 +193,7 @@ public class FetchData extends CheckShards<Route<?>>
if (failure != null)
{
if (tracing != null)
- tracing.trace(null, "%s completed with failure %s",
getClass().getSimpleName(), failure);
+ tracing.trace(null, "%s completed with failure %s",
getClass().getSimpleName(), Tracing.format(failure));
callback.accept(null, failure);
}
else
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java
b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index 7c04be1c..b2f0137e 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -20,6 +20,7 @@ package accord.coordinate;
import java.util.function.BiConsumer;
+import accord.api.TraceEventType;
import accord.coordinate.tracking.InvalidationTracker;
import accord.coordinate.tracking.InvalidationTracker.InvalidationShardTracker;
import accord.coordinate.tracking.RequestStatus;
@@ -42,6 +43,7 @@ import accord.utils.UnhandledEnum;
import javax.annotation.Nullable;
+import static accord.api.TraceEventType.RECOVER;
import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
import static accord.coordinate.Propose.NotAccept.proposeInvalidate;
import static accord.primitives.Status.AcceptedMedium;
@@ -219,7 +221,8 @@ public class Invalidate implements Callback<InvalidateReply>
if (!invalidateWith.containsAll(fullRoute))
witnessedByInvalidation = null;
}
- RecoverWithRoute.recover(node, executor, txnId,
NotKnownToBeInvalid, fullRoute, witnessedByInvalidation, reportTo, callback);
+ RecoverWithRoute.recover(node, executor, txnId,
NotKnownToBeInvalid, fullRoute, witnessedByInvalidation, reportTo, callback,
+ node.agent().trace(txnId,
RECOVER));
return;
case Invalidated:
diff --git a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
index 922c671b..755e5d16 100644
--- a/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
+++ b/accord-core/src/main/java/accord/coordinate/MaybeRecover.java
@@ -20,6 +20,7 @@ package accord.coordinate;
import java.util.function.BiConsumer;
+import accord.api.Tracing;
import accord.local.CommandStores.StoreSelector;
import accord.local.SequentialAsyncExecutor;
import accord.messages.InformDurable;
@@ -31,6 +32,7 @@ import accord.messages.CheckStatus.CheckStatusOk;
import accord.messages.CheckStatus.IncludeInfo;
import accord.utils.UnhandledEnum;
+import static accord.api.TraceEventType.RECOVER;
import static
accord.coordinate.Infer.InvalidateAndCallback.locallyInvalidateAndCallback;
import static accord.messages.Commit.Invalidate.commitInvalidate;
import static accord.primitives.WithQuorum.HasQuorum;
@@ -48,7 +50,7 @@ public class MaybeRecover extends CheckShards<Route<?>>
MaybeRecover(Node node, SequentialAsyncExecutor executor, TxnId txnId,
Infer.InvalidIf invalidIf, Route<?> someRoute, ProgressToken prevProgress,
StoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
{
// we only want to enquire with the home shard, but we prefer maximal
route information for running Invalidation against, if necessary
- super(node, executor, txnId, someRoute.withHomeKey(),
IncludeInfo.Route, null, invalidIf);
+ super(node, executor, txnId, someRoute.withHomeKey(),
IncludeInfo.Route, null, invalidIf, node.agent().trace(txnId, RECOVER));
this.prevProgress = prevProgress;
this.callback = callback;
this.reportTo = reportTo;
@@ -82,12 +84,17 @@ public class MaybeRecover extends CheckShards<Route<?>>
// this can be helpful in mitigating flakiness and helping forward
progress for large transactions spanning many shards
if (fail != null)
{
+ if (tracing != null)
+ tracing.trace(null, "MaybeRecover failed: " +
Tracing.format(fail));
callback.accept(null, fail);
}
else
{
Invariants.require(merged != null);
CheckStatusOk full = merged.finish(this.query, this.query,
this.query, success.withQuorum, previouslyKnownToBeInvalidIf);
+ if (tracing != null)
+ tracing.trace(null, "MaybeRecover merged: " + full);
+
Known known = full.maxKnown();
Route<?> someRoute = full.route;
@@ -95,6 +102,8 @@ public class MaybeRecover extends CheckShards<Route<?>>
{
default: throw new UnhandledEnum(known.outcome());
case Unknown:
+ {
+
// ErasedOrInvalidated takes Unknown, and so permits
invalidation to be initiated.
// This might prima facie seem unsafe, as
ErasedOrInvalidated might mean the command
// has been executed and erased, in which case it is not
safe to invalidate.
@@ -106,37 +115,57 @@ public class MaybeRecover extends CheckShards<Route<?>>
// TODO (expected): replicas may be stale in this case,
and should detect this and stop attempting to coordinate/invalidate.
if (success.withQuorum == HasQuorum &&
known.canProposeInvalidation() && !Route.isFullRoute(full.route))
{
+ if (tracing != null)
+ tracing.trace(null, "MaybeRecover found quorum
permitting invalidation: " + known);
+
// for correctness reasons, we have not necessarily
preempted the initial pre-accept round and
// may have raced with it, so we must attempt to
recover anything we see pre-accepted.
Invalidate.invalidate(node, txnId, someRoute,
callback);
break;
}
+ if (tracing != null)
+ tracing.trace(null, "MaybeRecover found quorum of
Unknown that did not permit invalidation; falling through.");
// fall through otherwise to recovery
-
+ }
case Apply:
+ {
// we have included the home key, and one that witnessed
the definition has responded, so it should also know the full route
if (hasMadeProgress(full) || !Route.isFullRoute(someRoute))
{
+ ProgressToken progressToken = full.toProgressToken();
+ if (tracing != null)
+ tracing.trace(null, "MaybeRecover found %s;
reporting progress token %s", hasMadeProgress(full) ? "progress" : "no route",
progressToken);
if (full.durability.isDurable())
InformDurable.informDefault(node, topologies,
txnId, query, full.executeAtIfKnown(), full.durability);
callback.accept(full.toProgressToken(), null);
}
else
{
- node.recover(txnId, full.invalidIf,
Route.castToFullRoute(someRoute), reportTo).invoke(callback);
+ if (tracing != null)
+ tracing.trace(null, "MaybeRecover invoking
RecoverWithRoute");
+ node.recover(txnId, full.invalidIf,
Route.castToFullRoute(someRoute), reportTo, tracing).begin(callback);
}
break;
-
+ }
case WasApply:
case Erased:
+ {
// TODO (required): if we're home replica, don't want to
cancel coordination without either invalidating or applying unless we're stale
- callback.accept(full.toProgressToken(), null);
+ ProgressToken progressToken = full.toProgressToken();
+ if (tracing != null)
+ tracing.trace(null, "MaybeRecover found %s which
cannot be recovered; reporting %s", known.outcome(), progressToken);
+ callback.accept(progressToken, null);
break;
-
+ }
case Abort:
+ {
+ if (tracing != null)
+ tracing.trace(null, "MaybeRecover found Abort;
invalidating locally", known.outcome());
+
commitInvalidate(node, txnId, Route.merge(full.route,
(Route) query), txnId.epoch());
locallyInvalidateAndCallback(node, txnId, txnId.epoch(),
txnId.epoch(), someRoute, full.toProgressToken(), callback, null);
break;
+ }
}
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java
b/accord-core/src/main/java/accord/coordinate/Recover.java
index 398b5080..b0be4183 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -30,6 +30,8 @@ import javax.annotation.Nullable;
import accord.api.ProgressLog.BlockedUntil;
import accord.api.Result;
import accord.api.RoutingKey;
+import accord.api.TraceEventType;
+import accord.api.Tracing;
import accord.coordinate.ExecuteFlag.CoordinationFlags;
import accord.coordinate.tracking.RecoveryTracker;
import accord.local.CommandStores.LatentStoreSelector;
@@ -114,6 +116,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
private final @Nullable Timestamp committedExecuteAt;
private final LatentStoreSelector reportTo;
private final BiConsumer<Outcome, Throwable> callback;
+ private final @Nullable Tracing tracing;
private boolean isDone;
private SortedListMap<Id, RecoverOk> recoverOks;
@@ -122,7 +125,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
private Recover(Node node, SequentialAsyncExecutor executor, Topologies
topologies, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route,
@Nullable Timestamp committedExecuteAt,
LatentStoreSelector reportTo,
- BiConsumer<Outcome, Throwable> callback)
+ BiConsumer<Outcome, Throwable> callback, @Nullable Tracing
tracing)
{
Invariants.require(txnId.isVisible());
this.adapter = node.coordinationAdapter(txnId, Recovery);
@@ -135,6 +138,7 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
this.committedExecuteAt = committedExecuteAt;
this.reportTo = reportTo;
this.callback = callback;
+ this.tracing = tracing;
this.tracker = new RecoveryTracker(topologies);
this.recoverOks = new SortedListMap<>(topologies.nodes(),
RecoverOk[]::new);
}
@@ -151,7 +155,10 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
}
else if (failure instanceof Redundant)
{
- retry(((Redundant) failure).committedExecuteAt);
+ Timestamp committedExecuteAt = ((Redundant)
failure).committedExecuteAt;
+ if (tracing != null)
+ tracing.trace(null, "Recover found Redundant; retrying with
known committedExecuteAt " + committedExecuteAt);
+ retry(committedExecuteAt);
}
else
{
@@ -167,41 +174,41 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
node.agent().onRecover(node, result, failure);
}
- public static Recover recover(Node node, TxnId txnId, Txn txn,
FullRoute<?> route, BiConsumer<Outcome, Throwable> callback)
+ public static Recover recover(Node node, TxnId txnId, Txn txn,
FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, @Nullable Tracing
tracing)
{
- return recover(node, txnId, txn, route,
LatentStoreSelector.standard(), callback);
+ return recover(node, txnId, txn, route,
LatentStoreSelector.standard(), callback, tracing);
}
- public static Recover recover(Node node, TxnId txnId, Txn txn,
FullRoute<?> route, LatentStoreSelector reportTo, BiConsumer<Outcome,
Throwable> callback)
+ public static Recover recover(Node node, TxnId txnId, Txn txn,
FullRoute<?> route, LatentStoreSelector reportTo, BiConsumer<Outcome,
Throwable> callback, @Nullable Tracing tracing)
{
Ballot ballot = node.uniqueTimestamp(Ballot::fromValues);
- return recover(node, ballot, txnId, txn, route, reportTo, callback);
+ return recover(node, ballot, txnId, txn, route, reportTo, callback,
tracing);
}
- private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback)
+ private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, @Nullable
Tracing tracing)
{
- return recover(node, ballot, txnId, txn, route, null, null, callback);
+ return recover(node, ballot, txnId, txn, route, null, null, callback,
tracing);
}
- private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, LatentStoreSelector reportTo, BiConsumer<Outcome,
Throwable> callback)
+ private static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, LatentStoreSelector reportTo, BiConsumer<Outcome,
Throwable> callback, @Nullable Tracing tracing)
{
- return recover(node, ballot, txnId, txn, route, null, reportTo,
callback);
+ return recover(node, ballot, txnId, txn, route, null, reportTo,
callback, tracing);
}
- public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, @Nullable Timestamp executeAt, BiConsumer<Outcome,
Throwable> callback)
+ public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, @Nullable Timestamp executeAt, BiConsumer<Outcome,
Throwable> callback, @Nullable Tracing tracing)
{
- return recover(node, ballot, txnId, txn, route, executeAt, null,
callback);
+ return recover(node, ballot, txnId, txn, route, executeAt, null,
callback, tracing);
}
- public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, @Nullable Timestamp executeAt, LatentStoreSelector
reportTo, BiConsumer<Outcome, Throwable> callback)
+ public static Recover recover(Node node, Ballot ballot, TxnId txnId, Txn
txn, FullRoute<?> route, @Nullable Timestamp executeAt, LatentStoreSelector
reportTo, BiConsumer<Outcome, Throwable> callback, @Nullable Tracing tracing)
{
Topologies topologies = node.topology().select(route, txnId, executeAt
== null ? txnId : executeAt, SHARE, QuorumEpochIntersections.recover);
- return recover(node, topologies, ballot, txnId, txn, route, executeAt,
reportTo, callback);
+ return recover(node, topologies, ballot, txnId, txn, route, executeAt,
reportTo, callback, tracing);
}
- private static Recover recover(Node node, Topologies topologies, Ballot
ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt,
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
+ private static Recover recover(Node node, Topologies topologies, Ballot
ballot, TxnId txnId, Txn txn, FullRoute<?> route, Timestamp executeAt,
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback,
@Nullable Tracing tracing)
{
- Recover recover = new Recover(node, node.someSequentialExecutor(),
topologies, ballot, txnId, txn, route, executeAt, reportTo, callback);
+ Recover recover = new Recover(node, node.someSequentialExecutor(),
topologies, ballot, txnId, txn, route, executeAt, reportTo, callback, tracing);
recover.start(topologies.nodes());
return recover;
}
@@ -217,6 +224,9 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
if (isDone || isBallotPromised)
return;
+ if (tracing != null)
+ tracing.trace(null, "Recover received from %s: %s", from, reply);
+
boolean acceptsFastPath;
switch (reply.kind())
{
@@ -275,12 +285,18 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case Truncated: throw illegalState("Truncate should be
filtered");
case Invalidated:
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found Invalidated:
committing to all shards.");
+
commitInvalidate(invalidateUntil(recoverOks));
return;
}
case AcceptedInvalidate:
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found AcceptedInvalidate:
continuing Invalidate.");
+
invalidate(recoverOks);
return;
}
@@ -299,6 +315,9 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case Applied:
case PreApplied:
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found Applied;
persisting.");
+
withStableDeps(merge, executeAt, (i, t) ->
node.agent().acceptAndWrap(i, t), stableDeps -> {
adapter.persist(node, executor,
tracker.topologies(), route, ballot, CoordinationFlags.none(), txnId, txn,
executeAt, stableDeps, acceptOrCommitNotTruncated.writes,
acceptOrCommitNotTruncated.result, (i, t) -> node.agent().acceptAndWrap(i, t));
});
@@ -308,6 +327,9 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case Stable:
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found Stable;
executing.");
+
withStableDeps(merge, executeAt, this, stableDeps -> {
adapter.execute(node, executor,
tracker.topologies(), route, ballot, RECOVER, CoordinationFlags.none(), txnId,
txn, executeAt, stableDeps, stableDeps, this);
});
@@ -317,6 +339,9 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case PreCommitted:
case Committed:
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found Committed;
stabilising.");
+
withCommittedDeps(merge, executeAt, this,
committedDeps -> {
adapter.stabilise(node, executor,
tracker.topologies(), route, ballot, txnId, txn, executeAt, committedDeps,
this);
});
@@ -326,6 +351,9 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
case AcceptedSlow:
case AcceptedMedium:
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found Accepted;
re-proposing.");
+
// TODO (desired): if we have a quorum of Accept with
matching ballot or proposal we can go straight to Commit
// TODO (desired): if we didn't find Accepted in
*every* shard, consider invalidating for consistency of behaviour
// however, note that we may have taken the fast
path and recovered, so we can only do this if acceptedOrCommitted=Ballot.ZERO
@@ -353,6 +381,8 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
}
if (allShardsTruncated)
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found all shards
truncated; terminating.");
// TODO (required, correctness): this is not a safe
inference in the case of an ErasedOrInvalidOrVestigial response.
// We need to tighten up the inference and spread of
truncation/invalid outcomes.
// In this case, at minimum this can lead to liveness
violations as the home shard stops coordinating
@@ -384,6 +414,9 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
{
case Reject:
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found fast path rejection;
invoking Invalidate.");
+
invalidate(recoverOks);
return;
}
@@ -417,15 +450,36 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
switch (inferred)
{
default: throw new
UnhandledEnum(inferred);
- case Accept: propose(SLOW,
txnId, recoverOkList); break;
- case Unknown:
retry(committedExecuteAt); break;
- case Reject:
invalidate(recoverOks); break;
+ case Accept:
+ {
+ if (tracing != null)
+ tracing.trace(null,
"Recover found accepted fast path; proposing.");
+ propose(SLOW, txnId,
recoverOkList);
+ break;
+ }
+ case Unknown:
+ {
+ if (tracing != null)
+ tracing.trace(null,
"Recover found unknown fast path decision; retrying.");
+
retry(committedExecuteAt);
+ break;
+ }
+ case Reject:
+ {
+ if (tracing != null)
+ tracing.trace(null,
"Recover found fast path rejection; invoking Invalidate.");
+
+ invalidate(recoverOks);
+ break;
+ }
}
}
});
}
else
{
+ if (tracing != null)
+ tracing.trace(null, "Recover found unknown fast path
decision, but no preceding or superseding transactions awaiting decisions;
proposing.");
propose(SLOW, txnId, recoverOkList);
}
}
@@ -551,11 +605,17 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
Topologies topologies = tracker.topologies();
if (executeAt != null && executeAt.epoch() != (this.committedExecuteAt
== null ? txnId : this.committedExecuteAt).epoch())
topologies = node.topology().select(route, txnId, executeAt,
SHARE, QuorumEpochIntersections.recover);
- Recover.recover(node, topologies,
node.uniqueTimestamp(Ballot::fromValues), txnId, txn, route, executeAt,
reportTo, callback);
+
+ Ballot ballot = node.uniqueTimestamp(Ballot::fromValues);
+ Tracing tracing = node.agent().trace(txnId, TraceEventType.RECOVER);
+ Recover.recover(node, topologies, ballot, txnId, txn, route,
executeAt, reportTo, callback, tracing);
}
AsyncResult<InferredFastPath> awaitEarlier(Node node, Deps waitOn,
BlockedUntil blockedUntil)
{
+ if (tracing != null)
+ tracing.trace(null, "Recover awaiting earlier decisions: " +
waitOn.txnIds());
+
long requireEpoch = waitOn.maxTxnId(txnId).epoch();
return node.withEpochAtLeast(requireEpoch, executor, () -> {
TxnId recoverId = this.txnId;
@@ -579,6 +639,9 @@ public class Recover implements Callback<RecoverReply>,
BiConsumer<Result, Throw
AsyncResult<InferredFastPath> awaitLater(Node node, Deps waitOn,
BlockedUntil blockedUntil, @Nullable Participants<?> selfCoordVotes)
{
+ if (tracing != null)
+ tracing.trace(null, "Recover awaiting later decisions or
recoveries: " + waitOn.txnIds());
+
if (waitOn.isEmpty())
return AsyncResults.success(InferredFastPath.Accept);
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 4ed56578..e802a9c4 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -21,6 +21,7 @@ package accord.coordinate;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
+import accord.api.Tracing;
import accord.local.CommandStores.LatentStoreSelector;
import accord.coordinate.ExecuteFlag.CoordinationFlags;
import accord.local.Node;
@@ -67,9 +68,9 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
final Status witnessedByInvalidation;
final LatentStoreSelector reportTo;
- private RecoverWithRoute(Node node, SequentialAsyncExecutor executor,
Topologies topologies, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?>
route, Status witnessedByInvalidation, LatentStoreSelector reportTo,
BiConsumer<Outcome, Throwable> callback)
+ private RecoverWithRoute(Node node, SequentialAsyncExecutor executor,
Topologies topologies, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?>
route, Status witnessedByInvalidation, LatentStoreSelector reportTo,
BiConsumer<Outcome, Throwable> callback, @Nullable Tracing tracing)
{
- super(node, executor, txnId, route, IncludeInfo.All,
node.uniqueTimestamp(Ballot::fromValues), invalidIf);
+ super(node, executor, txnId, route, IncludeInfo.All,
node.uniqueTimestamp(Ballot::fromValues), invalidIf, tracing);
this.reportTo = reportTo;
// if witnessedByInvalidation == AcceptedInvalidate then we cannot
assume its definition was known, and our comparison with the status is invalid
Invariants.require(witnessedByInvalidation !=
Status.AcceptedInvalidate);
@@ -80,14 +81,14 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
assert topologies.oldestEpoch() == topologies.currentEpoch() &&
topologies.currentEpoch() == txnId.epoch();
}
- public static RecoverWithRoute recover(Node node, SequentialAsyncExecutor
executor, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable
Status witnessedByInvalidation, LatentStoreSelector reportTo,
BiConsumer<Outcome, Throwable> callback)
+ public static RecoverWithRoute recover(Node node, SequentialAsyncExecutor
executor, TxnId txnId, Infer.InvalidIf invalidIf, FullRoute<?> route, @Nullable
Status witnessedByInvalidation, LatentStoreSelector reportTo,
BiConsumer<Outcome, Throwable> callback, @Nullable Tracing tracing)
{
- return recover(node, executor, node.topology().forEpoch(route,
txnId.epoch(), SHARE), txnId, invalidIf, route, witnessedByInvalidation,
reportTo, callback);
+ return recover(node, executor, node.topology().forEpoch(route,
txnId.epoch(), SHARE), txnId, invalidIf, route, witnessedByInvalidation,
reportTo, callback, tracing);
}
- private static RecoverWithRoute recover(Node node, SequentialAsyncExecutor
executor, Topologies topologies, TxnId txnId, Infer.InvalidIf invalidIf,
FullRoute<?> route, @Nullable Status witnessedByInvalidation,
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback)
+ private static RecoverWithRoute recover(Node node, SequentialAsyncExecutor
executor, Topologies topologies, TxnId txnId, Infer.InvalidIf invalidIf,
FullRoute<?> route, @Nullable Status witnessedByInvalidation,
LatentStoreSelector reportTo, BiConsumer<Outcome, Throwable> callback,
@Nullable Tracing tracing)
{
- RecoverWithRoute recover = new RecoverWithRoute(node, executor,
topologies, txnId, invalidIf, route, witnessedByInvalidation, reportTo,
callback);
+ RecoverWithRoute recover = new RecoverWithRoute(node, executor,
topologies, txnId, invalidIf, route, witnessedByInvalidation, reportTo,
callback, tracing);
recover.start();
return recover;
}
@@ -131,12 +132,16 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
{
if (failure != null)
{
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute failed: " +
Tracing.format(failure));
callback.accept(null, failure);
return;
}
CheckStatusOkFull full = ((CheckStatusOkFull)
this.merged).finish(query, query, query, success.withQuorum,
previouslyKnownToBeInvalidIf);
Known known = full.knownFor(txnId, query, query);
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute merged: " + full);
// TODO (required): audit this logic, and centralise with e.g.
FetchData inferences
// TODO (expected): skip straight to ExecuteTxn if we have a Stable
reply from each shard
@@ -144,29 +149,44 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
{
default: throw new AssertionError();
case Unknown:
+ {
if (known.definition().isKnown())
{
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found
definition; invoking Recover.");
+
Txn txn = full.partialTxn.reconstitute(query);
- Recover.recover(node, txnId, txn, query, reportTo,
callback);
+ Recover.recover(node, txnId, txn, query, reportTo,
callback, tracing);
}
else if (!known.definition().isOrWasKnown())
{
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found no current
or erased transaction; invoking Invalidate.");
+
if (witnessedByInvalidation != null &&
witnessedByInvalidation.compareTo(Status.PreAccepted) > 0)
throw illegalState("We previously invalidated, finding
a status that should be recoverable");
+
Invalidate.invalidate(node, txnId, query,
witnessedByInvalidation != null, reportTo, callback);
}
else
{
- callback.accept(full.toProgressToken(), null);
+ ProgressToken progressToken = full.toProgressToken();
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found
insufficient information to Recover or Invalidate; calling back with %s.",
progressToken);
+ callback.accept(progressToken, null);
}
break;
-
+ }
case WasApply:
case Apply:
+ {
if (!known.isDefinitionKnown())
{
if (!known.isTruncated() && !known.isInvalidated())
{
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found
Apply/WasApply, but no definition, truncation or invalidation; must have raced
with Apply, reporting no progress in expectation next attempt is successful.");
+
// we must have raced with a successful apply, so
should simply abort
callback.accept(ProgressToken.NONE, null);
return;
@@ -183,6 +203,8 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
{
if (known.isInvalidated())
{
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute
found partially truncated Invalidate; committing to shards " + trySendTo);
Commit.Invalidate.commitInvalidate(node,
txnId, trySendTo, txnId);
}
else
@@ -193,6 +215,9 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
{
if (!known.is(DepsKnown))
{
+ if (tracing != null)
+ tracing.trace(null,
"RecoverWithRoute found partially truncated Apply with incomplete Deps;
advancing state machine for shards " + trySendTo);
+
Invariants.require(txnId.isSystemTxn()
|| full.partialTxn.covers(trySendTo));
Participants<?> haveStable =
full.map.knownFor(Known.DepsOnly, trySendTo);
Route<?> haveUnstable =
trySendTo.without(haveStable);
@@ -205,6 +230,9 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
}
else
{
+ if (tracing != null)
+ tracing.trace(null,
"RecoverWithRoute found partially truncated Apply; persisting to shards " +
trySendTo);
+
Invariants.require(full.stableDeps.covers(trySendTo));
Invariants.require(txnId.isSystemTxn()
|| full.partialTxn.covers(trySendTo));
node.coordinationAdapter(txnId,
Recovery).persist(node, executor, null, trySendTo, trySendTo, SLICE, query,
bumpBallot, CoordinationFlags.none(), txnId, full.partialTxn, full.executeAt,
full.stableDeps, full.writes, full.result, informDurableOnDone, null);
@@ -215,6 +243,9 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
}
else
{
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found
Apply truncated at all shards; advancing Durability to at least Majority");
+
propagate = full.merge(Majority);
}
}
@@ -223,7 +254,7 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
propagate = full;
}
- Propagate.propagate(node, txnId,
previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query,
reportTo, null, propagate, (s, f) -> callback.accept(f == null ?
propagate.toProgressToken() : null, f), null);
+ Propagate.propagate(node, txnId,
previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query,
reportTo, null, propagate, (s, f) -> callback.accept(f == null ?
propagate.toProgressToken() : null, f), tracing);
break;
}
@@ -234,6 +265,8 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
Route<?> missingDeps;
if (known.is(DepsKnown))
{
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found Apply
with DepsKnown; persisting to all shards.");
deps = full.stableDeps.reconstitute(query);
missingDeps = query.slice(0, 0);
}
@@ -241,6 +274,9 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
{
Participants<?> hasDeps =
full.map.knownFor(Known.DepsOnly, query);
missingDeps = query.without(hasDeps);
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found Apply,
with deps missing for %s; advancing state machine for these shards.",
missingDeps);
+
if (full.stableDeps == null)
{
Invariants.require(hasDeps.isEmpty());
@@ -263,21 +299,33 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
}
else
{
- Recover.recover(node, txnId, txn, query, callback);
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found %s;
invoking Recover", known);
+
+ Recover.recover(node, txnId, txn, query, callback,
tracing);
}
break;
-
+ }
case Abort:
+ {
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute Found Abort;
propagating locally.");
+
if (witnessedByInvalidation != null &&
witnessedByInvalidation.hasBeen(Status.PreCommitted))
throw illegalState("We previously invalidated, finding a
status that should be recoverable");
- Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) ->
callback.accept(f == null ? INVALIDATED : null, f), null);
+ Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) ->
callback.accept(f == null ? INVALIDATED : null, f), tracing);
break;
-
+ }
case Erased:
+ {
+ if (tracing != null)
+ tracing.trace(null, "RecoverWithRoute found Erased;
propagating locally.");
+
// we should only be able to hit the Erased case if every
participating shard has advanced past this TxnId, so we don't need to recover it
- Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) ->
callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f), null);
+ Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) ->
callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f),
tracing);
break;
+ }
}
}
}
diff --git a/accord-core/src/main/java/accord/impl/AbstractLoader.java
b/accord-core/src/main/java/accord/impl/AbstractLoader.java
index c0dba44d..864ccd90 100644
--- a/accord-core/src/main/java/accord/impl/AbstractLoader.java
+++ b/accord-core/src/main/java/accord/impl/AbstractLoader.java
@@ -50,8 +50,7 @@ public abstract class AbstractLoader implements Journal.Loader
if (command.txnId().is(Write))
{
CommandStore unsafeStore = safeStore.commandStore();
- Command.Executed executed = command.asExecuted();
- Participants<?> executes =
executed.participants().stillExecutes();
+ Participants<?> executes =
command.participants().stillExecutes();
if (!executes.isEmpty())
{
command.writes()
diff --git a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
index 0157b3f8..322ed8f9 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/HomeState.java
@@ -21,6 +21,7 @@ package accord.impl.progresslog;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import accord.api.Tracing;
import accord.coordinate.MaybeRecover;
import accord.coordinate.Outcome;
import accord.local.Command;
@@ -33,6 +34,7 @@ import accord.primitives.TxnId;
import accord.utils.Invariants;
import static accord.api.ProgressLog.BlockedUntil.CanCoordinateExecution;
+import static accord.api.TraceEventType.HOME_PROGRESS;
import static accord.impl.progresslog.CallbackInvoker.invokeHomeCallback;
import static accord.impl.progresslog.CoordinatePhase.Done;
import static accord.impl.progresslog.CoordinatePhase.ReadyToExecute;
@@ -135,6 +137,7 @@ abstract class HomeState extends WaitingState
final void runHome(DefaultProgressLog instance, SafeCommandStore
safeStore, SafeCommand safeCommand)
{
+ Tracing tracing = instance.node.agent().trace(safeCommand.txnId(),
HOME_PROGRESS);
Invariants.require(!isHomeDoneOrUninitialised());
Command command = safeCommand.current();
// note: we may truncate locally based on shard-specific criteria, but
this doesn't mean we're globally persisted
@@ -145,28 +148,41 @@ abstract class HomeState extends WaitingState
Invariants.require(!command.durability().isDurableOrInvalidated(),
"Command is durable or invalidated, but we have not cleared the ProgressLog");
ProgressToken maxProgressToken =
instance.savedProgressToken(txnId).merge(command);
-
CallbackInvoker<ProgressToken, Outcome> invoker =
invokeHomeCallback(instance, txnId, maxProgressToken,
HomeState::recoverCallback);
-
CommandStores.StoreSelector reportTo = new
IncludingSpecificStoreSelector(safeStore.commandStore().id());
+
+ if (tracing != null)
+ tracing.trace(safeStore.commandStore(), "Invoking MaybeRecover
with progress token %s", maxProgressToken);
+
instance.debugActive(MaybeRecover.maybeRecover(instance.node(), txnId,
invalidIf(), command.route(), maxProgressToken, reportTo, invoker), invoker);
set(safeStore, instance, ReadyToExecute, Querying);
}
static void recoverCallback(SafeCommandStore safeStore, SafeCommand
safeCommand, DefaultProgressLog instance, TxnId txnId, @Nullable ProgressToken
prevProgressToken, Outcome success, Throwable fail)
{
+ Tracing tracing = instance.node.agent().trace(safeCommand.txnId(),
HOME_PROGRESS);
HomeState state = instance.get(txnId);
if (state == null)
+ {
+ if (tracing != null)
+ tracing.trace(safeStore.commandStore(), "No HomeState to
process recovery callback");
return;
+ }
Command command = safeCommand.current();
-
CoordinatePhase status = state.phase();
if (status.isAtMostReadyToExecute() && state.homeProgress() ==
Querying)
{
if (fail != null)
{
+ if (tracing != null)
+ {
+ tracing.trace(safeStore.commandStore(), "Failed to
recover: " + Tracing.format(fail));
+ tracing.trace(safeStore.commandStore(), "Waiting to retry
(%d) with progress token %s", state.homeRetryCounter(), prevProgressToken);
+ }
+
safeStore.agent().onCaughtException(fail, "Failed recovering "
+ state);
+
// re-save prior progress token
if (prevProgressToken != null &&
prevProgressToken.compareTo(command) > 0)
instance.saveProgressToken(command.txnId(),
prevProgressToken);
@@ -181,16 +197,28 @@ abstract class HomeState extends WaitingState
if (token.durability.isDurableOrInvalidated())
{
+ if (tracing != null)
+ tracing.trace(safeStore.commandStore(), "Callback:
progress token %s reports durable; marking home state done.", token);
state.setHomeDoneAndMaybeRemove(instance);
}
else
{
+ if (tracing != null)
+ tracing.trace(safeStore.commandStore(), "Callback:
progress token %s reports not durable; saving token and scheduling retry
(%d).", token, state.homeRetryCounter());
if (prevProgressToken != null && token.compareTo(command)
> 0)
instance.saveProgressToken(command.txnId(), token);
+ state.incrementHomeRetryCounter();
state.set(safeStore, instance, status, Queued);
}
}
}
+ else if (tracing != null)
+ {
+ if (status == Done)
+ tracing.trace(safeStore.commandStore(), "Callback: received,
but already done");
+ else
+ tracing.trace(safeStore.commandStore(), "Callback: received,
but not querying");
+ }
}
void setHomeDone(DefaultProgressLog instance)
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index dcb4335a..e047caa1 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -49,7 +49,7 @@ import accord.utils.UnhandledEnum;
import static accord.api.ProgressLog.BlockedUntil.CanApply;
import static accord.api.ProgressLog.BlockedUntil.Query.HOME;
import static accord.api.ProgressLog.BlockedUntil.Query.SHARD;
-import static accord.api.TraceEventType.PROGRESS;
+import static accord.api.TraceEventType.WAIT_PROGRESS;
import static accord.impl.progresslog.CallbackInvoker.invokeWaitingCallback;
import static accord.impl.progresslog.PackedKeyTracker.bitSet;
import static accord.impl.progresslog.PackedKeyTracker.clearRoundState;
@@ -380,7 +380,7 @@ abstract class WaitingState extends BaseTxnState
final void runWaiting(SafeCommandStore safeStore, SafeCommand safeCommand,
DefaultProgressLog owner)
{
- runInternal(safeStore, safeCommand, owner,
owner.node.agent().trace(txnId, PROGRESS));
+ runInternal(safeStore, safeCommand, owner,
owner.node.agent().trace(txnId, WAIT_PROGRESS));
}
private void runInternal(SafeCommandStore safeStore, SafeCommand
safeCommand, DefaultProgressLog owner, @Nullable Tracing tracing)
@@ -495,7 +495,7 @@ abstract class WaitingState extends BaseTxnState
Command command = safeCommand.current();
Route<?> route = command.route();
- Tracing tracing = owner.node.agent().trace(txnId, PROGRESS);
+ Tracing tracing = owner.node.agent().trace(txnId, WAIT_PROGRESS);
if (fail == null)
{
@@ -666,7 +666,7 @@ abstract class WaitingState extends BaseTxnState
if ((callbackId & 1) != 1)
return;
- Tracing tracing = owner.node.agent().trace(txnId, PROGRESS);
+ Tracing tracing = owner.node.agent().trace(txnId, WAIT_PROGRESS);
BlockedUntil blockedUntil = blockedUntil();
if (callbackId == AWAITING_HOME_KEY_CALLBACKID)
{
diff --git a/accord-core/src/main/java/accord/local/MaxDecidedRX.java
b/accord-core/src/main/java/accord/local/MaxDecidedRX.java
index 616df546..680db265 100644
--- a/accord-core/src/main/java/accord/local/MaxDecidedRX.java
+++ b/accord-core/src/main/java/accord/local/MaxDecidedRX.java
@@ -17,13 +17,25 @@
*/
package accord.local;
+import javax.annotation.Nullable;
+
import accord.api.RoutingKey;
+import accord.api.VisibleForImplementation;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routable.Domain;
import accord.primitives.Routables;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
import accord.primitives.Unseekables;
-import accord.utils.BTreeReducingRangeMap;
+import accord.utils.Invariants;
+import accord.utils.ReducingRangeMap;
+
+import static accord.primitives.TxnId.maxIfNull;
+import static accord.primitives.TxnId.noneIfNull;
-public class MaxDecidedRX extends BTreeReducingRangeMap<TxnId>
+public class MaxDecidedRX extends ReducingRangeMap<TxnId>
{
public static final MaxDecidedRX EMPTY = new MaxDecidedRX();
@@ -32,25 +44,81 @@ public class MaxDecidedRX extends
BTreeReducingRangeMap<TxnId>
super();
}
- private MaxDecidedRX(boolean inclusiveEnds, Object[] tree)
+ private MaxDecidedRX(boolean inclusiveEnds, RoutingKey[] starts, TxnId[]
values)
+ {
+ super(inclusiveEnds, starts, values);
+ }
+
+ TxnId min(Routables<?> keysOrRanges)
+ {
+ return noneIfNull(foldlWithDefault(keysOrRanges, TxnId::nonNullOrMin,
null, TxnId.NONE));
+ }
+
+ TxnId max(Routables<?> keysOrRanges)
+ {
+ return maxIfNull(foldlWithDefault(keysOrRanges, TxnId::nonNullOrMax,
null, TxnId.MAX));
+ }
+
+ TxnId min(Range range)
+ {
+ return noneIfNull(foldlWithDefault(Ranges.of(range),
TxnId::nonNullOrMin, null, TxnId.NONE));
+ }
+
+ TxnId max(Range range)
{
- super(inclusiveEnds, tree);
+ return maxIfNull(foldlWithDefault(Ranges.of(range),
TxnId::nonNullOrMax, null, TxnId.MAX));
}
- TxnId get(Routables<?> keysOrRanges)
+ public @Nullable TxnId minDecidedDependencyId(Unseekables<?> keysOrRanges,
TxnId txnId)
{
- return foldl(keysOrRanges, TxnId::max, TxnId.NONE);
+ Invariants.require(txnId.is(Txn.Kind.ExclusiveSyncPoint));
+ // first check max, as if this is later we don't know that we can
safely filter
+ TxnId maxDecidedId = max(keysOrRanges);
+ if (maxDecidedId.compareTo(txnId) < 0)
+ return min(keysOrRanges);
+ return null;
+ }
+
+ @VisibleForImplementation
+ public @Nullable TxnId minDecidedDependencyId(Unseekable keyOrRange, TxnId
txnId)
+ {
+ Invariants.require(txnId.is(Txn.Kind.ExclusiveSyncPoint));
+ if (keyOrRange.domain() == Domain.Key)
+ {
+ TxnId minMaxDecidedId = get((RoutingKey) keyOrRange);
+ if (minMaxDecidedId.compareTo(txnId) < 0)
+ return minMaxDecidedId;
+ }
+ else
+ {
+ // first check max, as if this is later we don't know that we can
safely filter
+ Range range = (Range) keyOrRange;
+ TxnId maxDecidedId = max(range);
+ if (maxDecidedId.compareTo(txnId) < 0)
+ return min(range);
+ }
+ return null;
+ }
+
+ @VisibleForImplementation
+ public static @Nullable TxnId minDecidedDependencyId(MaxDecidedRX
maxDecidedRX, Unseekables<?> keysOrRanges, TxnId txnId)
+ {
+ return maxDecidedRX == null ? null :
maxDecidedRX.minDecidedDependencyId(keysOrRanges, txnId);
+ }
+
+ public TxnId get(RoutingKey key)
+ {
+ return TxnId.noneIfNull(super.get(key));
}
public MaxDecidedRX update(Unseekables<?> keysOrRanges, TxnId maxId)
{
- // note: we use mergeMax to ensure we take the maximum epoch and hlc
independently from any conflict
- // this is particularly essential for propagating unique HLCs, so
that bootstrap recipients don't
- // begin serving reads too early
- return update(this, keysOrRanges, maxId, TxnId::max,
MaxDecidedRX::new, Builder::new);
+ if (keysOrRanges.isEmpty())
+ return this;
+ return merge(this, create(keysOrRanges, maxId, Builder::new),
TxnId::max, Builder::new);
}
- private static class Builder extends AbstractBoundariesBuilder<RoutingKey,
TxnId, MaxDecidedRX>
+ static class Builder extends AbstractBoundariesBuilder<RoutingKey, TxnId,
MaxDecidedRX>
{
protected Builder(boolean inclusiveEnds, int capacity)
{
@@ -58,9 +126,10 @@ public class MaxDecidedRX extends
BTreeReducingRangeMap<TxnId>
}
@Override
- protected MaxDecidedRX buildInternal(Object[] tree)
+ protected MaxDecidedRX buildInternal()
{
- return new MaxDecidedRX(inclusiveEnds, tree);
+ return new MaxDecidedRX(inclusiveEnds, starts.toArray(new
RoutingKey[0]), values.toArray(new TxnId[0]));
}
}
+
}
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index ee9f5c8b..792e4623 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -56,6 +56,7 @@ import accord.api.RoutingKey;
import accord.api.Scheduler;
import accord.api.Timeouts;
import accord.api.TopologySorter;
+import accord.api.Tracing;
import accord.coordinate.CoordinateEphemeralRead;
import accord.coordinate.CoordinateTransaction;
import accord.coordinate.CoordinationAdapter;
@@ -883,7 +884,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
}
}
- public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf
invalidIf, FullRoute<?> route, StoreSelector reportTo)
+ public AsyncResult<? extends Outcome> recover(TxnId txnId, InvalidIf
invalidIf, FullRoute<?> route, StoreSelector reportTo, @Nullable Tracing
tracing)
{
{
AsyncResult<? extends Outcome> result = coordinating.get(txnId);
@@ -894,7 +895,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
SequentialAsyncExecutor executor = someSequentialExecutor();
AsyncResult<Outcome> result = withEpochExact(txnId.epoch(), executor,
() -> {
RecoverFuture<Outcome> future = new RecoverFuture<>();
- RecoverWithRoute.recover(this, executor, txnId, invalidIf, route,
null, reportTo, future);
+ RecoverWithRoute.recover(this, executor, txnId, invalidIf, route,
null, reportTo, future, tracing);
return future;
}).beginAsResult();
coordinating.putIfAbsent(txnId, result);
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java
b/accord-core/src/main/java/accord/messages/Propagate.java
index 2a40ffaf..a616a021 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -150,7 +150,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
if (full.maxKnowledgeSaveStatus.status == NotDefined && full.invalidIf
== NotKnownToBeInvalid)
{
if (tracing != null)
- tracing.trace(null, "Found nothing for %s", txnId);
+ tracing.trace(null, "Found nothing");
callback.accept(new FetchResult(Nothing, queried.slice(0, 0)),
null);
return;
}
@@ -163,7 +163,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
Route<?> route = Invariants.nonNull(full.route);
if (tracing != null)
- tracing.trace(null, "Found %s for %s", full.map, txnId);
+ tracing.trace(null, "Found %s", full.map);
Timestamp committedExecuteAt = full.executeAtIfKnown();
Propagate propagate =
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index 74d1289d..40ad6799 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -25,7 +25,6 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
diff --git a/accord-core/src/main/java/accord/primitives/KeyRoute.java
b/accord-core/src/main/java/accord/primitives/KeyRoute.java
index a1beeccf..6f2c08e2 100644
--- a/accord-core/src/main/java/accord/primitives/KeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/KeyRoute.java
@@ -18,8 +18,6 @@
package accord.primitives;
-import java.util.Objects;
-
import accord.utils.Invariants;
import accord.api.RoutingKey;
diff --git a/accord-core/src/main/java/accord/primitives/TxnId.java
b/accord-core/src/main/java/accord/primitives/TxnId.java
index 0f611592..7328be1d 100644
--- a/accord-core/src/main/java/accord/primitives/TxnId.java
+++ b/accord-core/src/main/java/accord/primitives/TxnId.java
@@ -502,6 +502,16 @@ public class TxnId extends Timestamp implements
PreLoadContext
return new TxnId(epochMsb(epoch), 0, Id.NONE);
}
+ public static TxnId noneIfNull(TxnId id)
+ {
+ return id == null ? NONE : id;
+ }
+
+ public static TxnId maxIfNull(TxnId id)
+ {
+ return id == null ? MAX : id;
+ }
+
private static final Pattern PARSE =
Pattern.compile("\\[(?<epoch>[0-9]+),(?<hlc>[0-9]+),(?<flags>[0-9]+)\\([KR][REWSXL]\\),(?<node>[0-9]+)]");
public static TxnId parse(String txnIdString)
{
diff --git a/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
index 5729bbb6..159124fb 100644
--- a/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/BTreeReducingRangeMap.java
@@ -156,37 +156,6 @@ public class BTreeReducingRangeMap<V> extends
BTreeReducingIntervalMap<RoutingKe
return accumulator;
}
- public <V2, P1, P2> V2 foldl(RoutingKey start, RoutingKey end,
QuadFunction<V, V2, P1, P2, V2> fold, V2 accumulator, P1 p1, P2 p2)
- {
- if (isEmpty())
- return accumulator;
-
- int treeSize = BTree.size(tree);
- int startIdx = findIndex(start);
- int startPos = startIdx < 0 ? (-1 - startIdx) : startIdx;
- if (startIdx == treeSize - 1 || startPos == treeSize)
- return accumulator; // is last or out of bounds -> we are done
- if (startIdx < 0) startPos = Math.max(0, startPos - 1); // inclusive
-
- int endIdx = findIndex(end);
- int endPos = endIdx < 0 ? (-1 - endIdx) : endIdx;
- if (endPos == 0)
- return accumulator;
-
- endPos = Math.min(endPos - 1, treeSize - 2); // inclusive
-
- Iterator<Entry<RoutingKey,V>> iterator = BTree.iterator(tree,
startPos, endPos, BTree.Dir.ASC);
-
- while (iterator.hasNext())
- {
- Entry<RoutingKey, V> entry = iterator.next();
- if (entry.hasValue() && entry.value() != null)
- accumulator = fold.apply(entry.value(), accumulator, p1, p2);
- }
-
- return accumulator;
- }
-
public int findIndex(RoutableKey key)
{
return BTree.findIndex(tree, EntryComparator.instance(), key);
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java
b/accord-core/src/main/java/accord/utils/Invariants.java
index 7044892f..bce558ea 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -54,7 +54,7 @@ public class Invariants
private static boolean IS_PARANOID = PARANOIA_COMPUTE > 0 ||
PARANOIA_MEMORY > 0;
private static Consumer<RuntimeException> onUnexpected =
System.getProperty("accord.testing", "false").equals("true")
? fail -> { throw
fail; }
- : fail ->
logger.warn("Invariant failed", fail);
+ : fail ->
logger.error("Invariant failed", fail);
private static final boolean DEBUG = System.getProperty("accord.debug",
"false").equals("true");
public static boolean isParanoid()
@@ -135,87 +135,87 @@ public class Invariants
public static boolean expect(boolean condition)
{
if (!condition)
- onUnexpected.accept(illegalState());
+ onUnexpected.accept(createIllegalState(null));
return condition;
}
public static void expect(boolean condition, String msg)
{
if (!condition)
- onUnexpected.accept(illegalState(msg));
+ onUnexpected.accept(createIllegalState(msg));
}
public static void expect(boolean condition, String fmt, int p1)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1)));
+ onUnexpected.accept(createIllegalState(format(fmt, p1)));
}
public static void expect(boolean condition, String fmt, int p1, int p2)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1, p2)));
+ onUnexpected.accept(createIllegalState(format(fmt, p1, p2)));
}
public static void expect(boolean condition, String fmt, long p1)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1)));
+ onUnexpected.accept(createIllegalState(format(fmt, p1)));
}
public static void expect(boolean condition, String fmt, long p1, long p2)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1, p2)));
+ onUnexpected.accept(createIllegalState(format(fmt, p1, p2)));
}
public static void expect(boolean condition, String fmt, @Nullable Object
p1)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1)));
+ onUnexpected.accept(createIllegalState(format(fmt, p1)));
}
public static <P> void expect(boolean condition, String fmt, @Nullable P
p1, Function<? super P, ?> transformP)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt,
transformP.apply(p1))));
+ onUnexpected.accept(createIllegalState(format(fmt,
transformP.apply(p1))));
}
public static void expect(boolean condition, String fmt, @Nullable Object
p1, @Nullable Object p2)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1, p2)));
+ onUnexpected.accept(createIllegalState(format(fmt, p1, p2)));
}
public static <P> void expect(boolean condition, String fmt, @Nullable
Object p1, @Nullable P p2, Function<? super P, ?> transformP2)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1,
transformP2.apply(p2))));
+ onUnexpected.accept(createIllegalState(format(fmt, p1,
transformP2.apply(p2))));
}
public static void expect(boolean condition, String fmt, @Nullable Object
p1, @Nullable Object p2, @Nullable Object p3)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1, p2, p3)));
+ onUnexpected.accept(createIllegalState(format(fmt, p1, p2, p3)));
}
public static void expect(boolean condition, String fmt, @Nullable Object
p1, @Nullable Object p2, long p3)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1, p2, p3)));
+ onUnexpected.accept(createIllegalState(format(fmt, p1, p2, p3)));
}
public static <P> void expect(boolean condition, String fmt, @Nullable
Object p1, @Nullable Object p2, @Nullable P p3, Function<? super P, Object>
transformP3)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, p1, p2,
transformP3.apply(p3))));
+ onUnexpected.accept(createIllegalState(format(fmt, p1, p2,
transformP3.apply(p3))));
}
public static void expect(boolean condition, String fmt, Object... args)
{
if (!condition)
- onUnexpected.accept(illegalState(format(fmt, args)));
+ onUnexpected.accept(createIllegalState(format(fmt, args)));
}
public static void require(boolean condition)
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java
b/accord-core/src/test/java/accord/impl/list/ListRead.java
index 27256c4f..99c44ff9 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -100,6 +100,9 @@ public class ListRead implements Read
@Override
public AsyncChain<Data> readDirect(CommandStore unsafeStore, Seekable key,
Timestamp executeAt)
{
+ if (key == null)
+ return AsyncChains.success(new ListData());
+
ListStore s = (ListStore)unsafeStore.unsafeGetDataStore();
logger.trace("submitting READ on {} at {} key:{}", unsafeStore.node(),
executeAt, key);
return executor.apply(unsafeStore).build(() -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]