This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71d235d5 Improve:
71d235d5 is described below
commit 71d235d56cb315fa5ae01ec24d3d9f08dd08ac6a
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Jul 1 14:24:01 2025 +0100
Improve:
- Journal debugging vtable support
- Background task tracing support
Fix:
- HLC_BOUND only valid for strictly lower HLC
- HAS_UNIQUE_HLC can only be safely computed if READY_TO_EXECUTE
- Break recursion in CommandStore.ensureReadyToCoordinate
- Fix find intersecting shard scheduler
- Separate adhoc ShardScheduler from normal to avoid overwriting
- Should still use execution listeners to detect invalid reads for certain
transactions even with dataStoreDetectsFutureReads
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20746
---
accord-core/src/main/java/accord/api/Agent.java | 4 +
accord-core/src/main/java/accord/api/Journal.java | 4 +
.../src/main/java/accord/api/TraceEventType.java | 23 ++++++
accord-core/src/main/java/accord/api/Tracing.java | 35 +++++++++
.../main/java/accord/coordinate/CheckShards.java | 21 ++++-
.../accord/coordinate/CoordinateEphemeralRead.java | 3 +-
.../main/java/accord/coordinate/ExecuteFlag.java | 5 +-
.../java/accord/coordinate/ExecuteSyncPoint.java | 1 -
.../src/main/java/accord/coordinate/FetchData.java | 12 ++-
.../src/main/java/accord/coordinate/Propose.java | 1 -
.../java/accord/coordinate/RecoverWithRoute.java | 6 +-
.../src/main/java/accord/impl/CommandChange.java | 30 +++++++
.../java/accord/impl/DefaultLocalListeners.java | 3 -
.../java/accord/impl/progresslog/WaitingState.java | 91 ++++++++++++++++++----
.../src/main/java/accord/local/CommandStore.java | 7 +-
.../src/main/java/accord/local/DepsCalculator.java | 7 +-
.../src/main/java/accord/local/DurableBefore.java | 1 -
.../main/java/accord/local/cfk/CommandsForKey.java | 8 +-
.../accord/local/durability/DurabilityQueue.java | 1 -
.../accord/local/durability/ShardDurability.java | 88 +++++++++++++++------
.../java/accord/messages/GetEphemeralReadDeps.java | 1 -
.../src/main/java/accord/messages/Propagate.java | 73 +++++++++++++++--
.../src/main/java/accord/messages/ReadData.java | 20 +++--
.../src/test/java/accord/impl/list/ListAgent.java | 13 ++++
24 files changed, 374 insertions(+), 84 deletions(-)
diff --git a/accord-core/src/main/java/accord/api/Agent.java
b/accord-core/src/main/java/accord/api/Agent.java
index f9c3cf2c..89016626 100644
--- a/accord-core/src/main/java/accord/api/Agent.java
+++ b/accord-core/src/main/java/accord/api/Agent.java
@@ -20,6 +20,8 @@ package accord.api;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
import accord.api.ProgressLog.BlockedUntil;
import accord.local.Command;
import accord.local.Node;
@@ -43,6 +45,8 @@ import accord.utils.async.AsyncChain;
*/
public interface Agent extends UncaughtExceptionListener
{
+ default @Nullable Tracing trace(TxnId txnId, TraceEventType eventType) {
return null; }
+
/**
* For use by implementations to decide what to do about successfully
recovered transactions.
* Specifically intended to define if and how they should inform clients
of the result.
diff --git a/accord-core/src/main/java/accord/api/Journal.java
b/accord-core/src/main/java/accord/api/Journal.java
index f452e526..99806e2d 100644
--- a/accord-core/src/main/java/accord/api/Journal.java
+++ b/accord-core/src/main/java/accord/api/Journal.java
@@ -19,11 +19,14 @@
package accord.api;
import java.util.Iterator;
+import java.util.List;
import java.util.NavigableMap;
import java.util.Objects;
+import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import accord.impl.CommandChange;
import accord.local.Command;
import accord.local.CommandStores;
import accord.local.DurableBefore;
@@ -47,6 +50,7 @@ public interface Journal
void start(Node node);
Command loadCommand(int store, TxnId txnId, RedundantBefore
redundantBefore, DurableBefore durableBefore);
+ default List<? extends Supplier<CommandChange.Builder>> debugCommand(int
store, TxnId txnId) { throw new UnsupportedOperationException(); }
Command.Minimal loadMinimal(int store, TxnId txnId, Load load,
RedundantBefore redundantBefore, DurableBefore durableBefore);
// TODO (required): propagate exceptions (i.e. using OnDone instead of
Runnable)
diff --git a/accord-core/src/main/java/accord/api/TraceEventType.java
b/accord-core/src/main/java/accord/api/TraceEventType.java
new file mode 100644
index 00000000..c47453db
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/TraceEventType.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.api;
+
+public enum TraceEventType
+{
+ FETCH, PROGRESS
+}
diff --git a/accord-core/src/main/java/accord/api/Tracing.java
b/accord-core/src/main/java/accord/api/Tracing.java
new file mode 100644
index 00000000..06d01120
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Tracing.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package accord.api;
+
+import java.util.Arrays;
+
+import accord.local.CommandStore;
+
+public interface Tracing
+{
+ void trace(CommandStore store, String message);
+
+ default void trace(CommandStore store, String fmt, Object ... args)
+ {
+ String message;
+ try { message = String.format(fmt, args); }
+ catch (Throwable t) { message = "Could not format \"" + fmt + "\" with
" + Arrays.toString(args) + " (" + t.getLocalizedMessage() + ")"; }
+ trace(store, message);
+ }
+}
diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java
b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index cb503072..91d743c9 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -20,6 +20,7 @@ package accord.coordinate;
import javax.annotation.Nullable;
+import accord.api.Tracing;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.SequentialAsyncExecutor;
@@ -50,6 +51,7 @@ public abstract class CheckShards<U extends Participants<?>>
extends ReadCoordin
final IncludeInfo includeInfo;
final @Nullable Ballot bumpBallot;
final Infer.InvalidIf previouslyKnownToBeInvalidIf;
+ final @Nullable Tracing tracing;
protected CheckStatusOk merged;
protected boolean truncated;
@@ -62,6 +64,11 @@ public abstract class CheckShards<U extends Participants<?>>
extends ReadCoordin
}
protected CheckShards(Node node, SequentialAsyncExecutor executor, TxnId
txnId, U query, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot
bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf)
+ {
+ this(node, executor, txnId, query, srcEpoch, includeInfo, bumpBallot,
previouslyKnownToBeInvalidIf, null);
+ }
+
+ protected CheckShards(Node node, SequentialAsyncExecutor executor, TxnId
txnId, U query, long srcEpoch, IncludeInfo includeInfo, @Nullable Ballot
bumpBallot, Infer.InvalidIf previouslyKnownToBeInvalidIf, @Nullable Tracing
tracing)
{
super(node, executor, topologyFor(node, txnId, query, srcEpoch),
txnId);
this.sourceEpoch = srcEpoch;
@@ -69,6 +76,7 @@ public abstract class CheckShards<U extends Participants<?>>
extends ReadCoordin
this.includeInfo = includeInfo;
this.bumpBallot = bumpBallot;
this.previouslyKnownToBeInvalidIf = previouslyKnownToBeInvalidIf;
+ this.tracing = tracing;
}
private static Topologies topologyFor(Node node, TxnId txnId,
Unseekables<?> contact, long epoch)
@@ -81,6 +89,8 @@ public abstract class CheckShards<U extends Participants<?>>
extends ReadCoordin
protected void contact(Id id)
{
Participants<?> unseekables =
query.slice(topologies().computeRangesForNode(id));
+ if (tracing != null)
+ tracing.trace(null, "%s contacting %s for %s",
getClass().getSimpleName(), id, unseekables);
node.send(id, new CheckStatus(txnId, unseekables, sourceEpoch,
includeInfo, bumpBallot), executor, this);
}
@@ -89,10 +99,10 @@ public abstract class CheckShards<U extends
Participants<?>> extends ReadCoordin
protected Action checkSufficient(Id from, CheckStatusOk ok)
{
- if (isSufficient(from, ok))
- return Action.Approve;
-
- return Action.ApproveIfQuorum;
+ Action action = isSufficient(from, ok) ? Action.Approve :
Action.ApproveIfQuorum;
+ if (tracing != null)
+ tracing.trace(null, "%s %s reply %s from %s",
getClass().getSimpleName(), action, ok, from);
+ return action;
}
@Override
@@ -107,6 +117,9 @@ public abstract class CheckShards<U extends
Participants<?>> extends ReadCoordin
}
else
{
+ if (tracing != null)
+ tracing.trace(null, "%s received failure reply %s from %s",
getClass().getSimpleName(), reply, from);
+
switch ((CheckStatus.CheckStatusNack)reply)
{
default: throw new AssertionError(String.format("Unexpected
status: %s", reply));
diff --git
a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
index beab3314..b38fcaf1 100644
--- a/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
+++ b/accord-core/src/main/java/accord/coordinate/CoordinateEphemeralRead.java
@@ -23,6 +23,7 @@ import java.util.function.BiConsumer;
import accord.api.Result;
import accord.coordinate.ExecuteFlag.CoordinationFlags;
+import accord.coordinate.ExecuteFlag.ExecuteFlags;
import accord.coordinate.tracking.QuorumTracker;
import accord.local.Node;
import accord.local.SequentialAsyncExecutor;
@@ -166,7 +167,7 @@ public class CoordinateEphemeralRead extends
AbstractCoordinatePreAccept<Result,
Deps deps = Deps.merge(oks, oks.domainSize(), SortedListMap::getValue,
ok -> ok.deps);
topologies = node.topology().reselect(topologies,
QuorumEpochIntersections.preaccept.include, route, executeAtEpoch,
executeAtEpoch, SHARE, Owned);
CoordinationFlags flags = oks.foldlNonNull((d, k, v, out) -> {
- ExecuteFlag.ExecuteFlags.collect(out, k, v.flags, d, v.deps);
+ ExecuteFlags.collect(out, k, v.flags, d, v.deps);
return out;
}, deps, empty(oks.domain()));
new ExecuteEphemeralRead(node, executor, topologies, route,
txnId.withEpoch(executeAtEpoch), txn, deps, flags, callback).start();
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
b/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
index 1f6b750a..e81ab6ba 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteFlag.java
@@ -58,8 +58,9 @@ public enum ExecuteFlag
public static void collect(CoordinationFlags into, Node.Id id,
ExecuteFlags add, Object expectIfReadyToExecute, Object actualReadyToExecute)
{
- if (add.contains(READY_TO_EXECUTE) &&
!expectIfReadyToExecute.equals(actualReadyToExecute))
- add = add.without(READY_TO_EXECUTE);
+ // TODO (expected): this is overly restrictive, disabling the
optimisation in multi shard cases; should only expect the parts the shard owns
to be equal
+ if (add != none() &&
!expectIfReadyToExecute.equals(actualReadyToExecute))
+ add = none(); // HAS_UNIQUE_HLC only accurate if
READY_TO_EXECUTE was also accurate
into.add(id, add);
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
index 142573b5..ce0d64fa 100644
--- a/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
+++ b/accord-core/src/main/java/accord/coordinate/ExecuteSyncPoint.java
@@ -49,7 +49,6 @@ import accord.utils.SortedListMap;
import accord.utils.UnhandledEnum;
import accord.utils.WrappableException;
import accord.utils.async.AsyncResult;
-import accord.utils.async.AsyncResults;
import accord.utils.async.AsyncResults.SettableResult;
import static
accord.coordinate.CoordinationAdapter.Adapters.exclusiveSyncPoint;
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java
b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 6af0111e..de2762bb 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -20,6 +20,7 @@ package accord.coordinate;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
+import accord.api.Tracing;
import accord.coordinate.Infer.InvalidIf;
import accord.local.CommandStores.LatentStoreSelector;
import accord.local.CommandStores.StoreSelector;
@@ -39,6 +40,7 @@ import accord.utils.Invariants;
import javax.annotation.Nonnull;
+import static accord.api.TraceEventType.FETCH;
import static accord.coordinate.Infer.InvalidIf.NotKnownToBeInvalid;
/**
@@ -73,8 +75,9 @@ public class FetchData extends CheckShards<Route<?>>
final Participants<?> contactable;
final StoreSelector reportTo;
final BiConsumer<? super FetchResult, Throwable> callback;
+ final @Nullable Tracing tracing;
- public FetchRequest(SequentialAsyncExecutor executor, Known fetch,
TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt,
Participants<?> contactable, StoreSelector reportTo, BiConsumer<? super
FetchResult, Throwable> callback)
+ public FetchRequest(SequentialAsyncExecutor executor, Known fetch,
TxnId txnId, InvalidIf invalidIf, @Nullable Timestamp executeAt,
Participants<?> contactable, StoreSelector reportTo, BiConsumer<? super
FetchResult, Throwable> callback, @Nullable Tracing tracing)
{
this.executor = executor;
this.fetch = fetch;
@@ -85,6 +88,7 @@ public class FetchData extends CheckShards<Route<?>>
this.srcEpoch = fetch.fetchEpoch(txnId, executeAt);
this.contactable = contactable;
this.reportTo = reportTo;
+ this.tracing = tracing;
}
}
@@ -101,7 +105,7 @@ public class FetchData extends CheckShards<Route<?>>
*/
public static void fetchSpecific(Known fetch, Node node, TxnId txnId,
InvalidIf invalidIf, @Nullable Timestamp executeAt, Route<?> query, Route<?>
maxRoute, StoreSelector reportTo, BiConsumer<? super FetchResult, Throwable>
callback)
{
- fetchSpecific(node, query, maxRoute, new
FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt,
maxRoute, reportTo, callback));
+ fetchSpecific(node, query, maxRoute, new
FetchRequest(node.someSequentialExecutor(), fetch, txnId, invalidIf, executeAt,
maxRoute, reportTo, callback, node.agent().trace(txnId, FETCH)));
}
public static void fetchSpecific(Node node, Route<?> query, Route<?>
maxRoute, FetchRequest request)
@@ -188,6 +192,8 @@ public class FetchData extends CheckShards<Route<?>>
Invariants.require((success == null) != (failure == null));
if (failure != null)
{
+ if (tracing != null)
+ tracing.trace(null, "%s completed with failure %s",
getClass().getSimpleName(), failure);
callback.accept(null, failure);
}
else
@@ -196,7 +202,7 @@ public class FetchData extends CheckShards<Route<?>>
Invariants.require(isSufficient(merged), "Status %s is not
sufficient", merged);
// TODO (expected): should we automatically trigger a new fetch if
we find executeAt but did not request enough information? would be more robust
- Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query(), maxRoute, reportTo, target,
(CheckStatusOkFull) merged, callback);
+ Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query(), maxRoute, reportTo, target,
(CheckStatusOkFull) merged, callback, tracing);
}
}
}
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java
b/accord-core/src/main/java/accord/coordinate/Propose.java
index 26917132..c59d9839 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -44,7 +44,6 @@ import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.topology.Topologies;
-import accord.topology.TopologyManager;
import accord.utils.Invariants;
import accord.utils.SortedArrays;
import accord.utils.SortedListMap;
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index e4c2e63b..4ed56578 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -223,7 +223,7 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
propagate = full;
}
- Propagate.propagate(node, txnId,
previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query,
reportTo, null, propagate, (s, f) -> callback.accept(f == null ?
propagate.toProgressToken() : null, f));
+ Propagate.propagate(node, txnId,
previouslyKnownToBeInvalidIf, sourceEpoch, success.withQuorum, query, query,
reportTo, null, propagate, (s, f) -> callback.accept(f == null ?
propagate.toProgressToken() : null, f), null);
break;
}
@@ -271,12 +271,12 @@ public class RecoverWithRoute extends
CheckShards<FullRoute<?>>
if (witnessedByInvalidation != null &&
witnessedByInvalidation.hasBeen(Status.PreCommitted))
throw illegalState("We previously invalidated, finding a
status that should be recoverable");
- Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) ->
callback.accept(f == null ? INVALIDATED : null, f));
+ Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) ->
callback.accept(f == null ? INVALIDATED : null, f), null);
break;
case Erased:
// we should only be able to hit the Erased case if every
participating shard has advanced past this TxnId, so we don't need to recover it
- Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) ->
callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f));
+ Propagate.propagate(node, txnId, previouslyKnownToBeInvalidIf,
sourceEpoch, success.withQuorum, query, query, reportTo, null, full, (s, f) ->
callback.accept(f == null ? TRUNCATED_DURABLE_OR_INVALIDATED : null, f), null);
break;
}
}
diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java
b/accord-core/src/main/java/accord/impl/CommandChange.java
index 216b0b46..a72ac99d 100644
--- a/accord-core/src/main/java/accord/impl/CommandChange.java
+++ b/accord-core/src/main/java/accord/impl/CommandChange.java
@@ -223,6 +223,36 @@ public class CommandChange
return durability;
}
+ public Timestamp executeAt()
+ {
+ return executeAt;
+ }
+
+ public Timestamp executesAtLeast()
+ {
+ return executesAtLeast;
+ }
+
+ public PartialTxn partialTxn()
+ {
+ return partialTxn;
+ }
+
+ public PartialDeps partialDeps()
+ {
+ return partialDeps;
+ }
+
+ public Writes writes()
+ {
+ return writes;
+ }
+
+ public Result result()
+ {
+ return result;
+ }
+
public StoreParticipants participants()
{
return participants;
diff --git a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
index 3b6a5694..97fca67a 100644
--- a/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
+++ b/accord-core/src/main/java/accord/impl/DefaultLocalListeners.java
@@ -32,7 +32,6 @@ import accord.local.Node;
import accord.local.PreLoadContext;
import accord.local.SafeCommand;
import accord.local.SafeCommandStore;
-import accord.local.StoreParticipants;
import accord.primitives.SaveStatus;
import accord.primitives.TxnId;
import accord.utils.AsymmetricComparator;
@@ -40,8 +39,6 @@ import accord.utils.Invariants;
import accord.utils.btree.BTree;
import accord.utils.btree.BTreeRemoval;
-import static accord.local.StoreParticipants.Filter.UPDATE;
-
// TODO (desired): evict to disk
public class DefaultLocalListeners implements LocalListeners
{
diff --git
a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
index 5af483a5..dcb4335a 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/WaitingState.java
@@ -24,6 +24,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.api.ProgressLog.BlockedUntil;
+import accord.api.Tracing;
import accord.coordinate.AsynchronousAwait;
import accord.coordinate.FetchData;
import accord.coordinate.FetchRoute;
@@ -48,6 +49,7 @@ import accord.utils.UnhandledEnum;
import static accord.api.ProgressLog.BlockedUntil.CanApply;
import static accord.api.ProgressLog.BlockedUntil.Query.HOME;
import static accord.api.ProgressLog.BlockedUntil.Query.SHARD;
+import static accord.api.TraceEventType.PROGRESS;
import static accord.impl.progresslog.CallbackInvoker.invokeWaitingCallback;
import static accord.impl.progresslog.PackedKeyTracker.bitSet;
import static accord.impl.progresslog.PackedKeyTracker.clearRoundState;
@@ -378,17 +380,16 @@ abstract class WaitingState extends BaseTxnState
final void runWaiting(SafeCommandStore safeStore, SafeCommand safeCommand,
DefaultProgressLog owner)
{
- runInternal(safeStore, safeCommand, owner);
+ runInternal(safeStore, safeCommand, owner,
owner.node.agent().trace(txnId, PROGRESS));
}
- private void runInternal(SafeCommandStore safeStore, SafeCommand
safeCommand, DefaultProgressLog owner)
+ private void runInternal(SafeCommandStore safeStore, SafeCommand
safeCommand, DefaultProgressLog owner, @Nullable Tracing tracing)
{
BlockedUntil blockedUntil = blockedUntil();
Command command = safeCommand.current();
Invariants.require(!owner.hasActive(Waiting, txnId));
Invariants.require(command.saveStatus().compareTo(blockedUntil.unblockedFrom) <
0,
"Command has met desired criteria (%s) but progress
log entry has not been cancelled: %s", blockedUntil.unblockedFrom, command);
-
set(safeStore, owner, blockedUntil, Querying);
TxnId txnId = safeCommand.txnId();
// first make sure we have enough information to obtain the command
locally
@@ -397,6 +398,8 @@ abstract class WaitingState extends BaseTxnState
if (!Route.isRoute(maxContactable))
{
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Blocked until %s. Fetching
route from %s", blockedUntil, maxContactable);
fetchRoute(owner, blockedUntil, txnId, maxContactable);
return;
}
@@ -405,6 +408,8 @@ abstract class WaitingState extends BaseTxnState
if (homeSatisfies().compareTo(blockedUntil) < 0)
{
// first wait until the homeKey has progressed to a point where it
can answer our query; we don't expect our shards to know until then anyway
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Blocked until %s. Waiting
for home key %s to satisfy.", blockedUntil, route.homeKey());
awaitHomeKey(owner, blockedUntil, txnId, executeAt, route);
return;
}
@@ -420,6 +425,8 @@ abstract class WaitingState extends BaseTxnState
{
// we know it has been decided one way or the other by the home
shard at least, so we attempt a fetch
// including the home shard to get us to at least PreCommitted
where we can safely wait on individual shards
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Blocked until %s; not
currently decided. Fetching from %s for epochs [%d..%d].", blockedUntil,
slicedRoute, lowEpoch, highEpoch);
fetch(owner, blockedUntil, txnId, executeAt, slicedRoute,
slicedRoute.withHomeKey(), route);
return;
}
@@ -432,6 +439,8 @@ abstract class WaitingState extends BaseTxnState
if (awaitRoute.isHomeKeyOnlyRoute())
{
// at this point we can switch to polling as we know someone has
the relevant state
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Blocked until %s. Fetching
%s%s for epochs [%d..%d].", blockedUntil, slicedRoute, slicedRoute ==
fetchRoute ? "" : " from " + fetchRoute, lowEpoch, highEpoch);
fetch(owner, blockedUntil, txnId, executeAt, slicedRoute,
fetchRoute, route);
return;
}
@@ -457,6 +466,9 @@ abstract class WaitingState extends BaseTxnState
int roundStart = roundIndex * roundSize;
if (roundStart >= awaitRoute.size())
{
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Blocked until %s. Fetching
%s%s for epochs [%d..%d].", blockedUntil, slicedRoute, slicedRoute ==
fetchRoute ? "" : " from " + fetchRoute, lowEpoch, highEpoch);
+
// all of the shards we are awaiting have been processed and found
at least one replica that has the state needed to answer our query
// at this point we can switch to polling as we know someone has
the relevant state
fetch(owner, blockedUntil, txnId, executeAt, slicedRoute,
fetchRoute, route);
@@ -467,6 +479,8 @@ abstract class WaitingState extends BaseTxnState
awaitRoute = awaitRoute.slice(roundStart, roundEnd);
// TODO (desired): use some mechanism (e.g. random chance or another
counter)
// to either periodically fetch the whole remaining route or
gradually increase the slice length
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Blocked until %s. Waiting for
%s to satisfy; round %d of %d.", blockedUntil, awaitRoute, roundIndex,
(awaitRoute.size() + (roundSize - 1))/roundSize);
awaitSlice(owner, blockedUntil, txnId, executeAt, awaitRoute,
(roundIndex << 1) | 1);
}
@@ -481,14 +495,17 @@ abstract class WaitingState extends BaseTxnState
Command command = safeCommand.current();
Route<?> route = command.route();
+ Tracing tracing = owner.node.agent().trace(txnId, PROGRESS);
if (fail == null)
{
if (route == null)
{
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Callback success, but
route not found.");
Invariants.require(kind == CallbackKind.FetchRoute);
Invariants.require(ready == null);
- state.retry(safeStore, safeCommand, owner, blockedUntil);
+ state.retry(safeStore, safeCommand, owner, blockedUntil,
tracing);
return;
}
@@ -518,14 +535,18 @@ abstract class WaitingState extends BaseTxnState
case AwaitHome:
if (ready.contains(route.homeKey()))
{
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Callback
success. Home key ready.");
// the home shard was found to already have the
necessary state, with no distributed await;
// we can immediately progress the state machine
Invariants.require(0 ==
state.awaitRoundIndex(roundSize));
Invariants.require(0 == state.awaitBitSet(roundSize));
- state.runInternal(safeStore, safeCommand, owner);
+ state.runInternal(safeStore, safeCommand, owner,
tracing);
}
else
{
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Callback
success. Home key not ready; waiting async.");
// the home shard is not ready to answer our query,
but we have registered our remote callback so can wait for it to contact us
state.set(safeStore, owner, blockedUntil, Awaiting);
}
@@ -545,7 +566,7 @@ abstract class WaitingState extends BaseTxnState
Invariants.expect((int)
awaitRoute.findNextSameKindIntersection(roundStart, (Unseekables) ready, 0) /
roundSize == roundIndex);
// TODO (desired): in this case perhaps upgrade to
fetch for next round?
state.updateAwaitRound(roundIndex + 1, roundSize);
- state.runInternal(safeStore, safeCommand, owner);
+ state.runInternal(safeStore, safeCommand, owner,
tracing);
}
else
{
@@ -561,7 +582,7 @@ abstract class WaitingState extends BaseTxnState
case FetchRoute:
if (state.homeSatisfies().compareTo(blockedUntil) < 0)
{
- state.runInternal(safeStore, safeCommand, owner);
+ state.runInternal(safeStore, safeCommand, owner,
tracing);
return;
}
@@ -582,7 +603,7 @@ abstract class WaitingState extends BaseTxnState
{
// we don't think we have anything to wait for, but we
have encountered some notReady responses; queue up a retry
state.setAwaitDone(roundSize);
- state.retry(safeStore, safeCommand, owner,
blockedUntil);
+ state.retry(safeStore, safeCommand, owner,
blockedUntil, tracing);
}
else
{
@@ -590,7 +611,7 @@ abstract class WaitingState extends BaseTxnState
roundIndex = nextIndex / roundSize;
state.updateAwaitRound(roundIndex, roundSize);
state.initialiseAwaitBitSet(awaitRoute, notReady,
roundIndex, roundSize);
- state.runInternal(safeStore, safeCommand, owner);
+ state.runInternal(safeStore, safeCommand, owner,
tracing);
}
}
}
@@ -598,7 +619,7 @@ abstract class WaitingState extends BaseTxnState
else
{
safeStore.agent().onCaughtException(fail, "Failed fetching data
for " + state);
- state.retry(safeStore, safeCommand, owner, blockedUntil);
+ state.retry(safeStore, safeCommand, owner, blockedUntil, tracing);
}
}
@@ -645,6 +666,7 @@ abstract class WaitingState extends BaseTxnState
if ((callbackId & 1) != 1)
return;
+ Tracing tracing = owner.node.agent().trace(txnId, PROGRESS);
BlockedUntil blockedUntil = blockedUntil();
if (callbackId == AWAITING_HOME_KEY_CALLBACKID)
{
@@ -655,19 +677,42 @@ abstract class WaitingState extends BaseTxnState
setHomeSatisfies(newHomeStatus);
if (waitingProgress() != Awaiting)
+ {
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Received async home key
callback %d but no longer Awaiting", callbackId);
return;
+ }
if (newHomeStatus.compareTo(blockedUntil) < 0 ||
currentHomeStatus.compareTo(blockedUntil) >= 0)
+ {
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Received redundant
async home key callback %d. Blocked until %s, home key now %s (previously %s)",
callbackId, blockedUntil, newHomeStatus, currentHomeStatus);
return;
+ }
SafeCommand safeCommand = safeStore.unsafeGet(txnId);
if (safeCommand != null)
- runInternal(safeStore, safeCommand, owner);
+ {
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Received async home key
callback %d. Blocked until %s, home key now %s.", callbackId, blockedUntil,
newHomeStatus);
+ runInternal(safeStore, safeCommand, owner, tracing);
+ }
}
else
{
if (waitingProgress() != Awaiting)
+ {
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Received async callback
%d but no longer Awaiting", callbackId);
return;
+ }
+
+ if (newStatus.compareTo(blockedUntil.unblockedFrom) < 0)
+ {
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Received async callback
%d with %s, insufficient for %s", callbackId, newStatus, blockedUntil);
+ return;
+ }
callbackId >>= 1;
SafeCommand safeCommand =
Invariants.nonNull(safeStore.unsafeGet(txnId));
@@ -680,28 +725,46 @@ abstract class WaitingState extends BaseTxnState
int roundIndex = awaitRoundIndex(roundSize);
int updateBitSet = roundCallbackBitSet(owner, txnId, from,
slicedRoute, callbackId, roundIndex, roundSize);
if (updateBitSet == 0)
+ {
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Received async callback
%d for already ready keys.", callbackId, blockedUntil);
return;
+ }
int bitSet = awaitBitSet(roundSize);
bitSet &= ~updateBitSet;
setAwaitBitSet(bitSet, roundSize);
if (bitSet == 0)
- runInternal(safeStore, safeCommand, owner);
+ {
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Blocked until %s.
Received async callback %d for waiting keys. Round complete.", blockedUntil,
callbackId);
+
+ runInternal(safeStore, safeCommand, owner, tracing);
+ }
+ else
+ {
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Blocked until %s.
Received async callback %d for waiting keys. %d keys still waiting this
round.", blockedUntil, callbackId, Integer.bitCount(bitSet));
+ }
}
}
// TODO (expected): use back-off counter here
- private void retry(SafeCommandStore safeStore, SafeCommand safeCommand,
DefaultProgressLog owner, BlockedUntil blockedUntil)
+ private void retry(SafeCommandStore safeStore, SafeCommand safeCommand,
DefaultProgressLog owner, BlockedUntil blockedUntil, @Nullable Tracing tracing)
{
if (!contactEveryone())
{
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Retrying immediately,
without contact restrictions");
setContactEveryone(true);
// try again immediately with a query to all eligible replicas
- runInternal(safeStore, safeCommand, owner);
+ runInternal(safeStore, safeCommand, owner, tracing);
}
else
{
+ if (tracing != null)
+ tracing.trace(owner.commandStore, "Retry queued for later.");
// queue a retry
set(safeStore, owner, blockedUntil, Queued);
}
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index fb4fb05a..2aca538c 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -582,12 +582,11 @@ public abstract class CommandStore implements
SequentialAsyncExecutor
.begin((success, fail) -> {
if (fail != null)
{
- Ranges remaining =
redundantBefore.removeWitnessed(minForEpoch, ranges);
- remaining = redundantBefore.removeRetired(remaining);
+ Ranges remaining =
redundantBefore.removeRetired(redundantBefore.removeWitnessed(minForEpoch,
ranges));
if (!remaining.isEmpty())
{
- logger.error("Failed to close epoch {} for ranges {}
on store {}. Retrying.", epoch, remaining, id);
- ensureReadyToCoordinate(epoch, remaining);
+ logger.error("Failed to close epoch {} for ranges {}
on store {}. Retrying.", epoch, remaining, id, fail);
+ node.someExecutor().execute(() ->
ensureReadyToCoordinate(epoch, remaining));
}
}
});
diff --git a/accord-core/src/main/java/accord/local/DepsCalculator.java b/accord-core/src/main/java/accord/local/DepsCalculator.java
index fc609cc4..c67b4459 100644
--- a/accord-core/src/main/java/accord/local/DepsCalculator.java
+++ b/accord-core/src/main/java/accord/local/DepsCalculator.java
@@ -65,9 +65,12 @@ public class DepsCalculator extends Deps.Builder implements
CommandSummaries.Act
{
ExecuteFlags flags = ExecuteFlags.none();
if (!hasUnappliedDependency)
+ {
flags = flags.with(READY_TO_EXECUTE);
- if (maxAppliedHlc < txnId.hlc())
- flags = flags.with(HAS_UNIQUE_HLC);
+ // we don't know whether hlc is unique unless dependencies have applied
+ if (maxAppliedHlc < txnId.hlc())
+ flags = flags.with(HAS_UNIQUE_HLC);
+ }
return flags;
}
diff --git a/accord-core/src/main/java/accord/local/DurableBefore.java b/accord-core/src/main/java/accord/local/DurableBefore.java
index 7709d81a..f3830ccd 100644
--- a/accord-core/src/main/java/accord/local/DurableBefore.java
+++ b/accord-core/src/main/java/accord/local/DurableBefore.java
@@ -23,7 +23,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.api.RoutingKey;
-import accord.local.durability.ShardDurability;
import accord.primitives.AbstractRanges;
import accord.primitives.Participants;
import accord.primitives.Ranges;
diff --git a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
index 5963902b..58036157 100644
--- a/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/cfk/CommandsForKey.java
@@ -1789,7 +1789,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
break;
}
if (maxAppliedWriteByExecuteAt >= 0) maxUniqueHlc =
Math.max(maxUniqueHlc,
committedByExecuteAt[maxAppliedWriteByExecuteAt].executeAt.hlc());
- else maxUniqueHlc = Math.max(maxUniqueHlc, bounds.gcBefore.hlc());
+ else maxUniqueHlc = Math.max(maxUniqueHlc, bounds.gcBefore.hlc() - 1);
// only guaranteed to witness those with strictly lower hlc; might be some txn agreed with higher flag bits
return updater.update(key, bounds, isNewBoundsInfo, byId,
minUndecidedById, maxAppliedPreBootstrapWriteById, committedByExecuteAt,
maxAppliedWriteByExecuteAt, maxUniqueHlc, loadingPruned, newPrunedBeforeById,
unmanageds);
}
@@ -2061,10 +2061,6 @@ public class CommandsForKey extends CommandsForKeyUpdate
int newPrunedBeforeById = prunedBeforeId(newById, prunedBefore(),
redundantBefore(newBounds));
Invariants.paranoid(newPrunedBeforeById < 0 ? prunedBeforeById < 0 ||
byId[prunedBeforeById].compareTo(newBounds.gcBefore) < 0 :
newById[newPrunedBeforeById].equals(byId[prunedBeforeById]));
- long maxUniqueHlc = this.maxUniqueHlc;
- if (maxUniqueHlc <= newBounds.gcBefore.hlc() &&
newBounds.gcBefore.is(HLC_BOUND))
- maxUniqueHlc = 0;
-
return notifyManagedPreBootstrap(this, newBounds,
reconstructAndUpdateUnmanaged(key, newBounds, true, newById, maxUniqueHlc,
newLoadingPruned, newPrunedBeforeById, unmanageds));
}
@@ -2368,7 +2364,7 @@ public class CommandsForKey extends CommandsForKeyUpdate
if (maxWrite != null && maxWrite.hlc() >= maxUniqueHlc)
return false;
TxnId gcBefore = redundantBefore();
- return gcBefore.hlc() < maxUniqueHlc || !gcBefore.is(HLC_BOUND);
+ return gcBefore.hlc() <= maxUniqueHlc || !gcBefore.is(HLC_BOUND);
}
public static boolean reportLinearizabilityViolations()
diff --git a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
index 69948f1c..4e062cf0 100644
--- a/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
+++ b/accord-core/src/main/java/accord/local/durability/DurabilityQueue.java
@@ -51,7 +51,6 @@ import accord.primitives.SyncPoint;
import accord.primitives.TxnId;
import accord.topology.TopologyManager;
import accord.utils.Invariants;
-import accord.utils.async.AsyncResult;
import org.agrona.collections.ObjectHashSet;
import static accord.coordinate.ExecuteSyncPoint.coordinateIncluding;
diff --git a/accord-core/src/main/java/accord/local/durability/ShardDurability.java b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
index e9666e42..58d7641f 100644
--- a/accord-core/src/main/java/accord/local/durability/ShardDurability.java
+++ b/accord-core/src/main/java/accord/local/durability/ShardDurability.java
@@ -18,8 +18,10 @@
package accord.local.durability;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -41,6 +43,7 @@ import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Routable.Domain;
import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.topology.Shard;
import accord.topology.Topology;
@@ -95,6 +98,7 @@ public class ShardDurability
// TODO (expected): support intra-shard parallelism
private class ShardScheduler
{
+ final boolean adhoc;
final long id;
Shard shard;
boolean ticks, stopping, stopped, readyToStop;
@@ -123,8 +127,9 @@ public class ShardDurability
DurabilityRequest activeRequest;
Waiting waiting;
- private ShardScheduler()
+ private ShardScheduler(boolean adhoc)
{
+ this.adhoc = adhoc;
this.id = nextShardId.incrementAndGet();
}
@@ -164,11 +169,8 @@ public class ShardDurability
void markDefunct()
{
- logger.info("Marking shard durability scheduler {} for {}
defunct", id, shard);
- }
-
- synchronized void markDefunctSilent()
- {
+ if (!adhoc)
+ logger.info("Marking shard durability scheduler {} for {}
defunct", id, shard);
stopping = true;
}
@@ -220,7 +222,8 @@ public class ShardDurability
node.scheduler().once(() -> {
synchronized (ShardDurability.this)
{
- shardSchedulers.remove(shard.range, this);
+ if (adhoc)
adhocShardSchedulers.remove(shard.range, this);
+ else shardSchedulers.remove(shard.range, this);
}
}, 0, MICROSECONDS);
}
@@ -429,9 +432,22 @@ public class ShardDurability
* duration.
*/
private long shardCycleTimeMicros = TimeUnit.SECONDS.toMicros(30);
- private long shardCycleTimeoutDelayMicros = TimeUnit.HOURS.toMicros(1);
private final NavigableMap<Range, ShardScheduler> shardSchedulers = new
TreeMap<>(Range::compare);
+
+ /**
+ * Adhoc schedulers for ranges we don't own but have been asked to coordinate durability for.
+ *
+ * Nodes only maintain regular schedulers for the ranges they own, but sometimes a node may
+ * be asked to coordinate durability for ranges it doesn't own, such as during repair. Nothing
+ * requires these operations to be coordinated by an owning replica.
+ *
+ * When creating schedulers for non-owned ranges, we might accidentally pick a range that we DO own and
+ * overwrite the existing background scheduler for that range, causing us to stop running regular
+ * background actions once the adhoc operation completes. To avoid this possibility, we separate adhoc schedulers
+ * from regular schedulers to prevent accidental overwriting.
+ */
+ */
+ private final NavigableMap<Range, ShardScheduler> adhocShardSchedulers =
new TreeMap<>(Range::compare);
private final ConcurrencyControl syncPointControl = new
ConcurrencyControl(8);
private final DurabilityQueue durabilityQueue;
private long latestEpoch;
@@ -466,11 +482,6 @@ public class ShardDurability
scheduled = node.scheduler().recurring(this::tick,
shardCycleTimeMicros / targetShardSplits, MICROSECONDS);
}
- public synchronized void setShardCycleTimeoutDelay(long
newShardCycleTimeoutDelay, TimeUnit units)
- {
- shardCycleTimeoutDelayMicros =
units.toMicros(newShardCycleTimeoutDelay);
- }
-
/**
* Schedule regular invocations of CoordinateShardDurable and
CoordinateGloballyDurable
*/
@@ -484,6 +495,8 @@ public class ShardDurability
{
for (ShardScheduler scheduler : shardSchedulers.values())
scheduler.tick();
+ for (ShardScheduler scheduler : adhocShardSchedulers.values())
+ scheduler.tick();
}
public synchronized void stop()
@@ -491,7 +504,10 @@ public class ShardDurability
stop = true;
for (ShardScheduler scheduler : shardSchedulers.values())
scheduler.markDefunct();
+ for (ShardScheduler scheduler : adhocShardSchedulers.values())
+ scheduler.markDefunct();
shardSchedulers.clear();
+ adhocShardSchedulers.clear();
}
public synchronized void request(DurabilityRequest request)
@@ -509,6 +525,10 @@ public class ShardDurability
{
{
Map.Entry<Range, ShardScheduler> e =
shardSchedulers.floorEntry(range);
+ // we look up ceiling entry as well, because our comparison on both start and end
+ // means a match with the same start but higher end may be considered to sort after
+ if (e == null || e.getKey().compareIntersecting(range) != 0)
+ e = shardSchedulers.ceilingEntry(range);
if (e != null)
{
Range shardRange = e.getKey();
@@ -531,16 +551,18 @@ public class ShardDurability
while (true)
{
// TODO (desired): seed the numberOfSplits based on our permanent
shardScheduler average?
- ShardScheduler shardScheduler = new ShardScheduler();
+ ShardScheduler scheduler = new ShardScheduler(true);
Topology topology = node.topology().current();
int i = topology.indexForKey(range.start());
if (i >= 0 && !range.startInclusive() &&
range.start().equals(topology.get(i).range.end())) ++i;
Invariants.require(i < 0 ||
topology.get(i).range.compareIntersecting(range) == 0);
Shard shard = i >= 0 ? topology.get(i) : Shard.create(range,
ofSorted(node.id()), ofSorted(node.id()));
- shardScheduler.update(shard, 0, false);
- shardScheduler.request(request, range.intersection(shard.range));
- shardScheduler.markDefunctSilent();
- shardSchedulers.put(shard.range, shardScheduler);
+ ShardScheduler existing =
adhocShardSchedulers.putIfAbsent(shard.range, scheduler);
+ if (existing != null) scheduler = existing;
+ else logger.info("Starting adhoc shard durability scheduler {} for
{}", scheduler.id, shard);
+ scheduler.update(shard, 0, false);
+ scheduler.request(request, range.intersection(shard.range));
+ scheduler.markDefunct();
if (shard.range.contains(range))
break;
range = range.newRange(shard.range.end(), range.end());
@@ -578,7 +600,7 @@ public class ShardDurability
ShardScheduler scheduler = prev.remove(shard.range);
if (scheduler == null)
{
- scheduler = new ShardScheduler();
+ scheduler = new ShardScheduler(false);
logger.info("Starting shard durability scheduler {} for {}",
scheduler.id, shard);
}
shardSchedulers.put(shard.range, scheduler);
@@ -591,15 +613,19 @@ public class ShardDurability
public synchronized ImmutableView immutableView()
{
- TreeMap<Range, ShardScheduler> schedulers = new
TreeMap<>(Range::compare);
- schedulers.putAll(shardSchedulers);
+ List<Map.Entry<Range, ShardScheduler>> schedulers = new
ArrayList<>(shardSchedulers.entrySet());
+ if (!adhocShardSchedulers.isEmpty())
+ {
+ schedulers.addAll(adhocShardSchedulers.entrySet());
+ schedulers.sort(Map.Entry.comparingByKey(Range::compare));
+ }
return new ImmutableView(schedulers);
}
public static class ImmutableView
{
- private final TreeMap<Range, ShardScheduler> schedulers;
- ImmutableView(TreeMap<Range, ShardScheduler> schedulers)
+ private final List<Map.Entry<Range, ShardScheduler>> schedulers;
+ ImmutableView(List<Map.Entry<Range, ShardScheduler>> schedulers)
{
this.schedulers = schedulers;
}
@@ -619,6 +645,8 @@ public class ShardDurability
long lastStartedAtMicros;
long cycleStartedAtMicros;
+ Timestamp min;
+ Object requestedBy;
int retries;
Ranges active;
@@ -627,7 +655,7 @@ public class ShardDurability
public boolean advance()
{
if (iterator == null)
- iterator = schedulers.entrySet().iterator();
+ iterator = schedulers.iterator();
if (!iterator.hasNext())
{
@@ -651,6 +679,8 @@ public class ShardDurability
this.currentSplits = scheduler.currentSplits;
this.lastStartedAtMicros = scheduler.lastStartedAtMicros;
this.cycleStartedAtMicros = scheduler.cycleStartedAtMicros;
+ this.min = scheduler.activeRequest != null ?
scheduler.activeRequest.min : null;
+ this.requestedBy = scheduler.activeRequest != null ?
scheduler.activeRequest.requestedBy : null;
this.retries = scheduler.retries;
this.active = scheduler.active;
this.waiting = scheduler.waiting;
@@ -720,6 +750,16 @@ public class ShardDurability
return cycleStartedAtMicros;
}
+ public Timestamp min()
+ {
+ return min;
+ }
+
+ public Object requestedBy()
+ {
+ return requestedBy;
+ }
+
public int retries()
{
return retries;
diff --git a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
index 88043420..a0829543 100644
--- a/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetEphemeralReadDeps.java
@@ -29,7 +29,6 @@ import accord.local.SafeCommandStore;
import accord.local.StoreParticipants;
import accord.primitives.Deps;
import accord.primitives.FullRoute;
-import accord.primitives.Participants;
import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
diff --git a/accord-core/src/main/java/accord/messages/Propagate.java b/accord-core/src/main/java/accord/messages/Propagate.java
index a4fccc12..2a40ffaf 100644
--- a/accord-core/src/main/java/accord/messages/Propagate.java
+++ b/accord-core/src/main/java/accord/messages/Propagate.java
@@ -20,6 +20,7 @@ package accord.messages;
import accord.api.ProtocolModifiers;
import accord.api.Result;
import accord.api.RoutingKey;
+import accord.api.Tracing;
import accord.coordinate.FetchData.FetchResult;
import accord.coordinate.Infer.InvalidIf;
import accord.local.Cleanup;
@@ -92,7 +93,6 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
final Ballot acceptedOrCommitted;
final Status.Durability durability;
@Nullable final RoutingKey homeKey;
- // this is a WHOLE NODE measure, so if commit epoch has more ranges we do
not count as committed if we can only commit in coordination epoch
final KnownMap known;
final WithQuorum withQuorum;
@Nullable final PartialTxn partialTxn;
@@ -101,6 +101,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
@Nullable final Writes writes;
@Nullable final Result result;
final BiConsumer<? super FetchResult, Throwable> callback;
+ final @Nullable Tracing trace;
private transient volatile FetchResult fetchResult;
private static final AtomicReferenceFieldUpdater<Propagate, FetchResult>
fetchResultUpdater = AtomicReferenceFieldUpdater.newUpdater(Propagate.class,
FetchResult.class, "fetchResult");
@@ -120,7 +121,7 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
@Nullable Timestamp committedExecuteAt,
@Nullable Writes writes,
@Nullable Result result,
- BiConsumer<? super FetchResult, Throwable> callback)
+ BiConsumer<? super FetchResult, Throwable> callback, @Nullable Tracing
trace)
{
this.node = node;
this.txnId = txnId;
@@ -141,12 +142,15 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
this.writes = writes;
this.result = result;
this.callback = callback;
+ this.trace = trace;
}
- public static void propagate(Node node, TxnId txnId, InvalidIf
previouslyKnownToBeInvalidIf, long sourceEpoch, WithQuorum withQuorum, Route<?>
queried, Participants<?> contactable, LatentStoreSelector reportTo, @Nullable
Known target, CheckStatusOkFull full, BiConsumer<? super FetchResult,
Throwable> callback)
+ public static void propagate(Node node, TxnId txnId, InvalidIf
previouslyKnownToBeInvalidIf, long sourceEpoch, WithQuorum withQuorum, Route<?>
queried, Participants<?> contactable, LatentStoreSelector reportTo, @Nullable
Known target, CheckStatusOkFull full, BiConsumer<? super FetchResult,
Throwable> callback, @Nullable Tracing tracing)
{
if (full.maxKnowledgeSaveStatus.status == NotDefined && full.invalidIf
== NotKnownToBeInvalid)
{
+ if (tracing != null)
+ tracing.trace(null, "Found nothing for %s", txnId);
callback.accept(new FetchResult(Nothing, queried.slice(0, 0)),
null);
return;
}
@@ -158,9 +162,12 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
full = full.finish(queried, contactable, queried.with((Unseekables)
contactable), withQuorum, previouslyKnownToBeInvalidIf);
Route<?> route = Invariants.nonNull(full.route);
+ if (tracing != null)
+ tracing.trace(null, "Found %s for %s", full.map, txnId);
+
Timestamp committedExecuteAt = full.executeAtIfKnown();
Propagate propagate =
- new Propagate(node, txnId, route, target, full.invalidIf,
full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.maxPromised,
full.acceptedOrCommitted, full.durability, full.homeKey, full.map, withQuorum,
full.partialTxn, full.stableDeps, committedExecuteAt, full.writes, full.result,
callback);
+ new Propagate(node, txnId, route, target, full.invalidIf,
full.maxKnowledgeSaveStatus, full.maxSaveStatus, full.maxPromised,
full.acceptedOrCommitted, full.durability, full.homeKey, full.map, withQuorum,
full.partialTxn, full.stableDeps, committedExecuteAt, full.writes, full.result,
callback, tracing);
long untilEpoch = txnId.epoch();
if (committedExecuteAt != null)
@@ -185,7 +192,11 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
// TODO (expected): can we come up with a better more universal
pattern for avoiding updating a command we don't intersect with?
// ideally integrated with safeStore.get()
if (participants.owns().isEmpty() && safeStore.ifInitialised(txnId) ==
null)
+ {
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Uninitialised and not
owned; skipping");
return null;
+ }
SafeCommand safeCommand = safeStore.get(txnId, participants);
Command command = safeCommand.current();
@@ -200,7 +211,10 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
switch (command.saveStatus().phase)
{
// Already know the outcome, waiting on durability so maybe update
with new durability information which can also trigger cleanup
- case Persist: return updateDurability(safeStore, safeCommand,
participants);
+ case Persist:
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Already persisted;
skipping");
+ return updateDurability(safeStore, safeCommand, participants);
case Cleanup:
case Invalidate:
return null;
@@ -265,6 +279,8 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
SaveStatus propagate =
found.atLeast(currentlyKnown).propagatesSaveStatus();
if (propagate.known.isSatisfiedBy(currentlyKnown))
{
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Already know at least
as much as peer responses.");
updateFetchResult(found, participants.owns());
return updateDurability(safeStore, safeCommand, participants);
}
@@ -281,23 +297,31 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
throw illegalState("Invalid states to propagate: " +
propagate);
case Invalidated:
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Invalidating");
Commands.commitInvalidate(safeStore, safeCommand, route);
break;
case Applied:
case PreApplied:
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Applying");
Invariants.require(committedExecuteAt != null);
// we must use the remote executeAt, as it might have a
uniqueHlc we aren't aware of at commit
confirm(Commands.apply(safeStore, safeCommand, participants,
Ballot.ZERO, txnId, route, committedExecuteAt, stableDeps, partialTxn, writes,
result));
break;
case Stable:
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Committing as
stable");
confirm(Commands.commit(safeStore, safeCommand, participants,
Stable, acceptedOrCommitted, txnId, route, partialTxn, executeAtIfKnown,
stableDeps, null));
break;
case Committed:
// TODO (expected): we can propagate Committed as Stable if we
have any other Stable result AND a quorum of committedDeps
case PreCommitted:
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Pre-committing");
confirm(Commands.precommit(safeStore, safeCommand,
participants, txnId, executeAtIfKnown, promised));
// TODO (desired): would it be clearer to yield a SaveStatus
so we can have PreCommittedWithDefinition
if (!found.definition().isKnown())
@@ -306,11 +330,19 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
case PreAccepted:
// only preaccept if we coordinate the transaction
if (safeStore.ranges().coordinates(txnId).intersects(route) &&
Route.isFullRoute(route))
+ {
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Pre-accepting");
Commands.preaccept(safeStore, safeCommand, participants,
txnId, partialTxn, null, false);
+ }
case NotDefined:
if (invalidIf == IfUncommitted)
+ {
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Marking
invalidIfUncommitted");
safeStore.progressLog().invalidIfUncommitted(txnId);
+ }
break;
}
@@ -354,12 +386,20 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
Participants<?> stillTouches = participants.stillTouches();
if (stillTouches.isEmpty())
+ {
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "No longer participating
(stillTouches is empty); using knownForAny: %s", known.knownForAny());
return known.knownForAny();
+ }
RedundantStatus status = safeStore.redundantBefore().status(txnId,
null, stillTouches);
// try to see if we can safely purge the full command
if (tryPurge(safeStore, safeCommand, status))
+ {
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Redundant with status
%s; purged", status);
return null;
+ }
// if the command has been truncated globally, then we should expect
to apply it
// if we cannot obtain enough information from a majority to do so
then we have been left behind
@@ -379,11 +419,15 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
{
Invariants.require(notStaleTouches.containsAll(stillTouches));
Invariants.require(notStaleOwnsOrMayExecute.containsAll(stillOwnsOrMayExecute));
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "No longer touches
any keys that were found truncated");
return required;
}
if (stillOwnsOrMayExecute.isEmpty() && (!found.is(Outcome.Apply)
|| known.hasFullyTruncated(staleTouches)))
{
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "No longer owns or
executes any keys that were found truncated; marking vestigial");
Commands.setTruncatedOrVestigial(safeStore, safeCommand,
participants);
return null;
}
@@ -391,23 +435,42 @@ public class Propagate implements PreLoadContext,
MapReduceConsume<SafeCommandSt
}
Participants<?> stale = staleTouches.with((Participants)
staleOwnsOrMayExecute);
+ // TODO (expected): could be that two replicas are stale but cannot catch up;
+ // I think this condition is to ensure a full quorum has truncated (so we haven't raced with truncation)
+ // should solve another way
if (!known.hasFullyTruncated(stale))
+ {
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Has participants %s
that could not be fetched. Some responses were not truncated, so we may have
raced with completion. Aborting.", stale);
return null;
+ }
// TODO (expected): trigger a refresh of redundantBefore; should be
available on a peer
// wait until we know the shard is ahead and we are behind
if (!safeStore.redundantBefore().isShardOnlyApplied(txnId, stale))
+ {
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Has participants %s
that could not be fetched, but the shard(s) have not been marked universally
durable so we will not mark ourselves stale. Aborting.", stale);
return null;
+ }
Participants<?> staleOnlyTouches =
staleTouches.without(staleOwnsOrMayExecute);
Invariants.expect(txnId.awaitsPreviouslyOwned() ||
staleOnlyTouches.isEmpty(), "%s is SHARD_ONLY_APPLIED, so we expect it to have
been filtered from StoreParticipants", staleOnlyTouches);
// TODO (expected): if the above last ditch doesn't work, see if only
the stale ranges can't apply and do some shenanigans to apply partially and
move on
if (ProtocolModifiers.Toggles.markStaleIfCannotExecute(txnId))
{
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Has participants %s
that could not be fetched and the shard(s) have been marked universally
durable. We have marked ourselves stale, and will apply the remaining ranges.",
stale);
+
safeStore.commandStore().markShardStale(safeStore,
executeAtIfKnown == null ? txnId : executeAtIfKnown, stale.toRanges(), true);
if (!stale.containsAll(stillTouches) ||
!stale.containsAll(stillOwnsOrMayExecute))
return required;
}
+ else
+ {
+ if (trace != null)
+ trace.trace(safeStore.commandStore(), "Has participants %s
that could not be fetched and the shard(s) have been marked universally
durable. This transaction type is configured not to induce staleness, so
erasing.", stale);
+ }
// TODO (expected): we might prefer to adopt Redundant status, and
permit ourselves to later accept the result of the execution and/or definition
Commands.setTruncatedOrVestigial(safeStore, safeCommand, participants);
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index 50beaf96..0d4f14d8 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -137,9 +137,9 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
public final Participants<?> scope;
public final long executeAtEpoch;
public final ExecuteFlags flags;
+ final boolean requiresListenersDuringExecution;
protected @Nullable PartialTxn partialTxn;
protected @Nullable Timestamp executeAt;
- private boolean fastExec;
private transient State state = State.PENDING; // TODO (low priority,
semantics): respond with the Executed result we have stored?
@@ -171,6 +171,7 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
this.partialTxn = txn == null ? null : txn.intersecting(scope, true);
this.executeAt = executeAt;
this.executeAtEpoch = executeAtEpoch;
+ this.requiresListenersDuringExecution =
requiresListenersDuringExecution(txnId, flags);
}
protected ReadData(TxnId txnId, Participants<?> scope, @Nullable
PartialTxn partialTxn, @Nullable Timestamp executeAt, long executeAtEpoch)
@@ -186,6 +187,12 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
this.executeAt = executeAt;
this.executeAtEpoch = executeAtEpoch;
this.flags = flags;
+ this.requiresListenersDuringExecution =
requiresListenersDuringExecution(txnId, flags);
+ }
+
+ private static boolean requiresListenersDuringExecution(TxnId txnId,
ExecuteFlags flags)
+ {
+ return !txnId.is(EphemeralRead) && (!dataStoreDetectsFutureReads() ||
!flags.contains(HAS_UNIQUE_HLC));
}
protected abstract ExecuteOn executeOn();
@@ -259,10 +266,7 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
stamp = node.currentStamp();
if (flags.contains(READY_TO_EXECUTE) &&
fastReadsMayBypassSafeStore(txnId) && partialTxn != null && executeAt != null
&& (txnId.is(EphemeralRead) || flags.contains(HAS_UNIQUE_HLC)))
- {
- fastExec = true;
return node.commandStores().mapReduceConsume(scope, minEpoch(),
executeAtEpoch, this::applyFastRead, this, this);
- }
return node.mapReduceConsumeLocal(this, scope, minEpoch(),
executeAtEpoch, this);
}
@@ -315,7 +319,7 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
return Redundant;
case EXECUTE:
- if (!dataStoreDetectsFutureReads())
+ if (requiresListenersDuringExecution)
listeners.put(storeId, safeStore.register(txnId,
this));
waitingOn.add(storeId);
++waitingOnCount;
@@ -400,7 +404,7 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
if (waitingOn.add(storeId))
++waitingOnCount;
- if (dataStoreDetectsFutureReads())
+ if (!requiresListenersDuringExecution)
listeners.remove(storeId);
execute = true;
@@ -411,7 +415,7 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
{
logger.trace("{}: executing read", command.txnId());
read(safeStore, command);
- return !dataStoreDetectsFutureReads();
+ return requiresListenersDuringExecution;
}
else
{
@@ -709,7 +713,7 @@ public abstract class ReadData implements PreLoadContext,
Request, MapReduceCons
protected void reply(Ranges unavailable, Data data, long uniqueHlc)
{
- if (data != null && !data.validateReply(txnId, executeAt, fastExec))
reply(Redundant, null);
+ if (data != null && !data.validateReply(txnId, executeAt,
!requiresListenersDuringExecution)) reply(Redundant, null);
else reply(new ReadOk(unavailable, data, uniqueHlc), null);
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListAgent.java b/accord-core/src/test/java/accord/impl/list/ListAgent.java
index 49ff44c8..a67cffa1 100644
--- a/accord-core/src/test/java/accord/impl/list/ListAgent.java
+++ b/accord-core/src/test/java/accord/impl/list/ListAgent.java
@@ -28,9 +28,13 @@ import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+
import accord.api.Agent;
import accord.api.ProgressLog;
import accord.api.Result;
+import accord.api.TraceEventType;
+import accord.api.Tracing;
import accord.coordinate.CoordinationFailed;
import accord.coordinate.ExecuteSyncPoint;
import accord.impl.basic.NodeSink;
@@ -108,6 +112,15 @@ public class ListAgent implements Agent
}
}
+ @Nullable
+ @Override
+ public Tracing trace(TxnId txnId, TraceEventType eventType)
+ {
+ if (rnd.nextFloat() < 0.01f)
+ return (i1, i2) -> {};
+ return null;
+ }
+
@Override
public void onInconsistentTimestamp(Command command, Timestamp prev,
Timestamp next)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]