This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 74a3b81c Expose epoch ready state as a vtable for operators to inspect 
things (#169)
74a3b81c is described below

commit 74a3b81ca9d9e1ce7ddfd117682fc7c310f0cd99
Author: dcapwell <[email protected]>
AuthorDate: Wed Feb 12 10:02:44 2025 -0800

    Expose epoch ready state as a vtable for operators to inspect things (#169)
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for 
CASSANDRA-20302
---
 accord-core/src/main/java/accord/local/Node.java   |   2 +-
 .../main/java/accord/topology/TopologyManager.java | 140 +++++++++++++++++++--
 .../src/test/java/accord/utils/Property.java       | 133 +++++++++++++++++---
 3 files changed, 246 insertions(+), 29 deletions(-)

diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 0cafa9ac..9f5326f7 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -382,7 +382,7 @@ public class Node implements ConfigurationService.Listener, 
NodeCommandStoreServ
     @Override
     public void onEpochRetired(Ranges ranges, long epoch)
     {
-        topology.onEpochRedundant(ranges, epoch);
+        topology.onEpochRetired(ranges, epoch);
     }
 
     // TODO (required): audit error handling, as the refactor to provide epoch 
timeouts appears to have broken a number of coordination
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 76347357..d7b3a1be 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -20,6 +20,7 @@ package accord.topology;
 
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -31,8 +32,10 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 
 import accord.api.Agent;
+import accord.api.ConfigurationService;
 import accord.api.ConfigurationService.EpochReady;
 import accord.api.ProtocolModifiers.QuorumEpochIntersections.Include;
 import accord.api.Scheduler;
@@ -99,9 +102,12 @@ public class TopologyManager
         private final QuorumTracker syncTracker;
         private final BitSet curShardSyncComplete;
         private final Ranges addedRanges, removedRanges;
+        @GuardedBy("TopologyManager.this")
         private EpochReady ready;
+        @GuardedBy("TopologyManager.this")
         private Ranges synced;
-        Ranges closed = Ranges.EMPTY, complete = Ranges.EMPTY;
+        @GuardedBy("TopologyManager.this")
+        Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
 
         EpochState(Id node, Topology global, TopologySorter sorter, Ranges 
prevRanges)
         {
@@ -170,12 +176,12 @@ public class TopologyManager
             return true;
         }
 
-        boolean recordComplete(Ranges ranges)
+        boolean recordRetired(Ranges ranges)
         {
-            if (complete.containsAll(ranges))
+            if (retired.containsAll(ranges))
                 return false;
             closed = closed.union(MERGE_ADJACENT, ranges);
-            complete = complete.union(MERGE_ADJACENT, ranges);
+            retired = retired.union(MERGE_ADJACENT, ranges);
             return true;
         }
 
@@ -221,7 +227,7 @@ public class TopologyManager
         static class Notifications
         {
             final Set<Id> syncComplete = new TreeSet<>();
-            Ranges closed = Ranges.EMPTY, complete = Ranges.EMPTY;
+            Ranges closed = Ranges.EMPTY, retired = Ranges.EMPTY;
         }
 
         private static final Epochs EMPTY = new Epochs(new EpochState[0]);
@@ -360,14 +366,14 @@ public class TopologyManager
          * Mark the epoch as "redundant" for the provided ranges; this means 
that all transactions that can be
          * proposed for this epoch have now been executed globally.
          */
-        public void epochRedundant(Ranges ranges, long epoch)
+        public void epochRetired(Ranges ranges, long epoch)
         {
             Invariants.requireArgument(epoch > 0);
             int i;
             if (epoch > currentEpoch)
             {
                 Notifications notifications = pending(epoch);
-                notifications.complete = 
notifications.complete.union(MERGE_ADJACENT, ranges);
+                notifications.retired = 
notifications.retired.union(MERGE_ADJACENT, ranges);
                 i = 0; // record these ranges as complete for all earlier 
epochs as well
             }
             else
@@ -376,7 +382,7 @@ public class TopologyManager
                 if (i < 0)
                     return;
             }
-            while (epochs[i].recordComplete(ranges) && ++i < epochs.length) {}
+            while (epochs[i].recordRetired(ranges) && ++i < epochs.length) {}
         }
 
         private Notifications pending(long epoch)
@@ -442,6 +448,85 @@ public class TopologyManager
         }
     }
 
+    // this class could be just the list, but left it here in case we wish to 
expose "futureEpochs" and "pending" as well
+    public static class EpochsSnapshot implements 
Iterable<EpochsSnapshot.Epoch>
+    {
+        public final ImmutableList<Epoch> epochs;
+
+        public EpochsSnapshot(ImmutableList<Epoch> epochs)
+        {
+            this.epochs = epochs;
+        }
+
+        @Override
+        public Iterator<Epoch> iterator()
+        {
+            return epochs.iterator();
+        }
+
+        public enum ResultStatus
+        {
+            PENDING("pending"),
+            SUCCESS("success"),
+            FAILURE("failure");
+
+            public final String value;
+
+            ResultStatus(String value)
+            {
+                this.value = value;
+            }
+
+            private static ResultStatus of(AsyncResult<?> result)
+            {
+                if (result == null || !result.isDone())
+                    return PENDING;
+
+                return result.isSuccess() ? SUCCESS : FAILURE;
+            }
+        }
+
+        public static class EpochReady
+        {
+            public final ResultStatus metadata, coordinate, data, reads;
+
+            public EpochReady(ResultStatus metadata, ResultStatus coordinate, 
ResultStatus data, ResultStatus reads)
+            {
+                this.metadata = metadata;
+                this.coordinate = coordinate;
+                this.data = data;
+                this.reads = reads;
+            }
+
+            private static EpochReady of(ConfigurationService.EpochReady ready)
+            {
+                return new EpochReady(ResultStatus.of(ready.metadata),
+                                      ResultStatus.of(ready.coordinate),
+                                      ResultStatus.of(ready.data),
+                                      ResultStatus.of(ready.reads));
+            }
+        }
+
+        public static class Epoch
+        {
+            public final long epoch;
+            public final EpochReady ready;
+            public final Ranges global, addedRanges, removedRanges, synced, 
closed, retired;
+
+            public Epoch(long epoch, EpochReady ready, Ranges global, Ranges 
addedRanges, Ranges removedRanges, Ranges synced, Ranges closed, Ranges retired)
+            {
+                this.epoch = epoch;
+                this.ready = ready;
+                this.global = global;
+                this.addedRanges = addedRanges;
+                this.removedRanges = removedRanges;
+                this.synced = synced;
+                this.closed = closed;
+                this.retired = retired;
+            }
+        }
+    }
+
     private final TopologySorter.Supplier sorter;
     private final TopologiesCollectors topologiesCollectors;
     private final BestFastPath bestFastPath;
@@ -469,6 +554,35 @@ public class TopologyManager
         this.localConfig = localConfig;
     }
 
+    public EpochsSnapshot epochsSnapshot()
+    {
+        // Write to this volatile variable is done via synchronized, so this 
is single-writer multi-consumer; safe to read without locks
+        Epochs epochs = this.epochs;
+        ImmutableList.Builder<EpochsSnapshot.Epoch> builder = 
ImmutableList.builderWithExpectedSize(epochs.epochs.length);
+        for (int i = 0; i < epochs.epochs.length; i++)
+        {
+            // This class's state is mutable with regard to: ready, synced, 
closed, retired
+            EpochState epoch = epochs.epochs[i];
+            // Even though this field is populated with the same lock epochs 
is, it is done before publishing to epochs!
+            // For this reason the field may be null, in which case we need to 
use the lock to wait for the field.
+            EpochReady ready;
+            Ranges global, addedRanges, removedRanges, synced, closed, retired;
+            global = epoch.global.ranges.mergeTouching();
+            addedRanges = epoch.addedRanges;
+            removedRanges = epoch.removedRanges;
+            // ready, synced, closed, and retired all rely on TM's object lock
+            synchronized (this)
+            {
+                ready = epoch.ready;
+                synced = epoch.synced;
+                closed = epoch.closed;
+                retired = epoch.retired;
+            }
+            builder.add(new EpochsSnapshot.Epoch(epoch.epoch(), 
EpochsSnapshot.EpochReady.of(ready), global, addedRanges, removedRanges, 
synced, closed, retired));
+        }
+        return new EpochsSnapshot(builder.build());
+    }
+
     public void shutdown()
     {
         topologyUpdateWatchdog.cancel();
@@ -515,11 +629,13 @@ public class TopologyManager
         nextEpochs[0] = new EpochState(self, topology, sorter.get(topology), 
prevAll);
         notifications.syncComplete.forEach(nextEpochs[0]::recordSyncComplete);
         nextEpochs[0].recordClosed(notifications.closed);
-        nextEpochs[0].recordComplete(notifications.complete);
+        nextEpochs[0].recordRetired(notifications.retired);
 
         List<FutureEpoch> futureEpochs = new ArrayList<>(current.futureEpochs);
         FutureEpoch toComplete = !futureEpochs.isEmpty() ? 
futureEpochs.remove(0) : null;
         epochs = new Epochs(nextEpochs, pending, futureEpochs);
+        //TODO (performance): this can trigger many callbacks leading to 
effectively unbounded work holding a critical lock!
+        // should look into moving this outside of the lock
         if (toComplete != null)
             toComplete.future.trySuccess(null);
 
@@ -600,9 +716,9 @@ public class TopologyManager
         epochs.epochClosed(ranges, epoch);
     }
 
-    public synchronized void onEpochRedundant(Ranges ranges, long epoch)
+    public synchronized void onEpochRetired(Ranges ranges, long epoch)
     {
-        epochs.epochRedundant(ranges, epoch);
+        epochs.epochRetired(ranges, epoch);
     }
 
     public TopologySorter.Supplier sorter()
@@ -726,7 +842,7 @@ public class TopologyManager
         return withSufficientEpochsAtLeast(select,
                                           min == null ? Long.MIN_VALUE : 
min.epoch(),
                                           max == null ? Long.MAX_VALUE : 
max.epoch(),
-                                          prev -> prev.complete);
+                                          prev -> prev.retired);
     }
 
     private Topologies withSufficientEpochsAtLeast(Unseekables<?> select, long 
minEpoch, long maxEpoch, Function<EpochState, Ranges> isSufficientFor)
diff --git a/accord-core/src/test/java/accord/utils/Property.java 
b/accord-core/src/test/java/accord/utils/Property.java
index 79c29c5a..4a7c2703 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -486,15 +486,10 @@ public class Property
                     }
                     catch (Throwable t)
                     {
-                        try
-                        {
-                            commands.destroySut(sut, t);
-                            commands.destroyState(state, t);
-                        }
-                        catch (Throwable t2)
-                        {
-                            t.addSuppressed(t2);
-                        }
+                        State finalState = state;
+                        safeHandle(t, () -> commands.onFailure(finalState, 
sut, maybeRewriteHistory(history, historyTiming), t));
+                        safeHandle(t, () -> commands.destroySut(sut, t));
+                        safeHandle(t, () -> commands.destroyState(finalState, 
t));
                         throw t;
                     }
                 }
@@ -511,6 +506,18 @@ public class Property
             }
         }
 
+        private static void safeHandle(Throwable t, 
CommandsBuilder.FailingRunnable fn)
+        {
+            try
+            {
+                fn.run();
+            }
+            catch (Throwable t2)
+            {
+                t.addSuppressed(t2);
+            }
+        }
+
         private static List<String> maybeRewriteHistory(List<String> history, 
@Nullable LongArrayList historyTiming)
         {
             if (historyTiming == null) return history;
@@ -768,11 +775,14 @@ public class Property
         }
     }
 
-    public interface Commands<State, SystemUnderTest>
+    public interface Commands<State, SystemUnderTest> extends 
StatefulSuccess<State, SystemUnderTest>, StatefulFailure<State, SystemUnderTest>
     {
         Gen<State> genInitialState() throws Throwable;
         SystemUnderTest createSut(State state) throws Throwable;
+        @Override
         default void onSuccess(State state, SystemUnderTest sut, List<String> 
history) throws Throwable {}
+        @Override
+        default void onFailure(State state, SystemUnderTest sut, List<String> 
history, Throwable cause) throws Throwable {}
         default void destroyState(State state, @Nullable Throwable cause) 
throws Throwable {}
         default void destroySut(SystemUnderTest sut, @Nullable Throwable 
cause) throws Throwable {}
         Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws 
Throwable;
@@ -790,7 +800,12 @@ public class Property
 
     public interface StatefulSuccess<State, SystemUnderTest>
     {
-        void apply(State state, SystemUnderTest sut, List<String> history) 
throws Throwable;
+        void onSuccess(State state, SystemUnderTest sut, List<String> history) 
throws Throwable;
+    }
+
+    public interface StatefulFailure<State, SystemUnderTest>
+    {
+        void onFailure(State state, SystemUnderTest sut, List<String> history, 
Throwable cause) throws Throwable;
     }
 
     public static class CommandsBuilder<State, SystemUnderTest>
@@ -806,6 +821,8 @@ public class Property
         private Set<Setup<State, SystemUnderTest>> unknownWeights = null;
         @Nullable
         private Map<Predicate<State>, List<Setup<State, SystemUnderTest>>> 
conditionalCommands = null;
+        @Nullable
+        private Map<Predicate<State>, List<Pair<Setup<State, SystemUnderTest>, 
Integer>>> conditionalCommandsKnownWeights = null;
         private Gen.IntGen unknownWeightGen = Gens.ints().between(1, 10);
         @Nullable
         private FailingConsumer<State> preCommands = null;
@@ -816,6 +833,7 @@ public class Property
         @Nullable
         private BiFunction<State, Gen<Command<State, SystemUnderTest, ?>>, 
Gen<Command<State, SystemUnderTest, ?>>> commandsTransformer = null;
         private final List<StatefulSuccess<State, SystemUnderTest>> onSuccess 
= new ArrayList<>();
+        private final List<StatefulFailure<State, SystemUnderTest>> onFailures 
= new ArrayList<>();
 
         public CommandsBuilder(Supplier<Gen<State>> stateGen, Function<State, 
SystemUnderTest> sutFactory)
         {
@@ -909,10 +927,35 @@ public class Property
             return this;
         }
 
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> 
predicate, int weight, Command<State, SystemUnderTest, ?> cmd)
+        {
+            return addIf(predicate, weight, (i1, i2) -> cmd);
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> 
predicate, int weight, Gen<Command<State, SystemUnderTest, ?>> cmd)
+        {
+            return addIf(predicate, weight, (rs, state) -> cmd.next(rs));
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> 
predicate, int weight, Setup<State, SystemUnderTest> cmd)
+        {
+            if (conditionalCommandsKnownWeights == null)
+                conditionalCommandsKnownWeights = new LinkedHashMap<>();
+            conditionalCommandsKnownWeights.computeIfAbsent(predicate, i -> 
new ArrayList<>()).add(Pair.create(cmd, weight));
+            return this;
+        }
+
         public CommandsBuilder<State, SystemUnderTest> 
addAllIf(Predicate<State> predicate, Consumer<IfBuilder<State, 
SystemUnderTest>> sub)
         {
             sub.accept(new IfBuilder<>()
             {
+                @Override
+                public IfBuilder<State, SystemUnderTest> add(int weight, 
Setup<State, SystemUnderTest> cmd)
+                {
+                    CommandsBuilder.this.addIf(predicate, weight, cmd);
+                    return this;
+                }
+
                 @Override
                 public IfBuilder<State, SystemUnderTest> add(Setup<State, 
SystemUnderTest> cmd)
                 {
@@ -920,6 +963,7 @@ public class Property
                     return this;
                 }
 
+
                 @Override
                 public IfBuilder<State, SystemUnderTest> 
addIf(Predicate<State> nextPredicate, Setup<State, SystemUnderTest> cmd) {
                     CommandsBuilder.this.addIf(predicate.and(nextPredicate), 
cmd);
@@ -931,6 +975,24 @@ public class Property
 
         public interface IfBuilder<State, SystemUnderTest>
         {
+            default IfBuilder<State, SystemUnderTest> add(int weight, 
Command<State, SystemUnderTest, ?> cmd)
+            {
+                return add(weight, (i1, i2) -> cmd);
+            }
+            default IfBuilder<State, SystemUnderTest> add(int weight, 
Gen<Command<State, SystemUnderTest, ?>> cmd)
+            {
+                return add(weight, (rs, state) -> cmd.next(rs));
+            }
+            IfBuilder<State, SystemUnderTest> add(int weight, Setup<State, 
SystemUnderTest> cmd);
+
+            default IfBuilder<State, SystemUnderTest> add(Command<State, 
SystemUnderTest, ?> cmd)
+            {
+                return add((i1, i2) -> cmd);
+            }
+            default IfBuilder<State, SystemUnderTest> add(Gen<Command<State, 
SystemUnderTest, ?>> cmd)
+            {
+                return add((rs, state) -> cmd.next(rs));
+            }
             IfBuilder<State, SystemUnderTest> add(Setup<State, 
SystemUnderTest> cmd);
             IfBuilder<State, SystemUnderTest> addIf(Predicate<State> 
predicate, Setup<State, SystemUnderTest> cmd);
         }
@@ -953,10 +1015,16 @@ public class Property
             return this;
         }
 
+        public CommandsBuilder<State, SystemUnderTest> 
onFailure(StatefulFailure<State, SystemUnderTest> fn)
+        {
+            onFailures.add(fn);
+            return this;
+        }
+
         public Commands<State, SystemUnderTest> build()
         {
             Gen<Setup<State, SystemUnderTest>> commandsGen;
-            if (unknownWeights == null && conditionalCommands == null)
+            if (unknownWeights == null && conditionalCommands == null && 
conditionalCommandsKnownWeights == null)
             {
                 commandsGen = Gens.pick(new LinkedHashMap<>(knownWeights));
             }
@@ -989,15 +1057,36 @@ public class Property
                                         conditionalWeights.put(c, 
unknownWeightGen.nextInt(rs));
                                 }
                             }
+                            if (conditionalCommandsKnownWeights != null)
+                            {
+                                if (conditionalWeights == null)
+                                    conditionalWeights = new LinkedHashMap<>();
+                                for (List<Pair<Setup<State, SystemUnderTest>, 
Integer>> commands : conditionalCommandsKnownWeights.values())
+                                {
+                                    for (Pair<Setup<State, SystemUnderTest>, 
Integer> pair : commands)
+                                        conditionalWeights.put(pair.left, 
pair.right);
+                                }
+                            }
                         }
                         if (conditionalWeights == null) return 
nonConditional.next(rs);
                         return (r, s) -> {
                             // need to figure out what conditions apply...
                             LinkedHashMap<Setup<State, SystemUnderTest>, 
Integer> clone = new LinkedHashMap<>(weights);
-                            for (Map.Entry<Predicate<State>, List<Setup<State, 
SystemUnderTest>>> e : conditionalCommands.entrySet())
+                            if (conditionalCommands != null)
+                            {
+                                for (Map.Entry<Predicate<State>, 
List<Setup<State, SystemUnderTest>>> e : conditionalCommands.entrySet())
+                                {
+                                    if (e.getKey().test(s))
+                                        e.getValue().forEach(c -> clone.put(c, 
conditionalWeights.get(c)));
+                                }
+                            }
+                            if (conditionalCommandsKnownWeights != null)
                             {
-                                if (e.getKey().test(s))
-                                    e.getValue().forEach(c -> clone.put(c, 
conditionalWeights.get(c)));
+                                for (Map.Entry<Predicate<State>, 
List<Pair<Setup<State, SystemUnderTest>, Integer>>> e : 
conditionalCommandsKnownWeights.entrySet())
+                                {
+                                    if (e.getKey().test(s))
+                                        e.getValue().forEach(p -> 
clone.put(p.left, p.right));
+                                }
                             }
                             Setup<State, SystemUnderTest> select = 
Gens.pick(clone).next(r);
                             return select.setup(r, s);
@@ -1056,7 +1145,14 @@ public class Property
                 public void onSuccess(State state, SystemUnderTest sut, 
List<String> history) throws Throwable
                 {
                     for (var fn : onSuccess)
-                        fn.apply(state, sut, history);
+                        fn.onSuccess(state, sut, history);
+                }
+
+                @Override
+                public void onFailure(State state, SystemUnderTest sut, 
List<String> history, Throwable cause) throws Throwable
+                {
+                    for (var fn : onFailures)
+                        fn.onFailure(state, sut, history, cause);
                 }
             };
         }
@@ -1066,6 +1162,11 @@ public class Property
             void accept(T value) throws Throwable;
         }
 
+        public interface FailingRunnable
+        {
+            void run() throws Throwable;
+        }
+
         public interface FailingBiConsumer<A, B>
         {
             void accept(A a, B b) throws Throwable;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to