This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 74a3b81c Expose epoch ready state as a vtable for operators to inspect
things (#169)
74a3b81c is described below
commit 74a3b81ca9d9e1ce7ddfd117682fc7c310f0cd99
Author: dcapwell <[email protected]>
AuthorDate: Wed Feb 12 10:02:44 2025 -0800
Expose epoch ready state as a vtable for operators to inspect things (#169)
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-20302
---
accord-core/src/main/java/accord/local/Node.java | 2 +-
.../main/java/accord/topology/TopologyManager.java | 140 +++++++++++++++++++--
.../src/test/java/accord/utils/Property.java | 133 +++++++++++++++++---
3 files changed, 246 insertions(+), 29 deletions(-)
diff --git a/accord-core/src/main/java/accord/local/Node.java
b/accord-core/src/main/java/accord/local/Node.java
index 0cafa9ac..9f5326f7 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -382,7 +382,7 @@ public class Node implements ConfigurationService.Listener,
NodeCommandStoreServ
@Override
public void onEpochRetired(Ranges ranges, long epoch)
{
- topology.onEpochRedundant(ranges, epoch);
+ topology.onEpochRetired(ranges, epoch);
}
// TODO (required): audit error handling, as the refactor to provide epoch
timeouts appears to have broken a number of coordination
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 76347357..d7b3a1be 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -20,6 +20,7 @@ package accord.topology;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -31,8 +32,10 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import accord.api.Agent;
+import accord.api.ConfigurationService;
import accord.api.ConfigurationService.EpochReady;
import accord.api.ProtocolModifiers.QuorumEpochIntersections.Include;
import accord.api.Scheduler;
@@ -99,9 +102,12 @@ public class TopologyManager
private final QuorumTracker syncTracker;
private final BitSet curShardSyncComplete;
private final Ranges addedRanges, removedRanges;
+ @GuardedBy("TopologyManager.this")
private EpochReady ready;
+ @GuardedBy("TopologyManager.this")
private Ranges synced;
- Ranges closed = Ranges.EMPTY, complete = Ranges.EMPTY;
+ @GuardedBy("TopologyManager.this")
+ Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
EpochState(Id node, Topology global, TopologySorter sorter, Ranges
prevRanges)
{
@@ -170,12 +176,12 @@ public class TopologyManager
return true;
}
- boolean recordComplete(Ranges ranges)
+ boolean recordRetired(Ranges ranges)
{
- if (complete.containsAll(ranges))
+ if (retired.containsAll(ranges))
return false;
closed = closed.union(MERGE_ADJACENT, ranges);
- complete = complete.union(MERGE_ADJACENT, ranges);
+ retired = retired.union(MERGE_ADJACENT, ranges);
return true;
}
@@ -221,7 +227,7 @@ public class TopologyManager
static class Notifications
{
final Set<Id> syncComplete = new TreeSet<>();
- Ranges closed = Ranges.EMPTY, complete = Ranges.EMPTY;
+ Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
}
private static final Epochs EMPTY = new Epochs(new EpochState[0]);
@@ -360,14 +366,14 @@ public class TopologyManager
* Mark the epoch as "redundant" for the provided ranges; this means
that all transactions that can be
* proposed for this epoch have now been executed globally.
*/
- public void epochRedundant(Ranges ranges, long epoch)
+ public void epochRetired(Ranges ranges, long epoch)
{
Invariants.requireArgument(epoch > 0);
int i;
if (epoch > currentEpoch)
{
Notifications notifications = pending(epoch);
- notifications.complete =
notifications.complete.union(MERGE_ADJACENT, ranges);
+ notifications.retired =
notifications.retired.union(MERGE_ADJACENT, ranges);
i = 0; // record these ranges as complete for all earlier
epochs as well
}
else
@@ -376,7 +382,7 @@ public class TopologyManager
if (i < 0)
return;
}
- while (epochs[i].recordComplete(ranges) && ++i < epochs.length) {}
+ while (epochs[i].recordRetired(ranges) && ++i < epochs.length) {}
}
private Notifications pending(long epoch)
@@ -442,6 +448,85 @@ public class TopologyManager
}
}
+    // This class could be just the list, but it is kept as a wrapper in case we wish to expose "futureEpochs" and "pending" as well
+    /**
+     * Immutable, point-in-time view of the epochs tracked by a {@link TopologyManager}, intended for operators
+     * to inspect per-epoch readiness and range state.
+     */
+    public static class EpochsSnapshot implements Iterable<EpochsSnapshot.Epoch>
+    {
+        public final ImmutableList<Epoch> epochs;
+
+        public EpochsSnapshot(ImmutableList<Epoch> epochs)
+        {
+            this.epochs = epochs;
+        }
+
+        @Override
+        public Iterator<Epoch> iterator()
+        {
+            return epochs.iterator();
+        }
+
+        /**
+         * Completion status of an {@link AsyncResult}; {@link #value} is the lower-case display string.
+         */
+        public enum ResultStatus
+        {
+            PENDING("pending"),
+            SUCCESS("success"),
+            FAILURE("failure");
+
+            public final String value;
+
+            ResultStatus(String value)
+            {
+                this.value = value;
+            }
+
+            // A missing (null) result, or one that has not completed yet, is reported as PENDING
+            private static ResultStatus of(AsyncResult<?> result)
+            {
+                if (result == null || !result.isDone())
+                    return PENDING;
+
+                return result.isSuccess() ? SUCCESS : FAILURE;
+            }
+        }
+
+        /**
+         * Status of each stage (metadata, coordinate, data, reads) of a {@link ConfigurationService.EpochReady}.
+         */
+        public static class EpochReady
+        {
+            // Reported when an epoch has no EpochReady assigned yet: every stage is still pending
+            private static final EpochReady UNASSIGNED = new EpochReady(ResultStatus.PENDING, ResultStatus.PENDING, ResultStatus.PENDING, ResultStatus.PENDING);
+
+            public final ResultStatus metadata, coordinate, data, reads;
+
+            public EpochReady(ResultStatus metadata, ResultStatus coordinate, ResultStatus data, ResultStatus reads)
+            {
+                this.metadata = metadata;
+                this.coordinate = coordinate;
+                this.data = data;
+                this.reads = reads;
+            }
+
+            /**
+             * @param ready the epoch's readiness results; may be null when the epoch's {@code ready} field has not been
+             *              assigned yet, in which case every stage is reported as {@link ResultStatus#PENDING}
+             *              (consistent with {@link ResultStatus} treating a null result as pending)
+             */
+            private static EpochReady of(@Nullable ConfigurationService.EpochReady ready)
+            {
+                if (ready == null)
+                    return UNASSIGNED;
+
+                return new EpochReady(ResultStatus.of(ready.metadata),
+                                      ResultStatus.of(ready.coordinate),
+                                      ResultStatus.of(ready.data),
+                                      ResultStatus.of(ready.reads));
+            }
+        }
+
+        /**
+         * Snapshot of a single epoch: its readiness status and the range sets recorded for it at snapshot time.
+         */
+        public static class Epoch
+        {
+            public final long epoch;
+            public final EpochReady ready;
+            public final Ranges global, addedRanges, removedRanges, synced, closed, retired;
+
+            public Epoch(long epoch, EpochReady ready, Ranges global, Ranges addedRanges, Ranges removedRanges, Ranges synced, Ranges closed, Ranges retired)
+            {
+                this.epoch = epoch;
+                this.ready = ready;
+                this.global = global;
+                this.addedRanges = addedRanges;
+                this.removedRanges = removedRanges;
+                this.synced = synced;
+                this.closed = closed;
+                this.retired = retired;
+            }
+        }
+    }
+
private final TopologySorter.Supplier sorter;
private final TopologiesCollectors topologiesCollectors;
private final BestFastPath bestFastPath;
@@ -469,6 +554,35 @@ public class TopologyManager
this.localConfig = localConfig;
}
+    /**
+     * Builds an immutable snapshot of every epoch currently tracked, for operator inspection.
+     * <p>
+     * The epochs array is read once without locking. The mutable per-epoch fields (ready, synced, closed, retired)
+     * are then read while holding this TopologyManager's monitor. The monitor is acquired separately for each epoch,
+     * so each entry is internally consistent but the snapshot is not necessarily atomic across epochs.
+     */
+    public EpochsSnapshot epochsSnapshot()
+    {
+        // Writes to this volatile variable are done while synchronized, so this is single-writer multi-consumer; safe to read without locks
+        Epochs epochs = this.epochs;
+        ImmutableList.Builder<EpochsSnapshot.Epoch> builder = ImmutableList.builderWithExpectedSize(epochs.epochs.length);
+        for (int i = 0; i < epochs.epochs.length; i++)
+        {
+            // This class's state is mutable with regard to: ready, synced, closed, retired
+            EpochState epoch = epochs.epochs[i];
+            // Even though this field is populated under the same lock as epochs, it is done before publishing to epochs!
+            // For this reason the field may be null, in which case we need to use the lock to wait for the field.
+            // NOTE(review): acquiring the lock provides visibility but does not wait for an unassigned field, so ready
+            // may still be null here; EpochsSnapshot.EpochReady.of would need to handle that. Confirm publication order.
+            EpochReady ready;
+            Ranges global, addedRanges, removedRanges, synced, closed, retired;
+            // Read without the lock: addedRanges/removedRanges are final in EpochState;
+            // global is assumed immutable after construction (TODO confirm)
+            global = epoch.global.ranges.mergeTouching();
+            addedRanges = epoch.addedRanges;
+            removedRanges = epoch.removedRanges;
+            // ready, synced, closed, and retired all rely on TM's object lock
+            synchronized (this)
+            {
+                ready = epoch.ready;
+                synced = epoch.synced;
+                closed = epoch.closed;
+                retired = epoch.retired;
+            }
+            builder.add(new EpochsSnapshot.Epoch(epoch.epoch(), EpochsSnapshot.EpochReady.of(ready), global, addedRanges, removedRanges, synced, closed, retired));
+        }
+        return new EpochsSnapshot(builder.build());
+    }
+
public void shutdown()
{
topologyUpdateWatchdog.cancel();
@@ -515,11 +629,13 @@ public class TopologyManager
nextEpochs[0] = new EpochState(self, topology, sorter.get(topology),
prevAll);
notifications.syncComplete.forEach(nextEpochs[0]::recordSyncComplete);
nextEpochs[0].recordClosed(notifications.closed);
- nextEpochs[0].recordComplete(notifications.complete);
+ nextEpochs[0].recordRetired(notifications.retired);
List<FutureEpoch> futureEpochs = new ArrayList<>(current.futureEpochs);
FutureEpoch toComplete = !futureEpochs.isEmpty() ?
futureEpochs.remove(0) : null;
epochs = new Epochs(nextEpochs, pending, futureEpochs);
+        // TODO (performance): this can trigger many callbacks, leading to effectively unbounded work while holding a critical lock!
+        //  should look into moving this outside of the lock
if (toComplete != null)
toComplete.future.trySuccess(null);
@@ -600,9 +716,9 @@ public class TopologyManager
epochs.epochClosed(ranges, epoch);
}
- public synchronized void onEpochRedundant(Ranges ranges, long epoch)
+ public synchronized void onEpochRetired(Ranges ranges, long epoch)
{
- epochs.epochRedundant(ranges, epoch);
+ epochs.epochRetired(ranges, epoch);
}
public TopologySorter.Supplier sorter()
@@ -726,7 +842,7 @@ public class TopologyManager
return withSufficientEpochsAtLeast(select,
min == null ? Long.MIN_VALUE :
min.epoch(),
max == null ? Long.MAX_VALUE :
max.epoch(),
- prev -> prev.complete);
+ prev -> prev.retired);
}
private Topologies withSufficientEpochsAtLeast(Unseekables<?> select, long
minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor)
diff --git a/accord-core/src/test/java/accord/utils/Property.java
b/accord-core/src/test/java/accord/utils/Property.java
index 79c29c5a..4a7c2703 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -486,15 +486,10 @@ public class Property
}
catch (Throwable t)
{
- try
- {
- commands.destroySut(sut, t);
- commands.destroyState(state, t);
- }
- catch (Throwable t2)
- {
- t.addSuppressed(t2);
- }
+ State finalState = state;
+ safeHandle(t, () -> commands.onFailure(finalState,
sut, maybeRewriteHistory(history, historyTiming), t));
+ safeHandle(t, () -> commands.destroySut(sut, t));
+ safeHandle(t, () -> commands.destroyState(finalState,
t));
throw t;
}
}
@@ -511,6 +506,18 @@ public class Property
}
}
+    /**
+     * Runs {@code fn}. If it throws, the new Throwable is attached to {@code t} via
+     * {@link Throwable#addSuppressed} instead of propagating, so failure-handling steps (onFailure,
+     * destroySut, destroyState) all run and cannot mask the original failure {@code t}.
+     */
+    private static void safeHandle(Throwable t, CommandsBuilder.FailingRunnable fn)
+    {
+        try
+        {
+            fn.run();
+        }
+        catch (Throwable t2)
+        {
+            t.addSuppressed(t2);
+        }
+    }
+
private static List<String> maybeRewriteHistory(List<String> history,
@Nullable LongArrayList historyTiming)
{
if (historyTiming == null) return history;
@@ -768,11 +775,14 @@ public class Property
}
}
- public interface Commands<State, SystemUnderTest>
+ public interface Commands<State, SystemUnderTest> extends
StatefulSuccess<State, SystemUnderTest>, StatefulFailure<State, SystemUnderTest>
{
Gen<State> genInitialState() throws Throwable;
SystemUnderTest createSut(State state) throws Throwable;
+ @Override
default void onSuccess(State state, SystemUnderTest sut, List<String>
history) throws Throwable {}
+ @Override
+ default void onFailure(State state, SystemUnderTest sut, List<String>
history, Throwable cause) throws Throwable {}
default void destroyState(State state, @Nullable Throwable cause)
throws Throwable {}
default void destroySut(SystemUnderTest sut, @Nullable Throwable
cause) throws Throwable {}
Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws
Throwable;
@@ -790,7 +800,12 @@ public class Property
public interface StatefulSuccess<State, SystemUnderTest>
{
- void apply(State state, SystemUnderTest sut, List<String> history)
throws Throwable;
+ void onSuccess(State state, SystemUnderTest sut, List<String> history)
throws Throwable;
+ }
+
+    /**
+     * Callback invoked when a stateful property run throws. It receives the state and system under test at the
+     * point of failure, the command history, and the Throwable that caused the failure.
+     */
+    public interface StatefulFailure<State, SystemUnderTest>
+    {
+        void onFailure(State state, SystemUnderTest sut, List<String> history, Throwable cause) throws Throwable;
 }
public static class CommandsBuilder<State, SystemUnderTest>
@@ -806,6 +821,8 @@ public class Property
private Set<Setup<State, SystemUnderTest>> unknownWeights = null;
@Nullable
private Map<Predicate<State>, List<Setup<State, SystemUnderTest>>>
conditionalCommands = null;
+ @Nullable
+ private Map<Predicate<State>, List<Pair<Setup<State, SystemUnderTest>,
Integer>>> conditionalCommandsKnownWeights = null;
private Gen.IntGen unknownWeightGen = Gens.ints().between(1, 10);
@Nullable
private FailingConsumer<State> preCommands = null;
@@ -816,6 +833,7 @@ public class Property
@Nullable
private BiFunction<State, Gen<Command<State, SystemUnderTest, ?>>,
Gen<Command<State, SystemUnderTest, ?>>> commandsTransformer = null;
private final List<StatefulSuccess<State, SystemUnderTest>> onSuccess
= new ArrayList<>();
+ private final List<StatefulFailure<State, SystemUnderTest>> onFailures
= new ArrayList<>();
public CommandsBuilder(Supplier<Gen<State>> stateGen, Function<State,
SystemUnderTest> sutFactory)
{
@@ -909,10 +927,35 @@ public class Property
return this;
}
+        /**
+         * Adds {@code cmd} with a fixed {@code weight}; it is only eligible for selection when {@code predicate}
+         * holds for the current state.
+         */
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> predicate, int weight, Command<State, SystemUnderTest, ?> cmd)
+        {
+            return addIf(predicate, weight, (i1, i2) -> cmd);
+        }
+
+        /**
+         * Adds commands drawn from {@code cmd} with a fixed {@code weight}; only eligible when {@code predicate}
+         * holds for the current state.
+         */
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> predicate, int weight, Gen<Command<State, SystemUnderTest, ?>> cmd)
+        {
+            return addIf(predicate, weight, (rs, state) -> cmd.next(rs));
+        }
+
+        /**
+         * Registers a conditional setup with a caller-supplied weight. Conditional commands without a weight get one
+         * drawn from unknownWeightGen. This weight is used as-is whenever {@code predicate} tests true for the
+         * state during command generation.
+         */
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> predicate, int weight, Setup<State, SystemUnderTest> cmd)
+        {
+            if (conditionalCommandsKnownWeights == null)
+                conditionalCommandsKnownWeights = new LinkedHashMap<>();
+            conditionalCommandsKnownWeights.computeIfAbsent(predicate, i -> new ArrayList<>()).add(Pair.create(cmd, weight));
+            return this;
+        }
+
public CommandsBuilder<State, SystemUnderTest>
addAllIf(Predicate<State> predicate, Consumer<IfBuilder<State,
SystemUnderTest>> sub)
{
sub.accept(new IfBuilder<>()
{
+ @Override
+ public IfBuilder<State, SystemUnderTest> add(int weight,
Setup<State, SystemUnderTest> cmd)
+ {
+ CommandsBuilder.this.addIf(predicate, weight, cmd);
+ return this;
+ }
+
@Override
public IfBuilder<State, SystemUnderTest> add(Setup<State,
SystemUnderTest> cmd)
{
@@ -920,6 +963,7 @@ public class Property
return this;
}
+
@Override
public IfBuilder<State, SystemUnderTest>
addIf(Predicate<State> nextPredicate, Setup<State, SystemUnderTest> cmd) {
CommandsBuilder.this.addIf(predicate.and(nextPredicate),
cmd);
@@ -931,6 +975,24 @@ public class Property
public interface IfBuilder<State, SystemUnderTest>
{
+ default IfBuilder<State, SystemUnderTest> add(int weight,
Command<State, SystemUnderTest, ?> cmd)
+ {
+ return add(weight, (i1, i2) -> cmd);
+ }
+ default IfBuilder<State, SystemUnderTest> add(int weight,
Gen<Command<State, SystemUnderTest, ?>> cmd)
+ {
+ return add(weight, (rs, state) -> cmd.next(rs));
+ }
+ IfBuilder<State, SystemUnderTest> add(int weight, Setup<State,
SystemUnderTest> cmd);
+
+ default IfBuilder<State, SystemUnderTest> add(Command<State,
SystemUnderTest, ?> cmd)
+ {
+ return add((i1, i2) -> cmd);
+ }
+ default IfBuilder<State, SystemUnderTest> add(Gen<Command<State,
SystemUnderTest, ?>> cmd)
+ {
+ return add((rs, state) -> cmd.next(rs));
+ }
IfBuilder<State, SystemUnderTest> add(Setup<State,
SystemUnderTest> cmd);
IfBuilder<State, SystemUnderTest> addIf(Predicate<State>
predicate, Setup<State, SystemUnderTest> cmd);
}
@@ -953,10 +1015,16 @@ public class Property
return this;
}
+        /**
+         * Registers a callback to run when the property fails. The built {@link Commands#onFailure} invokes
+         * registered callbacks in registration order.
+         */
+        public CommandsBuilder<State, SystemUnderTest> onFailure(StatefulFailure<State, SystemUnderTest> fn)
+        {
+            onFailures.add(fn);
+            return this;
+        }
+
public Commands<State, SystemUnderTest> build()
{
Gen<Setup<State, SystemUnderTest>> commandsGen;
- if (unknownWeights == null && conditionalCommands == null)
+ if (unknownWeights == null && conditionalCommands == null &&
conditionalCommandsKnownWeights == null)
{
commandsGen = Gens.pick(new LinkedHashMap<>(knownWeights));
}
@@ -989,15 +1057,36 @@ public class Property
conditionalWeights.put(c,
unknownWeightGen.nextInt(rs));
}
}
+ if (conditionalCommandsKnownWeights != null)
+ {
+ if (conditionalWeights == null)
+ conditionalWeights = new LinkedHashMap<>();
+ for (List<Pair<Setup<State, SystemUnderTest>,
Integer>> commands : conditionalCommandsKnownWeights.values())
+ {
+ for (Pair<Setup<State, SystemUnderTest>,
Integer> pair : commands)
+ conditionalWeights.put(pair.left,
pair.right);
+ }
+ }
}
if (conditionalWeights == null) return
nonConditional.next(rs);
return (r, s) -> {
// need to figure out what conditions apply...
LinkedHashMap<Setup<State, SystemUnderTest>,
Integer> clone = new LinkedHashMap<>(weights);
- for (Map.Entry<Predicate<State>, List<Setup<State,
SystemUnderTest>>> e : conditionalCommands.entrySet())
+ if (conditionalCommands != null)
+ {
+ for (Map.Entry<Predicate<State>,
List<Setup<State, SystemUnderTest>>> e : conditionalCommands.entrySet())
+ {
+ if (e.getKey().test(s))
+ e.getValue().forEach(c -> clone.put(c,
conditionalWeights.get(c)));
+ }
+ }
+ if (conditionalCommandsKnownWeights != null)
{
- if (e.getKey().test(s))
- e.getValue().forEach(c -> clone.put(c,
conditionalWeights.get(c)));
+ for (Map.Entry<Predicate<State>,
List<Pair<Setup<State, SystemUnderTest>, Integer>>> e :
conditionalCommandsKnownWeights.entrySet())
+ {
+ if (e.getKey().test(s))
+ e.getValue().forEach(p ->
clone.put(p.left, p.right));
+ }
}
Setup<State, SystemUnderTest> select =
Gens.pick(clone).next(r);
return select.setup(r, s);
@@ -1056,7 +1145,14 @@ public class Property
public void onSuccess(State state, SystemUnderTest sut,
List<String> history) throws Throwable
{
for (var fn : onSuccess)
- fn.apply(state, sut, history);
+ fn.onSuccess(state, sut, history);
+ }
+
+ @Override
+ public void onFailure(State state, SystemUnderTest sut,
List<String> history, Throwable cause) throws Throwable
+ {
+ for (var fn : onFailures)
+ fn.onFailure(state, sut, history, cause);
}
};
}
@@ -1066,6 +1162,11 @@ public class Property
void accept(T value) throws Throwable;
}
+        /**
+         * A {@link Runnable}-like action whose {@code run} may throw any {@link Throwable}.
+         */
+        public interface FailingRunnable
+        {
+            void run() throws Throwable;
+        }
+
public interface FailingBiConsumer<A, B>
{
void accept(A a, B b) throws Throwable;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]