This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fdabc1f977 Create a fuzz test that randomizes topology changes, cluster actions, and CQL operations
fdabc1f977 is described below

commit fdabc1f9774b1c06d68a0acbf8a470e45c787eec
Author: David Capwell <[email protected]>
AuthorDate: Wed Aug 21 10:03:07 2024 -0700

    Create a fuzz test that randomizes topology changes, cluster actions, and CQL operations
    
    patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-19847
---
 .../distributed/impl/AbstractCluster.java          |  61 +-
 .../distributed/impl/INodeProvisionStrategy.java   | 161 +++--
 .../cassandra/distributed/impl/InstanceConfig.java |   5 +-
 .../cassandra/distributed/shared/ClusterUtils.java |  35 +-
 .../fuzz/topology/HarryTopologyMixupTest.java      | 159 +++++
 .../fuzz/topology/TopologyMixupTestBase.java       | 697 +++++++++++++++++++++
 .../org/apache/cassandra/harry/HarryHelper.java    |   6 +-
 .../sut/injvm/InJVMTokenAwareVisitExecutor.java    |   6 +-
 .../cassandra/harry/sut/injvm/InJvmSutBase.java    |  13 +-
 .../sut/injvm/QuiescentLocalStateChecker.java      |   7 +-
 test/unit/accord/utils/Gen.java                    |  66 +-
 test/unit/accord/utils/Gens.java                   | 332 +++++++++-
 test/unit/accord/utils/Property.java               | 591 ++++++++++++++++-
 test/unit/accord/utils/SeedProvider.java           |  51 ++
 test/unit/accord/utils/async/TimeoutUtils.java     |  70 +++
 .../concurrent/SimulatedExecutorFactory.java       |   2 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   2 +-
 .../cassandra/net/SimulatedMessageDelivery.java    |   2 +-
 .../org/apache/cassandra/repair/FuzzTestBase.java  |   4 +-
 .../apache/cassandra/utils/ConfigGenBuilder.java   |  12 +-
 .../cassandra/utils/ConfigGenBuilderTest.java      |   2 +-
 21 files changed, 2102 insertions(+), 182 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 2c3e446d3c..54f90e09f7 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -29,7 +29,6 @@ import java.nio.file.FileSystem;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -51,11 +50,11 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import org.junit.Assume;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,7 +162,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
 
     // mutated by user-facing API
     private final MessageFilters filters;
-    private final INodeProvisionStrategy.Strategy nodeProvisionStrategy;
+    private final INodeProvisionStrategy.Factory nodeProvisionStrategy;
     private final IInstanceInitializer instanceInitializer;
     private final int datadirCount;
     private volatile Thread.UncaughtExceptionHandler previousHandler = null;
@@ -181,7 +180,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
     public static abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>>
         extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, B>
     {
-        private INodeProvisionStrategy.Strategy nodeProvisionStrategy = 
INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
+        private INodeProvisionStrategy.Factory nodeProvisionStrategy = 
INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
         private ShutdownExecutor shutdownExecutor = DEFAULT_SHUTDOWN_EXECUTOR;
         private boolean dynamicPortAllocation = false;
 
@@ -206,7 +205,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
             return (B) this;
         }
 
-        public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy 
nodeProvisionStrategy)
+        public B withNodeProvisionStrategy(INodeProvisionStrategy.Factory 
nodeProvisionStrategy)
         {
             this.nodeProvisionStrategy = nodeProvisionStrategy;
             return self();
@@ -280,7 +279,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         protected IInvokableInstance delegate()
         {
             if (delegate == null)
-                throw new IllegalStateException("Can't use shutdown instances, 
delegate is null");
+                throw new IllegalStateException("Can't use shutdown node" + 
config.num() + ", delegate is null");
             return delegate;
         }
 
@@ -1101,7 +1100,12 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
         }
         catch (Throwable t)
         {
-            checkForThreadLeaks();
+            IllegalStateException leak = checkForThreadLeaks();
+            if (leak != null)
+            {
+                leak.initCause(t);
+                throw leak;
+            }
             throw t;
         }
         instances.clear();
@@ -1128,36 +1132,33 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
             return true;
         });
         if (!drain.isEmpty())
-            throw new ShutdownException(drain);
+        {
+            ShutdownException shutdownException = new ShutdownException(drain);
+            // also log as java will truncate log lists
+            logger.error("Unexpected errors", shutdownException);
+            throw shutdownException;
+        }
     }
 
-    private void checkForThreadLeaks()
+    @Nullable
+    private IllegalStateException checkForThreadLeaks()
     {
         //This is an alternate version of the thread leak check that just checks to see if any threads are still alive
         // with the context classloader.
-        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
-        threadSet.stream().filter(t->t.getContextClassLoader() instanceof 
InstanceClassLoader).forEach(t->{
-            t.setContextClassLoader(null);
-            throw new RuntimeException("Unterminated thread detected " + 
t.getName() + " in group " + t.getThreadGroup().getName());
-        });
-    }
-
-    // We do not want this check to run every time until we fix problems with 
tread stops
-    private void withThreadLeakCheck(List<Future<?>> futures)
-    {
-        FBUtilities.waitOnFutures(futures);
-
-        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
-        threadSet = Sets.difference(threadSet, 
Collections.singletonMap(Thread.currentThread(), null).keySet());
-        if (!threadSet.isEmpty())
+        Map<Thread, StackTraceElement[]> allThreads = 
Thread.getAllStackTraces();
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<Thread, StackTraceElement[]> e : allThreads.entrySet())
         {
-            for (Thread thread : threadSet)
-            {
-                System.out.println(thread);
-                System.out.println(Arrays.toString(thread.getStackTrace()));
-            }
-            throw new RuntimeException(String.format("Not all threads have 
shut down. %d threads are still running: %s", threadSet.size(), threadSet));
+
+            if (!(e.getKey().getContextClassLoader() instanceof 
InstanceClassLoader)) continue;
+            e.getKey().setContextClassLoader(null);
+            sb.append(e.getKey().getName()).append(":\n");
+            for (StackTraceElement s : e.getValue())
+                sb.append("\t").append(s).append("\n");
         }
+        return sb.length() > 0
+               ? new IllegalStateException("Unterminated threads detected; 
active threads:\n" + sb)
+               : null;
     }
 
     public List<Token> tokens()
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
index f2c088c0e5..0c10ee64e1 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
@@ -29,62 +29,51 @@ import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
 @Shared(inner = INTERFACES)
 public interface INodeProvisionStrategy
 {
-    enum Strategy
+    @FunctionalInterface
+    interface Factory
+    {
+        INodeProvisionStrategy create(int subnet, @Nullable Map<String, 
Integer> portMap);
+    }
+
+    enum Strategy implements Factory
     {
         OneNetworkInterface
         {
             @Override
-            INodeProvisionStrategy create(int subnet, @Nullable Map<String, 
Integer> portMap)
+            public INodeProvisionStrategy create(int subnet, @Nullable 
Map<String, Integer> portMap)
             {
                 String ipAdress = "127.0." + subnet + ".1";
-                return new INodeProvisionStrategy()
+                return new AbstractNodeProvisionStrategy(portMap)
                 {
-                    @Override
-                    public String seedIp()
-                    {
-                        return ipAdress;
-                    }
 
                     @Override
-                    public int seedPort()
+                    protected int computeStoragePort(int nodeNum)
                     {
-                        return storagePort(1);
+                        return 7011 + nodeNum;
                     }
 
                     @Override
-                    public String ipAddress(int nodeNum)
+                    protected int computeNativeTransportPort(int nodeNum)
                     {
-                        return ipAdress;
+                        return 9041 + nodeNum;
                     }
 
                     @Override
-                    public int storagePort(int nodeNum)
+                    protected int computeJmxPort(int nodeNum)
                     {
-                        if (portMap != null)
-                        {
-                            return portMap.computeIfAbsent("storagePort@node" 
+ nodeNum, key -> SocketUtils.findAvailablePort(seedIp(), 7011 + nodeNum));
-                        }
-                        return 7011 + nodeNum;
+                        return 7199 + nodeNum;
                     }
 
                     @Override
-                    public int nativeTransportPort(int nodeNum)
+                    public int seedNodeNum()
                     {
-                        if (portMap != null)
-                        {
-                            return 
portMap.computeIfAbsent("nativeTransportPort@node" + nodeNum, key -> 
SocketUtils.findAvailablePort(seedIp(), 9041 + nodeNum));
-                        }
-                        return 9041 + nodeNum;
+                        return 1;
                     }
 
                     @Override
-                    public int jmxPort(int nodeNum)
+                    public String ipAddress(int nodeNum)
                     {
-                        if (portMap != null)
-                        {
-                            return portMap.computeIfAbsent("jmxPort@node" + 
nodeNum, key -> SocketUtils.findAvailablePort(seedIp(), 7199 + nodeNum));
-                        }
-                        return 7199 + nodeNum;
+                        return ipAdress;
                     }
                 };
             }
@@ -92,22 +81,15 @@ public interface INodeProvisionStrategy
         MultipleNetworkInterfaces
         {
             @Override
-            INodeProvisionStrategy create(int subnet, @Nullable Map<String, 
Integer> portMap)
+            public INodeProvisionStrategy create(int subnet, @Nullable 
Map<String, Integer> portMap)
             {
                 String ipPrefix = "127.0." + subnet + '.';
-                return new INodeProvisionStrategy()
+                return new AbstractNodeProvisionStrategy(portMap)
                 {
-
                     @Override
-                    public String seedIp()
+                    public int seedNodeNum()
                     {
-                        return ipPrefix + '1';
-                    }
-
-                    @Override
-                    public int seedPort()
-                    {
-                        return storagePort(1);
+                        return 1;
                     }
 
                     @Override
@@ -115,51 +97,12 @@ public interface INodeProvisionStrategy
                     {
                         return ipPrefix + nodeNum;
                     }
-
-                    @Override
-                    public int storagePort(int nodeNum)
-                    {
-                        if (portMap != null)
-                        {
-                            return portMap.computeIfAbsent("storagePort@node" 
+ nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 7012));
-                        }
-                        return 7012;
-                    }
-
-                    @Override
-                    public int nativeTransportPort(int nodeNum)
-                    {
-                        if (portMap != null)
-                        {
-                            return 
portMap.computeIfAbsent("nativeTransportPort@node" + nodeNum, key -> 
SocketUtils.findAvailablePort(ipAddress(nodeNum), 9042));
-                        }
-                        return 9042;
-                    }
-
-                    @Override
-                    public int jmxPort(int nodeNum)
-                    {
-                        if (portMap != null)
-                        {
-                            return portMap.computeIfAbsent("jmxPort@node" + 
nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 7199));
-                        }
-                        return 7199;
-                    }
                 };
             }
         };
-
-        INodeProvisionStrategy create(int subnet)
-        {
-            return create(subnet, null);
-        }
-
-        abstract INodeProvisionStrategy create(int subnet, @Nullable 
Map<String, Integer> portMap);
     }
 
-    String seedIp();
-
-    int seedPort();
+    int seedNodeNum();
 
     String ipAddress(int nodeNum);
 
@@ -168,4 +111,60 @@ public interface INodeProvisionStrategy
     int nativeTransportPort(int nodeNum);
 
     int jmxPort(int nodeNum);
+
+    abstract class AbstractNodeProvisionStrategy implements 
INodeProvisionStrategy
+    {
+        @Nullable
+        private final Map<String, Integer> portMap;
+
+        protected AbstractNodeProvisionStrategy(@Nullable Map<String, Integer> 
portMap)
+        {
+            this.portMap = portMap;
+        }
+
+        protected int computeStoragePort(int nodeNum)
+        {
+            return 7012;
+        }
+
+        protected int computeNativeTransportPort(int nodeNum)
+        {
+            return 9042;
+        }
+
+        protected int computeJmxPort(int nodeNum)
+        {
+            return 7199;
+        }
+
+        @Override
+        public int storagePort(int nodeNum)
+        {
+            if (portMap != null)
+            {
+                return portMap.computeIfAbsent("storagePort@node" + nodeNum, 
key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 
computeStoragePort(nodeNum)));
+            }
+            return computeStoragePort(nodeNum);
+        }
+
+        @Override
+        public int nativeTransportPort(int nodeNum)
+        {
+            if (portMap != null)
+            {
+                return portMap.computeIfAbsent("nativeTransportPort@node" + 
nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 
computeNativeTransportPort(nodeNum)));
+            }
+            return computeNativeTransportPort(nodeNum);
+        }
+
+        @Override
+        public int jmxPort(int nodeNum)
+        {
+            if (portMap != null)
+            {
+                return portMap.computeIfAbsent("jmxPort@node" + nodeNum, key 
-> SocketUtils.findAvailablePort(ipAddress(nodeNum), computeJmxPort(nodeNum)));
+            }
+            return computeJmxPort(nodeNum);
+        }
+    }
 }
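
Because Strategy now just implements the new Factory functional interface and the shared port handling lives in AbstractNodeProvisionStrategy, a test can hand the cluster builder a one-off provision strategy as a lambda. A minimal sketch, illustrative only and not part of this patch (it mirrors the usage added in TopologyMixupTestBase below and assumes the usual in-jvm dtest imports):

    Cluster cluster = Cluster.build(3)
                             .withNodeProvisionStrategy((subnet, portMap) ->
                                 new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
                                 {
                                     @Override
                                     public int seedNodeNum()
                                     {
                                         return 1; // node1 acts as the seed
                                     }

                                     @Override
                                     public String ipAddress(int nodeNum)
                                     {
                                         return "127.0." + subnet + '.' + nodeNum;
                                     }
                                 })
                             .start();
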
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 603b10ca7e..cc66a59732 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -305,14 +305,15 @@ public class InstanceConfig implements IInstanceConfig
                                           Collection<String> tokens,
                                           int datadirCount)
     {
+        int seedNode = provisionStrategy.seedNodeNum();
         return new InstanceConfig(nodeNum,
                                   networkTopology,
                                   provisionStrategy.ipAddress(nodeNum),
                                   provisionStrategy.ipAddress(nodeNum),
                                   provisionStrategy.ipAddress(nodeNum),
                                   provisionStrategy.ipAddress(nodeNum),
-                                  provisionStrategy.seedIp(),
-                                  provisionStrategy.seedPort(),
+                                  provisionStrategy.ipAddress(seedNode),
+                                  provisionStrategy.storagePort(seedNode),
                                   String.format("%s/node%d/saved_caches", 
root, nodeNum),
                                   datadirs(datadirCount, root, nodeNum),
                                   String.format("%s/node%d/commitlog", root, 
nodeNum),
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index f6628d3047..f37d634a5d 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -303,7 +303,9 @@ public class ClusterUtils
             properties.set(BOOTSTRAP_SCHEMA_DELAY_MS, 
TimeUnit.SECONDS.toMillis(10));
 
             // state which node to replace
-            properties.set(REPLACE_ADDRESS_FIRST_BOOT, toReplace.config().broadcastAddress().getAddress().getHostAddress());
+            InetSocketAddress address = toReplace.config().broadcastAddress();
+            // when port isn't defined we use the default port, but in jvm-dtest the port might change!
+            properties.set(REPLACE_ADDRESS_FIRST_BOOT, address.getAddress().getHostAddress() + ":" + address.getPort());
 
             fn.accept(inst, properties);
         });
@@ -572,13 +574,11 @@ public class ClusterUtils
     {
         public final int node;
         public final Epoch epoch;
-        public final Epoch replicated;
 
-        private ClusterMetadataVersion(int node, Epoch epoch, Epoch replicated)
+        private ClusterMetadataVersion(int node, Epoch epoch)
         {
             this.node = node;
             this.epoch = epoch;
-            this.replicated = replicated;
         }
 
         public String toString()
@@ -586,11 +586,32 @@ public class ClusterUtils
             return "Version{" +
                    "node=" + node +
                    ", epoch=" + epoch +
-                   ", replicated=" + replicated +
                    '}';
         }
     }
 
+    public static void waitForCMSToQuiesce(ICluster<IInvokableInstance> 
cluster, int[] cmsNodes)
+    {
+        // first step; find the largest epoch
+        waitForCMSToQuiesce(cluster, maxEpoch(cluster, cmsNodes));
+    }
+
+    private static Epoch maxEpoch(ICluster<IInvokableInstance> cluster, int[] 
cmsNodes)
+    {
+        Epoch max = null;
+        for (int id : cmsNodes)
+        {
+            IInvokableInstance inst = cluster.get(id);
+            if (inst.isShutdown()) continue;
+            Epoch version = getClusterMetadataVersion(inst);
+            if (max == null || version.getEpoch() > max.getEpoch())
+                max = version;
+        }
+        if (max == null)
+            throw new AssertionError("Unable to find max epoch from " + 
cmsNodes);
+        return max;
+    }
+
     public static void waitForCMSToQuiesce(ICluster<IInvokableInstance> 
cluster, Epoch awaitedEpoch, int...ignored)
     {
         List<ClusterMetadataVersion> notMatching = new ArrayList<>();
@@ -611,8 +632,8 @@ public class ClusterUtils
                 if (cluster.get(j).isShutdown())
                     continue;
                 Epoch version = getClusterMetadataVersion(cluster.get(j));
-                if (!awaitedEpoch.equals(version))
-                    notMatching.add(new ClusterMetadataVersion(j, version, 
getClusterMetadataVersion(cluster.get(j))));
+                if (version.getEpoch() < awaitedEpoch.getEpoch())
+                    notMatching.add(new ClusterMetadataVersion(j, version));
             }
             if (notMatching.isEmpty())
                 return;
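
The new waitForCMSToQuiesce overload first finds the highest epoch among the live CMS members and then waits for every node to reach it, so callers only need the node ids of the current voting group. A short sketch of a call site (the node ids are placeholders, not taken from this patch):

    // hypothetical test snippet: cmsGroup holds the node ids of the current CMS voting group
    int[] cmsGroup = { 1, 2, 3 };
    ClusterUtils.waitForCMSToQuiesce(cluster, cmsGroup);
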
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
new file mode 100644
index 0000000000..11fce664a1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.utils.Gen;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.PreCheckResult;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.RandomSource;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.harry.HarryHelper;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.sut.SystemUnderTest;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.sut.injvm.InJvmSut;
+
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce;
+
+public class HarryTopologyMixupTest extends 
TopologyMixupTestBase<HarryTopologyMixupTest.Spec>
+{
+    @Override
+    protected Gen<State<Spec>> stateGen()
+    {
+        return HarryState::new;
+    }
+
+    @Override
+    protected void preCheck(Property.StatefulBuilder builder)
+    {
+        // if a failing seed is detected, populate here
+        // Example: builder.withSeed(42L);
+    }
+
+    @Override
+    protected void destroyState(State<Spec> state, @Nullable Throwable cause)
+    {
+        if (cause != null) return;
+        if (((HarryState) state).numInserts > 0)
+        {
+            // do one last read just to make sure we validate the data...
+            var harry = state.schemaSpec.harry;
+            harry.validateAll(harry.quiescentLocalChecker());
+        }
+    }
+
+    private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
+    {
+        ReplayingHistoryBuilder harry = HarryHelper.dataGen(rs.nextLong(),
+                                                            new 
InJvmSut(cluster),
+                                                            new 
TokenPlacementModel.SimpleReplicationFactor(3),
+                                                            
SystemUnderTest.ConsistencyLevel.ALL);
+        cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};", 
HarryHelper.KEYSPACE));
+        var schema = harry.schema();
+        cluster.schemaChange(schema.compile().cql());
+        waitForCMSToQuiesce(cluster, cluster.get(1));
+        return new Spec(harry);
+    }
+
+    private static BiFunction<RandomSource, State<Spec>, Command<State<Spec>, 
Void, ?>> cqlOperations(Spec spec)
+    {
+        class HarryCommand extends SimpleCommand<State<Spec>>
+        {
+            HarryCommand(Function<State<Spec>, String> name, 
Consumer<State<Spec>> fn)
+            {
+                super(name, fn);
+            }
+
+            @Override
+            public PreCheckResult checkPreconditions(State<Spec> state)
+            {
+                int clusterSize = state.topologyHistory.up().length;
+                return clusterSize >= 3 ? PreCheckResult.Ok : 
PreCheckResult.Ignore;
+            }
+        }
+        Command<State<Spec>, Void, ?> insert = new HarryCommand(state -> 
"Harry Insert" + state.commandNamePostfix(), state -> {
+            spec.harry.insert();
+            ((HarryState) state).numInserts++;
+        });
+        Command<State<Spec>, Void, ?> validateAll = new HarryCommand(state -> 
"Harry Validate All" + state.commandNamePostfix(), state -> {
+            spec.harry.validateAll(spec.harry.quiescentLocalChecker());
+            ((HarryState) state).numInserts = 0;
+        });
+        return (rs, state) -> {
+            HarryState harryState = (HarryState) state;
+            TopologyHistory history = state.topologyHistory;
+            // if any topology change happened, then always validate all
+            if (harryState.generation != history.generation())
+            {
+                harryState.generation = history.generation();
+                return validateAll;
+            }
+            if ((harryState.numInserts > 0 && rs.decide(0.2))) // 20% of the time do reads
+                return validateAll;
+            return insert;
+        };
+    }
+
+    public static class Spec implements TopologyMixupTestBase.SchemaSpec
+    {
+        private final ReplayingHistoryBuilder harry;
+
+        public Spec(ReplayingHistoryBuilder harry)
+        {
+            this.harry = harry;
+        }
+
+        @Override
+        public String name()
+        {
+            return harry.schema().table;
+        }
+
+        @Override
+        public String keyspaceName()
+        {
+            return HarryHelper.KEYSPACE;
+        }
+    }
+
+    public static class HarryState extends State<Spec>
+    {
+        private long generation;
+        private int numInserts = 0;
+        public HarryState(RandomSource rs)
+        {
+            super(rs, HarryTopologyMixupTest::createSchemaSpec, 
HarryTopologyMixupTest::cqlOperations);
+        }
+
+        @Override
+        protected void onConfigure(IInstanceConfig config)
+        {
+            config.set("metadata_snapshot_frequency", 5);
+        }
+    }
+}
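
The preCheck hook above is the intended place to pin a seed when replaying a failure reported by the stateful runner, as its comment notes. A sketch of what that looks like (42L is a placeholder, not a known failing seed):

    @Override
    protected void preCheck(Property.StatefulBuilder builder)
    {
        // replay the exact command sequence by fixing the seed printed in the failure output
        builder.withSeed(42L);
    }
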
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
new file mode 100644
index 0000000000..8e8c57b132
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.RandomSource;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.utils.ConfigGenBuilder;
+
+import static accord.utils.Property.commands;
+import static accord.utils.Property.ignoreCommand;
+import static accord.utils.Property.multistep;
+import static accord.utils.Property.stateful;
+
+/**
+ * These tests can create many instances, so mac users may need to run the following to avoid address bind failures
+ * <p>
+ * {@code for id in $(seq 0 15); do sudo ifconfig lo0 alias "127.0.0.$id"; done;}
+ */
+public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.SchemaSpec> extends TestBaseImpl
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(TopologyMixupTestBase.class);
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    private enum TopologyChange
+    {
+        AddNode,
+        RemoveNode,
+        HostReplace,
+        //TODO (coverage): add the following states once supported
+//        StopNode,
+//        StartNode,
+//        MoveToken
+        //TODO (coverage): node migrate to another rack or dc (unsupported on trunk as of this writing, but planned work for TCM)
+//        MoveNodeToNewRack,
+//        MoveNodeToNewDC,
+    }
+
+    private enum RemoveType
+    {Decommission, RemoveNode, Assassinate}
+
+    private static final Gen.IntGen MURMUR_TOKEN_GEN = rs -> rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE);
+    private static final int TARGET_RF = 3;
+    private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION = Gens.enums().allMixedDistribution(RemoveType.class);
+    private static final Gen<Map<String, Object>> CONF_GEN = new ConfigGenBuilder()
+                                                             // jvm-dtest hard codes this partitioner in its APIs, so overriding will break the test
+                                                             .withPartitionerGen(null)
+                                                             .build();
+
+    // common commands
+    private Command<State<S>, Void, ?> repairCommand(int toCoordinate)
+    {
+        return new SimpleCommand<>(state -> "nodetool repair " + 
state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node" 
+ toCoordinate + state.commandNamePostfix(),
+                                   state -> 
state.cluster.get(toCoordinate).nodetoolResult("repair", 
state.schemaSpec.keyspaceName(), state.schemaSpec.name()).asserts().success());
+    }
+
+    private Command<State<S>, Void, ?> waitForCMSToQuiesce()
+    {
+        return new SimpleCommand<>(state -> "Waiting for CMS to Quiesce" + 
state.commandNamePostfix(),
+                                   state -> 
ClusterUtils.waitForCMSToQuiesce(state.cluster, state.cmsGroup));
+    }
+
+    private Command<State<S>, Void, ?> stopInstance(int toRemove)
+    {
+        return new SimpleCommand<>(state -> "Stop Node" + toRemove + " for 
Assassinate" + state.commandNamePostfix(),
+                                   state -> {
+                                       IInvokableInstance inst = 
state.cluster.get(toRemove);
+                                       TopologyHistory.Node node = 
state.topologyHistory.node(toRemove);
+                                       ClusterUtils.stopUnchecked(inst);
+                                       node.down();
+                                   });
+    }
+
+    private Command<State<S>, Void, ?> addNode()
+    {
+        return new SimpleCommand<>(state -> "Add Node" + 
(state.topologyHistory.uniqueInstances + 1) + state.commandNamePostfix(),
+                                   state -> {
+                                       TopologyHistory.Node n = 
state.topologyHistory.addNode();
+                                       IInvokableInstance newInstance = 
ClusterUtils.addInstance(state.cluster, n.dc, n.rack, c -> 
c.set("auto_bootstrap", true));
+                                       newInstance.startup(state.cluster);
+                                       n.up();
+                                   });
+    }
+
+    private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs, 
State<S> state)
+    {
+        int toRemove = rs.pickInt(state.topologyHistory.up());
+        return new SimpleCommand<>("nodetool decommission node" + toRemove + 
state.commandNamePostfix(), s2 -> {
+            IInvokableInstance inst = s2.cluster.get(toRemove);
+            TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+            node.status = TopologyHistory.Node.Status.BeingDecommissioned;
+            inst.nodetoolResult("decommission").asserts().success();
+            ClusterUtils.stopUnchecked(inst);
+            node.removed();
+        });
+    }
+
+    private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S> 
state)
+    {
+        int[] up = state.topologyHistory.up();
+        int toRemove = rs.pickInt(up);
+        int toCoordinate;
+        {
+            int picked;
+            do
+            {
+                picked = rs.pickInt(up);
+            }
+            while (picked == toRemove);
+            toCoordinate = picked;
+        }
+        return multistep(stopInstance(toRemove),
+                         new SimpleCommand<>("nodetool removenode node" + 
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+                             TopologyHistory.Node node = 
s2.topologyHistory.node(toRemove);
+                             node.status = 
TopologyHistory.Node.Status.BeingRemoved;
+                             IInvokableInstance coordinator = 
s2.cluster.get(toCoordinate);
+                             coordinator.nodetoolResult("removenode", 
Integer.toString(toRemove), "--force").asserts().success();
+                             node.removed();
+                             
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+                         }),
+                         repairCommand(toCoordinate));
+    }
+
+    private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs, 
State<S> state)
+    {
+        //TODO (correctness): assassinate CMS member isn't allowed
+        IntHashSet up = asSet(state.topologyHistory.up());
+        IntHashSet cmsGroup = asSet(state.cmsGroup);
+        Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
+        if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a 
CMS member");
+        List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
+        allowed.sort(Comparator.naturalOrder());
+        int toRemove = rs.pick(allowed);
+        int toCoordinate;
+        {
+            int[] upInt = state.topologyHistory.up();
+            int picked;
+            do
+            {
+                picked = rs.pickInt(upInt);
+            }
+            while (picked == toRemove);
+            toCoordinate = picked;
+        }
+        return multistep(stopInstance(toRemove),
+                         new SimpleCommand<>("nodetool assassinate node" + 
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+                             TopologyHistory.Node node = 
s2.topologyHistory.node(toRemove);
+                             node.status = 
TopologyHistory.Node.Status.BeingAssassinated;
+                             IInvokableInstance coordinator = 
s2.cluster.get(toCoordinate);
+                             InetSocketAddress address = 
s2.cluster.get(toRemove).config().broadcastAddress();
+                             coordinator.nodetoolResult("assassinate", 
address.getAddress().getHostAddress() + ":" + 
address.getPort()).asserts().success();
+                             node.removed();
+                             
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+                         }),
+                         repairCommand(toCoordinate)
+        );
+    }
+
+    private Command<State<S>, Void, ?> 
removeNodeRandomizedDispatch(RandomSource rs, State<S> state)
+    {
+        RemoveType type = state.removeTypeGen.next(rs);
+        switch (type)
+        {
+            case Decommission:
+                return removeNodeDecommission(rs, state);
+            case RemoveNode:
+                return removeNode(rs, state);
+            case Assassinate:
+                return removeNodeAssassinate(rs, state);
+            default:
+                throw new UnsupportedOperationException("Unknown remove type: 
" + type);
+        }
+    }
+
+    private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S> 
state)
+    {
+        int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+        IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
+        TopologyHistory.Node adding = 
state.topologyHistory.replace(nodeToReplace);
+        TopologyHistory.Node removing = 
state.topologyHistory.nodes.get(nodeToReplace);
+
+        return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + " 
for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+                             ClusterUtils.stopUnchecked(toReplace);
+                             removing.down();
+                         }),
+                         new SimpleCommand<>("Host Replace Node" + 
nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+                             logger.info("node{} starting host replacement; 
epoch={}", adding.id, 
HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
+                             removing.status = 
TopologyHistory.Node.Status.BeingReplaced;
+                             IInvokableInstance inst = 
ClusterUtils.replaceHostAndStart(s2.cluster, toReplace);
+                             s2.topologyHistory.replaced(removing, adding);
+                             long epoch = HackSerialization.tcmEpoch(inst);
+                             s2.currentEpoch.set(epoch);
+                             logger.info("{} completed host replacement in 
epoch={}", inst, epoch);
+                         }),
+                         //TODO (remove after rebase to trunk): https://issues.apache.org/jira/browse/CASSANDRA-19705  After the rebase to trunk this is not needed.  The issue is that the CMS placement removes the node, it does not promote another node; this causes rf=3 to become rf=2
+                         new SimpleCommand<>("CMS reconfigure on Node" + 
adding.id + state.commandNamePostfix(), s2 -> 
s2.cluster.get(adding.id).nodetoolResult("cms", "reconfigure", 
Integer.toString(TARGET_RF)).asserts().success())
+        );
+    }
+
+    protected abstract Gen<State<S>> stateGen();
+
+    protected void preCheck(Property.StatefulBuilder statefulBuilder)
+    {
+
+    }
+
+    protected void destroyState(State<S> state, @Nullable Throwable cause) 
throws Throwable
+    {
+
+    }
+
+    @Test
+    public void test()
+    {
+        Property.StatefulBuilder statefulBuilder = 
stateful().withSteps(20).withStepTimeout(Duration.ofMinutes(2)).withExamples(1);
+        preCheck(statefulBuilder);
+        statefulBuilder.check(commands(this::stateGen)
+                              .preCommands(state -> 
state.preActions.forEach(Runnable::run))
+                              .add(2, (rs, state) -> {
+                                  EnumSet<TopologyChange> 
possibleTopologyChanges = possibleTopologyChanges(state);
+                                  if (possibleTopologyChanges.isEmpty()) 
return ignoreCommand();
+                                  return topologyCommand(state, 
possibleTopologyChanges).next(rs);
+                              })
+                              .add(1, (rs, state) -> 
repairCommand(rs.pickInt(state.topologyHistory.up())))
+                              .add(7, (rs, state) -> 
state.statementGen.apply(rs, state))
+                              .destroyState((state, cause) -> {
+                                  try (state)
+                                  {
+                                      
TopologyMixupTestBase.this.destroyState(state, cause);
+                                  }
+                              })
+                              .build());
+    }
+
+    private EnumSet<TopologyChange> possibleTopologyChanges(State<S> state)
+    {
+        EnumSet<TopologyChange> possibleTopologyChanges = 
EnumSet.noneOf(TopologyChange.class);
+        // up or down is logically more correct, but since this runs sequentially and after the topology changes are complete, we don't have downed nodes at this point
+        // so up is enough to know the topology size
+        int size = state.topologyHistory.up().length;
+        if (size < state.topologyHistory.maxNodes)
+            possibleTopologyChanges.add(TopologyChange.AddNode);
+        if (size > state.topologyHistory.quorum())
+        {
+            if (size > TARGET_RF)
+                possibleTopologyChanges.add(TopologyChange.RemoveNode);
+            possibleTopologyChanges.add(TopologyChange.HostReplace);
+        }
+        return possibleTopologyChanges;
+    }
+
+    private Gen<Command<State<S>, Void, ?>> topologyCommand(State<S> state, 
EnumSet<TopologyChange> possibleTopologyChanges)
+    {
+        Map<Gen<Command<State<S>, Void, ?>>, Integer> possible = new 
LinkedHashMap<>();
+        for (TopologyChange task : possibleTopologyChanges)
+        {
+            switch (task)
+            {
+                case AddNode:
+                    possible.put(ignore -> multistep(addNode(), 
waitForCMSToQuiesce()), 1);
+                    break;
+                case RemoveNode:
+                    possible.put(rs -> 
multistep(removeNodeRandomizedDispatch(rs, state), waitForCMSToQuiesce()), 1);
+                    break;
+                case HostReplace:
+                    possible.put(rs -> multistep(hostReplace(rs, state), 
waitForCMSToQuiesce()), 1);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(task.name());
+            }
+        }
+        return Gens.oneOf(possible);
+    }
+
+    private static IntHashSet asSet(int[] array)
+    {
+        IntHashSet set = new IntHashSet(array.length);
+        for (int i : array)
+            set.add(i);
+        return set;
+    }
+
+    public interface SchemaSpec
+    {
+        String name();
+
+        String keyspaceName();
+    }
+
+    protected static class State<S extends SchemaSpec> implements AutoCloseable
+    {
+        final TopologyHistory topologyHistory;
+        final Cluster cluster;
+        final S schemaSpec;
+        final List<Runnable> preActions = new CopyOnWriteArrayList<>();
+        final AtomicLong currentEpoch = new AtomicLong();
+        final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>> 
statementGen;
+        final Gen<RemoveType> removeTypeGen;
+        private final Map<String, Object> yamlConfigOverrides;
+        int[] cmsGroup = new int[0];
+
+        public State(RandomSource rs, BiFunction<RandomSource, Cluster, S> 
schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>, 
Void, ?>>> cqlOperationsGen)
+        {
+            this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+            try
+            {
+
+                this.yamlConfigOverrides = CONF_GEN.next(rs);
+                cluster = Cluster.build(topologyHistory.minNodes)
+                                 .withTokenSupplier(topologyHistory)
+                                 .withConfig(c -> {
+                                     c.with(Feature.values())
+                                      .set("write_request_timeout", "10s");
+                                     //TODO (maintenance): where to put this?  Anything touching ConfigGenBuilder with jvm-dtest needs this...
+                                     ((InstanceConfig) 
c).remove("commitlog_sync_period_in_ms");
+                                     for (Map.Entry<String, Object> e : 
yamlConfigOverrides.entrySet())
+                                         c.set(e.getKey(), e.getValue());
+                                     onConfigure(c);
+                                 })
+                                 //TODO (maintenance): should TopologyHistory also be an INodeProvisionStrategy.Factory so address information is stored in the Node?
+                                 //TODO (maintenance): AbstractCluster's Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with dc/rack annoying; if this becomes an interface then TopologyHistory could own it
+                                 .withNodeProvisionStrategy((subnet, portMap) 
-> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
+                                 {
+                                     {
+                                         Invariants.checkArgument(subnet == 0, 
"Unexpected subnet detected: %d", subnet);
+                                     }
+
+                                     private final String ipPrefix = "127.0." 
+ subnet + '.';
+
+                                     @Override
+                                     public int seedNodeNum()
+                                     {
+                                         int[] up = topologyHistory.up();
+                                         if (Arrays.equals(up, new int[]{ 1, 2 
}))
+                                             return 1;
+                                         return rs.pickInt(up);
+                                     }
+
+                                     @Override
+                                     public String ipAddress(int nodeNum)
+                                     {
+                                         return ipPrefix + nodeNum;
+                                     }
+                                 })
+                                 .start();
+            }
+            catch (IOException e)
+            {
+                throw new UncheckedIOException(e);
+            }
+            fixDistributedSchemas(cluster);
+            init(cluster, TARGET_RF);
+            // fix TCM
+            {
+                NodeToolResult result = cluster.get(1).nodetoolResult("cms", 
"reconfigure", "2");
+                result.asserts().success();
+                logger.info("CMS reconfigure: {}", result.getStdout());
+            }
+            preActions.add(new Runnable()
+            {
+                // in order to remove this action, an anonymous class is needed so "this" works; a lambda's "this" is the parent class
+                @Override
+                public void run()
+                {
+                    if (topologyHistory.up().length == TARGET_RF)
+                    {
+                        NodeToolResult result = 
cluster.get(1).nodetoolResult("cms", "reconfigure", 
Integer.toString(TARGET_RF));
+                        result.asserts().success();
+                        logger.info("CMS reconfigure: {}", result.getStdout());
+                        preActions.remove(this);
+                    }
+                }
+            });
+            preActions.add(() -> {
+                int[] up = topologyHistory.up();
+                // use the most recent node just in case the cluster isn't in-sync
+                IInvokableInstance node = cluster.get(up[up.length - 1]);
+                cmsGroup = HackSerialization.cmsGroup(node);
+                currentEpoch.set(HackSerialization.tcmEpoch(node));
+            });
+            preActions.add(() -> cluster.checkAndResetUncaughtExceptions());
+            this.schemaSpec = schemaSpecGen.apply(rs, cluster);
+            statementGen = cqlOperationsGen.apply(schemaSpec);
+
+            removeTypeGen = REMOVE_TYPE_DISTRIBUTION.next(rs);
+
+            long waitForEpoch = HackSerialization.tcmEpoch(cluster.get(1));
+            currentEpoch.set(waitForEpoch);
+            onStartupComplete(waitForEpoch);
+        }
+
+        protected void onStartupComplete(long tcmEpoch)
+        {
+
+        }
+
+        protected void onConfigure(IInstanceConfig config)
+        {
+
+        }
+
+        protected String commandNamePostfix()
+        {
+            return "; epoch=" + currentEpoch.get() + ", cms=" + 
Arrays.toString(cmsGroup);
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Yaml 
Config:\n").append(YamlConfigurationLoader.toYaml(this.yamlConfigOverrides));
+            sb.append("\nTopology:\n").append(topologyHistory);
+            sb.append("\nCMS Voting Group: 
").append(Arrays.toString(cmsGroup));
+            return sb.toString();
+        }
+
+        @Override
+        public void close() throws Exception
+        {
+            cluster.close();
+        }
+    }
+
+    public static class TopologyHistory implements TokenSupplier
+    {
+        private final RandomSource rs;
+        private final int tokensPerNode;
+        private final int minNodes, maxNodes;
+
+        private final Int2ObjectHashMap<Node> nodes = new 
Int2ObjectHashMap<>();
+        private final Set<String> activeTokens = new HashSet<>();
+        private int uniqueInstances = 0;
+        /**
+         * Tracks how many topology change events were performed
+         */
+        private int generation = 0;
+
+        public TopologyHistory(RandomSource rs, int minNodes, int maxNodes)
+        {
+            this.rs = rs;
+            this.minNodes = minNodes;
+            this.maxNodes = maxNodes;
+            this.tokensPerNode = Cluster.build(1).getTokenCount();
+            for (int i = 0; i < minNodes; i++)
+                addNode();
+            for (Node n : nodes.values())
+                n.status = Node.Status.Up;
+        }
+
+        public long generation()
+        {
+            return generation;
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 1; i <= nodes.size(); i++)
+            {
+                Node node = nodes.get(i);
+                sb.append("\n\tNode").append(i).append(": 
status=").append(node.status).append(", tokens=").append(node.tokens);
+            }
+            return sb.toString();
+        }
+
+        public int quorum()
+        {
+            return (TARGET_RF / 2) + 1;
+        }
+
+        @Override
+        public Collection<String> tokens(int i)
+        {
+            Node n = nodes.get(i);
+            if (n == null)
+                throw new IllegalArgumentException("Unknown node" + i);
+            return n.tokens;
+        }
+
+        public int[] up()
+        {
+            IntArrayList up = new IntArrayList(nodes.size(), -1);
+            for (Map.Entry<Integer, Node> n : nodes.entrySet())
+            {
+                if (n.getValue().status == Node.Status.Up)
+                    up.add(n.getKey());
+            }
+            int[] ints = up.toIntArray();
+            Arrays.sort(ints);
+            return ints;
+        }
+
+        public int size()
+        {
+            return nodes.size();
+        }
+
+        public Node addNode()
+        {
+            int id = ++uniqueInstances;
+            List<String> instTokens = Gens.lists(MURMUR_TOKEN_GEN
+                                                 .filterAsInt(t -> 
!activeTokens.contains(Integer.toString(t))))
+                                          .unique()
+                                          .ofSize(tokensPerNode)
+                                          .next(rs).stream()
+                                          .map(Object::toString)
+                                          .collect(Collectors.toList());
+            activeTokens.addAll(instTokens);
+            Node node = new Node(this, id, instTokens, "datacenter0", "rack0");
+            node.status = Node.Status.Down;
+            nodes.put(id, node);
+            return node;
+        }
+
+        public Node replace(int toReplace)
+        {
+            int id = ++uniqueInstances;
+            Node replacing = Objects.requireNonNull(nodes.get(toReplace));
+            Node node = new Node(this, id, replacing.tokens, replacing.dc, replacing.rack);
+            node.replacing = replacing.id;
+            nodes.put(id, node);
+            return node;
+        }
+
+        public void replaced(Node removing, Node adding)
+        {
+            adding.status = TopologyHistory.Node.Status.Up;
+            removing.status = TopologyHistory.Node.Status.Removed;
+            adding.replacing = null;
+            generation++;
+        }
+
+        public Node node(int id)
+        {
+            if (!nodes.containsKey(id)) throw new 
NoSuchElementException("Unknown node" + id);
+            return nodes.get(id);
+        }
+
+        private static class Node
+        {
+            enum Status
+            {Up, Down, BeingReplaced, BeingDecommissioned, BeingRemoved, 
BeingAssassinated, Removed}
+
+            final TopologyHistory parent;
+            final int id;
+            final List<String> tokens;
+            final String dc, rack;
+            Status status = Status.Down;
+            Integer replacing = null;
+
+            private Node(TopologyHistory parent, int id, List<String> tokens, String dc, String rack)
+            {
+                this.parent = parent;
+                this.id = id;
+                this.tokens = tokens;
+                this.dc = dc;
+                this.rack = rack;
+            }
+
+            public void up()
+            {
+                status = TopologyHistory.Node.Status.Up;
+                parent.generation++;
+            }
+
+            public void down()
+            {
+                status = TopologyHistory.Node.Status.Down;
+                parent.generation++;
+            }
+
+            public void removed()
+            {
+                status = Status.Removed;
+                parent.activeTokens.removeAll(tokens);
+                parent.generation++;
+            }
+
+            @Override
+            public String toString()
+            {
+                return "Node{" +
+                       "status=" + status +
+                       (replacing == null ? "" : (", replacing=" + replacing)) +
+                       ", tokens=" + tokens +
+                       '}';
+            }
+        }
+    }
+
+    public static class HackSerialization
+    {
+        private static long tcmEpoch(IInvokableInstance inst)
+        {
+            return inst.callOnInstance(() -> ClusterMetadata.current().epoch.getEpoch());
+        }
+
+        private static long tcmEpochAndSync(IInvokableInstance inst)
+        {
+            return inst.callOnInstance(() -> ClusterMetadataService.instance().log().waitForHighestConsecutive().epoch.getEpoch());
+        }
+
+        public static int[] cmsGroup(IInvokableInstance inst)
+        {
+            return inst.callOnInstance(() -> {
+                ClusterMetadata current = ClusterMetadata.current();
+                Set<InetAddressAndPort> members = current.placements.get(ReplicationParams.meta(current)).writes.byEndpoint().keySet();
+                // Why not just use 'current.fullCMSMembers()'?  That uses the "read" replicas, so "could" have fewer endpoints.
+                // It would be more consistent to use fullCMSMembers, but the thinking is that knowing the full set is better
+                // than the coordination set.
+                int[] array = members.stream().mapToInt(HackSerialization::addressToNodeId).toArray();
+                Arrays.sort(array);
+                return array;
+            });
+        }
+
+        private static int addressToNodeId(InetAddressAndPort addressAndPort)
+        {
+            String address = addressAndPort.getAddress().getHostAddress();
+            String[] parts = address.split("\\.");
+            Invariants.checkState(parts.length == 4, "Unable to parse address %s", address);
+            return Integer.parseInt(parts[3]);
+        }
+    }
+}
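
A note on the node-id mapping used by HackSerialization.addressToNodeId: it relies on the in-jvm dtest convention that instance N is assigned address 127.0.0.N, so the dtest node id can be recovered from the last octet. A minimal, self-contained sketch of that assumption (illustrative only; the helper name below is hypothetical, not part of the patch):

    // Hypothetical helper mirroring HackSerialization.addressToNodeId: node N <=> 127.0.0.N
    static int nodeIdFromHostAddress(String hostAddress)
    {
        String[] parts = hostAddress.split("\\.");
        if (parts.length != 4)
            throw new IllegalStateException("Unable to parse address " + hostAddress);
        return Integer.parseInt(parts[3]); // last octet is the dtest instance id
    }
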
diff --git a/test/harry/main/org/apache/cassandra/harry/HarryHelper.java 
b/test/harry/main/org/apache/cassandra/harry/HarryHelper.java
index 09cc574082..9dc0fcfaf4 100644
--- a/test/harry/main/org/apache/cassandra/harry/HarryHelper.java
+++ b/test/harry/main/org/apache/cassandra/harry/HarryHelper.java
@@ -131,7 +131,11 @@ public class HarryHelper
 
     public static ReplayingHistoryBuilder dataGen(SystemUnderTest sut, TokenPlacementModel.ReplicationFactor rf, SystemUnderTest.ConsistencyLevel writeCl)
     {
-        long seed = 1L;
+        return dataGen(1, sut, rf, writeCl);
+    }
+
+    public static ReplayingHistoryBuilder dataGen(long seed, SystemUnderTest sut, TokenPlacementModel.ReplicationFactor rf, SystemUnderTest.ConsistencyLevel writeCl)
+    {
         SchemaSpec schema = schemaSpecGen("harry", "tbl_").inflate(seed);
         return new ReplayingHistoryBuilder(seed, 100, 1, new DefaultDataTracker(), sut, schema, rf, writeCl);
     }
diff --git 
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
 
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
index 5943b77b65..5db0ee795d 100644
--- 
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
+++ 
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.harry.core.Run;
 import org.apache.cassandra.harry.ddl.SchemaSpec;
 import org.apache.cassandra.harry.sut.SystemUnderTest;
@@ -118,8 +119,9 @@ public class InJVMTokenAwareVisitExecutor extends 
LoggingVisitor.LoggingVisitorE
 
     protected TokenPlacementModel.ReplicatedRanges getRing()
     {
-        List<TokenPlacementModel.Node> other = peerStateToNodes(sut.cluster.coordinator(1).execute("select peer, tokens, data_center, rack from system.peers", ConsistencyLevel.ONE));
-        List<TokenPlacementModel.Node> self = peerStateToNodes(sut.cluster.coordinator(1).execute("select broadcast_address, tokens, data_center, rack from system.local", ConsistencyLevel.ONE));
+        ICoordinator coordinator = sut.firstAlive().coordinator();
+        List<TokenPlacementModel.Node> other = peerStateToNodes(coordinator.execute("select peer, tokens, data_center, rack from system.peers", ConsistencyLevel.ONE));
+        List<TokenPlacementModel.Node> self = peerStateToNodes(coordinator.execute("select broadcast_address, tokens, data_center, rack from system.local", ConsistencyLevel.ONE));
         List<TokenPlacementModel.Node> all = new ArrayList<>();
         all.addAll(self);
         all.addAll(other);
diff --git 
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java 
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
index db119acc02..4c19e67294 100644
--- a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
+++ b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
@@ -79,7 +79,13 @@ public class InJvmSutBase<NODE extends IInstance, CLUSTER 
extends ICluster<NODE>
 
             public Integer get()
             {
-                return (int) (cnt.getAndIncrement() % cluster.size() + 1);
+                for (int i = 0; i < 42; i++)
+                {
+                    int selected = (int) (cnt.getAndIncrement() % cluster.size() + 1);
+                    if (!cluster.get(selected).isShutdown())
+                        return selected;
+                }
+                throw new IllegalStateException("Unable to find an alive instance");
             }
         };
     }
@@ -129,6 +135,11 @@ public class InJvmSutBase<NODE extends IInstance, CLUSTER 
extends ICluster<NODE>
         cluster.schemaChange(statement);
     }
 
+    public IInstance firstAlive()
+    {
+        return cluster.stream().filter(i -> !i.isShutdown()).findFirst().get();
+    }
+
     public Object[][] execute(String statement, ConsistencyLevel cl, int pageSize, Object... bindings)
     {
         return execute(statement, cl, loadBalancingStrategy.get(), pageSize, bindings);
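
The load-balancing change above keeps the previous round-robin order but skips instances that are shut down, giving up after a bounded number of attempts rather than looping forever; firstAlive() serves callers that only need some live coordinator. A self-contained sketch of the same selection logic over a plain alive/down array (illustrative only, not the dtest API):

    // Illustrative only: round-robin over 1-based instance ids, skipping nodes that are down.
    class RoundRobinAliveSelector
    {
        private final java.util.concurrent.atomic.AtomicLong cnt = new java.util.concurrent.atomic.AtomicLong();

        int next(boolean[] alive) // alive[i] == true means instance i + 1 is up
        {
            for (int i = 0; i < 42; i++) // bounded retries, as in the patch
            {
                int selected = (int) (cnt.getAndIncrement() % alive.length) + 1;
                if (alive[selected - 1])
                    return selected;
            }
            throw new IllegalStateException("Unable to find an alive instance");
        }
    }
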
diff --git 
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/QuiescentLocalStateChecker.java
 
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/QuiescentLocalStateChecker.java
index c8d24f28a1..f9ee0d0125 100644
--- 
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/QuiescentLocalStateChecker.java
+++ 
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/QuiescentLocalStateChecker.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.harry.sut.injvm;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.harry.core.Run;
 import org.apache.cassandra.harry.ddl.SchemaSpec;
 import org.apache.cassandra.harry.model.OpSelectors;
@@ -60,8 +61,10 @@ public class QuiescentLocalStateChecker extends 
QuiescentLocalStateCheckerBase
     @Override
     protected TokenPlacementModel.ReplicatedRanges getRing()
     {
-        List<TokenPlacementModel.Node> other = TokenPlacementModel.peerStateToNodes(((InJvmSutBase<?, ?>) sut).cluster.coordinator(1).execute("select peer, tokens, data_center, rack from system.peers", ConsistencyLevel.ONE));
-        List<TokenPlacementModel.Node> self = TokenPlacementModel.peerStateToNodes(((InJvmSutBase<?, ?>) sut).cluster.coordinator(1).execute("select broadcast_address, tokens, data_center, rack from system.local", ConsistencyLevel.ONE));
+        IInstance node = ((InJvmSutBase<?, ?>) sut).firstAlive();
+        ICoordinator coordinator = node.coordinator();
+        List<TokenPlacementModel.Node> other = TokenPlacementModel.peerStateToNodes(coordinator.execute("select peer, tokens, data_center, rack from system.peers", ConsistencyLevel.ONE));
+        List<TokenPlacementModel.Node> self = TokenPlacementModel.peerStateToNodes(coordinator.execute("select broadcast_address, tokens, data_center, rack from system.local", ConsistencyLevel.ONE));
         List<TokenPlacementModel.Node> all = new ArrayList<>();
         all.addAll(self);
         all.addAll(other);
diff --git a/test/unit/accord/utils/Gen.java b/test/unit/accord/utils/Gen.java
index 7563e8c29b..523812ccf4 100644
--- a/test/unit/accord/utils/Gen.java
+++ b/test/unit/accord/utils/Gen.java
@@ -22,8 +22,10 @@ import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.IntPredicate;
 import java.util.function.IntSupplier;
+import java.util.function.IntUnaryOperator;
 import java.util.function.LongPredicate;
 import java.util.function.LongSupplier;
+import java.util.function.LongUnaryOperator;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.function.ToIntFunction;
@@ -87,6 +89,22 @@ public interface Gen<A> {
         };
     }
 
+    default Gen<A> filter(int maxAttempts, A defaultValue, Predicate<A> fn)
+    {
+        Invariants.checkArgument(maxAttempts > 0, "Max attempts must be positive; given %d", maxAttempts);
+        Gen<A> self = this;
+        return r -> {
+            for (int i = 0; i < maxAttempts; i++)
+            {
+                A v = self.next(r);
+                if (fn.test(v))
+                    return v;
+
+            }
+            return defaultValue;
+        };
+    }
+
     default Supplier<A> asSupplier(RandomSource rs)
     {
         return () -> next(rs);
@@ -97,6 +115,21 @@ public interface Gen<A> {
         return Stream.generate(() -> next(rs));
     }
 
+    interface Int2IntMapFunction
+    {
+        int applyAsInt(RandomSource rs, int value);
+    }
+
+    interface Int2LongMapFunction
+    {
+        long applyAsLong(RandomSource rs, int value);
+    }
+
+    interface Long2LongMapFunction
+    {
+        long applyAsLong(RandomSource rs, long value);
+    }
+
     interface IntGen extends Gen<Integer>
     {
         int nextInt(RandomSource random);
@@ -107,7 +140,22 @@ public interface Gen<A> {
             return nextInt(random);
         }
 
-        default Gen.IntGen filterInt(IntPredicate fn)
+        default IntGen mapAsInt(IntUnaryOperator fn)
+        {
+            return r -> fn.applyAsInt(nextInt(r));
+        }
+
+        default IntGen mapAsInt(Int2IntMapFunction fn)
+        {
+            return r -> fn.applyAsInt(r, nextInt(r));
+        }
+
+        default LongGen mapAsLong(Int2LongMapFunction fn)
+        {
+            return r -> fn.applyAsLong(r, nextInt(r));
+        }
+
+        default Gen.IntGen filterAsInt(IntPredicate fn)
         {
             return rs -> {
                 int value;
@@ -123,7 +171,7 @@ public interface Gen<A> {
         @Override
         default Gen.IntGen filter(Predicate<Integer> fn)
         {
-            return filterInt(i -> fn.test(i));
+            return filterAsInt(i -> fn.test(i));
         }
 
         default IntSupplier asIntSupplier(RandomSource rs)
@@ -147,7 +195,17 @@ public interface Gen<A> {
             return nextLong(random);
         }
 
-        default Gen.LongGen filterLong(LongPredicate fn)
+        default LongGen mapAsLong(LongUnaryOperator fn)
+        {
+            return r -> fn.applyAsLong(nextLong(r));
+        }
+
+        default LongGen mapAsLong(Long2LongMapFunction fn)
+        {
+            return r -> fn.applyAsLong(r, nextLong(r));
+        }
+
+        default Gen.LongGen filterAsLong(LongPredicate fn)
         {
             return rs -> {
                 long value;
@@ -163,7 +221,7 @@ public interface Gen<A> {
         @Override
         default Gen.LongGen filter(Predicate<Long> fn)
         {
-            return filterLong(i -> fn.test(i));
+            return filterAsLong(i -> fn.test(i));
         }
 
         default LongSupplier asLongSupplier(RandomSource rs)
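
For readers of the Gen changes above: filterAsInt/filterAsLong replace the old filterInt/filterLong names, the new mapAsInt/mapAsLong variants keep the primitive specialisations, and the new filter(maxAttempts, defaultValue, fn) overload bounds the retry loop instead of spinning until a value matches. A rough usage sketch, assuming a RandomSource named rs is in scope (the generators themselves are illustrative):

    // Illustrative usage of the new combinators; rs is assumed to be an accord.utils.RandomSource.
    Gen.IntGen evens = Gens.ints().between(0, 100).filterAsInt(i -> i % 2 == 0);
    Gen.IntGen doubled = evens.mapAsInt(i -> i * 2);
    // Bounded filter: try up to 10 times, then fall back to -1 rather than looping forever.
    Gen<Integer> rare = Gens.ints().between(0, 100).filter(10, -1, i -> i > 95);
    int sample = doubled.nextInt(rs);
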
diff --git a/test/unit/accord/utils/Gens.java b/test/unit/accord/utils/Gens.java
index 1fdff2258b..975aee9f4f 100644
--- a/test/unit/accord/utils/Gens.java
+++ b/test/unit/accord/utils/Gens.java
@@ -19,26 +19,39 @@
 package accord.utils;
 
 import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
 import java.util.Set;
+import java.util.function.BooleanSupplier;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
 
+import com.google.common.collect.Iterables;
 
 public class Gens {
     private Gens() {
     }
 
+    public static <T> Gen<T> flatten(Gen<Gen<T>> gen)
+    {
+        return rs -> gen.next(rs).next(rs);
+    }
+
     public static <T> Gen<T> constant(T constant)
     {
         return ignore -> constant;
@@ -49,6 +62,37 @@ public class Gens {
         return ignore -> constant.get();
     }
 
+    public static <T> Gen<T> oneOf(Gen<? extends T>... gens)
+    {
+        switch (gens.length)
+        {
+            case 0: throw new IllegalArgumentException("Unable to select oneOf 
an empty list");
+            case 1: return (Gen<T>) gens[0];
+        }
+        return oneOf(Arrays.asList(gens));
+    }
+
+    public static <T> Gen<T> oneOf(List<Gen<? extends T>> gens)
+    {
+        switch (gens.size())
+        {
+            case 0: throw new IllegalArgumentException("Unable to select oneOf 
an empty list");
+            case 1: return (Gen<T>) gens.get(0);
+        }
+        return rs -> rs.pick(gens).next(rs);
+    }
+
+    public static <T> Gen<T> oneOf(Map<Gen<T>, Integer> values)
+    {
+        Gen<Gen<T>> gen = pick(values);
+        return rs -> gen.next(rs).next(rs);
+    }
+
+    public static Gen.IntGen pickInt(int... ts)
+    {
+        return rs -> ts[rs.nextInt(0, ts.length)];
+    }
+
     public static <T> Gen<T> pick(T... ts)
     {
         return pick(Arrays.asList(ts));
@@ -74,8 +118,20 @@ public class Gens {
     {
         if (values == null || values.isEmpty())
             throw new IllegalArgumentException("values is empty");
+        // if 2 values have the same weight we need some way to tie-break, but that isn't always possible...
+        // this method relies on the map having some order and will reject any map that doesn't define a deterministic order
+        if (!(values instanceof EnumMap || values instanceof LinkedHashMap))
+            throw new IllegalArgumentException("pick(Map) requires a map with deterministic iteration; given " + values.getClass());
+        if (values.size() == 1)
+            return constant(Objects.requireNonNull(Iterables.getFirst(values.keySet(), null)));
         double totalWeight = values.values().stream().mapToDouble(Integer::intValue).sum();
-        List<Weight<T>> list = values.entrySet().stream().map(e -> new Weight<>(e.getKey(), e.getValue())).collect(Collectors.toList());
+        List<Weight<T>> list = new ArrayList<>(values.size());
+        Iterator<Map.Entry<T, Integer>> it = values.entrySet().iterator();
+        for (int i = 0; it.hasNext(); i++)
+        {
+            Map.Entry<T, Integer> e = it.next();
+            list.add(new Weight<>(e.getKey(), e.getValue(), i));
+        }
         Collections.sort(list);
         return rs -> {
             double value = rs.nextDouble() * totalWeight;
@@ -89,6 +145,215 @@ public class Gens {
         };
     }
 
+    public static Gen.IntGen pickZipf(int[] array)
+    {
+        if (array == null || array.length == 0)
+            throw new IllegalArgumentException("Empty array given");
+        if (array.length == 1)
+            return ignore -> array[0];
+        BigDecimal[] weights = new BigDecimal[array.length];
+        BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.length));
+        weights[0] = base;
+        for (int i = 1; i < array.length; i++)
+            weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP);
+        BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add);
+
+        return rs -> {
+            BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+            for (int i = 0; i < weights.length; i++)
+            {
+                value = value.subtract(weights[i]);
+                if (value.compareTo(BigDecimal.ZERO) <= 0)
+                    return array[i];
+            }
+            return array[array.length - 1];
+        };
+    }
+
+    public static Gen.LongGen pickZipf(long[] array)
+    {
+        if (array == null || array.length == 0)
+            throw new IllegalArgumentException("Empty array given");
+        if (array.length == 1)
+            return ignore -> array[0];
+        BigDecimal[] weights = new BigDecimal[array.length];
+        BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.length));
+        weights[0] = base;
+        for (int i = 1; i < array.length; i++)
+            weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP);
+        BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add);
+
+        return rs -> {
+            BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+            for (int i = 0; i < weights.length; i++)
+            {
+                value = value.subtract(weights[i]);
+                if (value.compareTo(BigDecimal.ZERO) <= 0)
+                    return array[i];
+            }
+            return array[array.length - 1];
+        };
+    }
+
+    public static <T> Gen<T> pickZipf(T... array)
+    {
+        return pickZipf(Arrays.asList(array));
+    }
+
+    public static <T> Gen<T> pickZipf(List<T> array)
+    {
+        if (array == null || array.isEmpty())
+            throw new IllegalArgumentException("Empty array given");
+        if (array.size() == 1)
+            return ignore -> array.get(0);
+        BigDecimal[] weights = new BigDecimal[array.size()];
+        BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.size()));
+        weights[0] = base;
+        for (int i = 1; i < array.size(); i++)
+            weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP);
+        BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add);
+
+        return rs -> {
+            BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+            for (int i = 0; i < weights.length; i++)
+            {
+                value = value.subtract(weights[i]);
+                if (value.compareTo(BigDecimal.ZERO) <= 0)
+                    return array.get(i);
+            }
+            return array.get(array.size() - 1);
+        };
+    }
+
+    public static Gen<Gen.IntGen> mixedDistribution(int minInclusive, int maxExclusive)
+    {
+        int domainSize = (maxExclusive - minInclusive + 1);
+        if (domainSize < 0)
+            throw new IllegalArgumentException("Range is too large; min=" + minInclusive + ", max=" + maxExclusive);
+        int[] array, indexes;
+        if (domainSize > 200) // randomly selected
+        {
+            int numBuckets = 10;
+            int delta = domainSize / numBuckets;
+            array = new int[numBuckets];
+            for (int i = 0; i < numBuckets; i++)
+                array[i] = minInclusive + i * delta;
+            indexes = IntStream.range(0, array.length).toArray();
+        }
+        else
+        {
+            array = IntStream.range(minInclusive, maxExclusive).toArray();
+            indexes = null;
+        }
+        return rs -> {
+            switch (rs.nextInt(0, 2))
+            {
+                case 0: // uniform
+                    return r -> r.nextInt(minInclusive, maxExclusive);
+                case 1: // zipf
+                    if (indexes == null)
+                        return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(array) : array);
+                    return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(indexes) : indexes).mapAsInt((r, index) -> {
+                        int start = array[index];
+                        int end = index == array.length - 1 ? maxExclusive : array[index + 1];
+                        return r.nextInt(start, end);
+                    });
+                default:
+                    throw new AssertionError();
+            }
+        };
+    }
+
+    private static int[] reverseAndCopy(int[] array)
+    {
+        array = Arrays.copyOf(array, array.length);
+        for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid; i++, j--)
+        {
+            int tmp = array[i];
+            array[i] = array[j];
+            array[j] = tmp;
+        }
+        return array;
+    }
+
+    public static Gen<Gen.LongGen> mixedDistribution(long minInclusive, long maxExclusive)
+    {
+        long domainSize = (maxExclusive - minInclusive + 1);
+        if (domainSize < 0)
+            throw new IllegalArgumentException("Range is too large; min=" + minInclusive + ", max=" + maxExclusive);
+        long[] array;
+        int[] indexes;
+        if (domainSize > 200) // randomly selected
+        {
+            int numBuckets = 10;
+            long delta = domainSize / numBuckets;
+            array = new long[numBuckets];
+            for (int i = 0; i < numBuckets; i++)
+                array[i] = minInclusive + i * delta;
+            indexes = IntStream.range(0, array.length).toArray();
+        }
+        else
+        {
+            array = LongStream.range(minInclusive, maxExclusive).toArray();
+            indexes = null;
+        }
+        return rs -> {
+            switch (rs.nextInt(0, 2))
+            {
+                case 0: // uniform
+                    return r -> r.nextLong(minInclusive, maxExclusive);
+                case 1: // zipf
+                    if (indexes == null)
+                        return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(array) : array);
+                    return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(indexes) : indexes).mapAsLong((r, index) -> {
+                        long start = array[index];
+                        long end = index == array.length - 1 ? maxExclusive : array[index + 1];
+                        return r.nextLong(start, end);
+                    });
+                default:
+                    throw new AssertionError();
+            }
+        };
+    }
+
+    private static long[] reverseAndCopy(long[] array)
+    {
+        array = Arrays.copyOf(array, array.length);
+        for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid; i++, j--)
+        {
+            long tmp = array[i];
+            array[i] = array[j];
+            array[j] = tmp;
+        }
+        return array;
+    }
+
+    public static <T> Gen<Gen<T>> mixedDistribution(T... list)
+    {
+        return mixedDistribution(Arrays.asList(list));
+    }
+
+    public static <T> Gen<Gen<T>> mixedDistribution(List<T> list)
+    {
+        return rs -> {
+            switch (rs.nextInt(0, 2))
+            {
+                case 0: // uniform
+                    return r -> list.get(r.nextInt(0, list.size()));
+                case 1: // zipf
+                    List<T> array = list;
+                    if (rs.nextBoolean())
+                    {
+                        array = new ArrayList<>(list);
+                        Collections.reverse(array);
+                    }
+                    return pickZipf(array);
+                default:
+                    throw new AssertionError();
+            }
+        };
+    }
+
     public static Gen<char[]> charArray(Gen.IntGen sizes, char[] domain)
     {
         return charArray(sizes, domain, (a, b) -> true);
@@ -163,6 +428,11 @@ public class Gens {
         return new StringDSL();
     }
 
+    public static BooleanSupplier supplier(Gen<Boolean> gen, RandomSource rs)
+    {
+        return () -> gen.next(rs);
+    }
+
     public static class BooleanDSL
     {
         public Gen<Boolean> all()
@@ -170,12 +440,12 @@ public class Gens {
             return RandomSource::nextBoolean;
         }
 
-        public Gen<Boolean> runs(double ratio, int maxRuns)
+        public Gen<Boolean> biasedRepeatingRuns(double ratio, int maxRuns)
         {
             Invariants.checkArgument(ratio > 0 && ratio <= 1, "Expected %d to be larger than 0 and <= 1", ratio);
             double lower = ratio * .8;
             double upper = ratio * 1.2;
-            return new Gen<>() {
+            return new Gen<Boolean>() {
                 // run represents how many consecutive true values should be returned; -1 implies no active "run" exists
                 private int run = -1;
                 private long falseCount = 0, trueCount = 0;
@@ -213,6 +483,28 @@ public class Gens {
                 }
             };
         }
+
+        public Gen<Gen<Boolean>> mixedDistribution()
+        {
+            return rs -> {
+                int selection = rs.nextInt(0, 4);
+                switch (selection)
+                {
+                    case 0: // uniform 50/50
+                        return r -> r.nextBoolean();
+                    case 1: // variable frequency
+                        var freq = rs.nextFloat();
+                        return r -> r.decide(freq);
+                    case 2: // fixed result
+                        boolean result = rs.nextBoolean();
+                        return ignore -> result;
+                    case 3: // biased repeating runs
+                        return biasedRepeatingRuns(rs.nextDouble(), rs.nextInt(1, 100));
+                    default:
+                        throw new IllegalStateException("Unexpected int for bool selection: " + selection);
+                }
+            };
+        }
     }
 
     public static class IntDSL
@@ -238,6 +530,11 @@ public class Gens {
                 return r -> r.nextInt(min, max);
             return r -> r.nextInt(min, max + 1);
         }
+
+        public Gen<Gen.IntGen> mixedDistribution(int minInclusive, int maxExclusive)
+        {
+            return Gens.mixedDistribution(minInclusive, maxExclusive);
+        }
     }
 
     public static class LongDSL {
@@ -269,6 +566,11 @@ public class Gens {
             return pick(klass.getEnumConstants());
         }
 
+        public <T extends Enum<T>> Gen<Gen<T>> allMixedDistribution(Class<T> klass)
+        {
+            return mixedDistribution(klass.getEnumConstants());
+        }
+
         public <T extends Enum<T>> Gen<T> allWithWeights(Class<T> klass, int... weights)
         {
             T[] constants = klass.getEnumConstants();
@@ -433,16 +735,7 @@ public class Gens {
                 int size = sizeGen.nextInt(r);
                 T[] list = (T[]) Array.newInstance(type, size);
                 for (int i = 0; i < size; i++)
-                {
-                    try
-                    {
-                        list[i] = fn.next(r);
-                    }
-                    catch (IgnoreGenResult e)
-                    {
-                        return Arrays.copyOf(list, i);
-                    }
-                }
+                    list[i] = fn.next(r);
                 return list;
             };
         }
@@ -513,7 +806,7 @@ public class Gens {
         }
     }
 
-    private interface Reset {
+    protected interface Reset {
         static void tryReset(Object o)
         {
             if (o instanceof Reset)
@@ -612,15 +905,20 @@ public class Gens {
     {
         private final T value;
         private final double weight;
+        private final int index;
 
-        private Weight(T value, double weight) {
+        private Weight(T value, double weight, int index) {
             this.value = value;
             this.weight = weight;
+            this.index = index;
         }
 
         @Override
         public int compareTo(Weight<T> o) {
-            return Double.compare(weight, o.weight);
+            int rc = Double.compare(weight, o.weight);
+            if (rc == 0)
+                rc = Integer.compare(index, o.index);
+            return rc;
         }
     }
 }
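
The new pickZipf and mixedDistribution generators above let a test randomise not only values but also the distribution of values: pickZipf heavily favours the first element (weights fall off roughly as 1/(rank + 1)), while mixedDistribution first draws a distribution shape (uniform, or zipf over buckets) and then samples from that shape. A rough usage sketch, assuming a RandomSource named rs:

    // Illustrative usage; rs is assumed to be an accord.utils.RandomSource.
    Gen.IntGen zipf = Gens.pickZipf(new int[] { 1, 2, 3, 4, 5 }); // mostly 1s, occasionally the tail
    int biased = zipf.nextInt(rs);

    // The first draw picks the distribution shape; later draws sample from that shape.
    Gen.IntGen shape = Gens.mixedDistribution(0, 1000).next(rs);
    int sample = shape.nextInt(rs);
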
diff --git a/test/unit/accord/utils/Property.java 
b/test/unit/accord/utils/Property.java
index b90472c862..e45642c1f1 100644
--- a/test/unit/accord/utils/Property.java
+++ b/test/unit/accord/utils/Property.java
@@ -18,23 +18,32 @@
 
 package accord.utils;
 
+import accord.utils.async.TimeoutUtils;
+
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
-import org.apache.cassandra.transport.ProtocolException;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
-
 public class Property
 {
     public static abstract class Common<T extends Common<T>>
     {
-        protected long seed = ThreadLocalRandom.current().nextLong();
+        protected long seed = SeedProvider.instance.nextSeed();
         protected int examples = 1000;
 
         protected boolean pure = true;
@@ -74,42 +83,26 @@ public class Property
         public T withTimeout(Duration timeout)
         {
             this.timeout = timeout;
+            this.pure = false;
             return (T) this;
         }
 
         protected void checkWithTimeout(Runnable fn)
         {
-            AsyncPromise<?> promise = new AsyncPromise<>();
-            Thread t = new Thread(() -> {
-                try
-                {
-                    fn.run();
-                    promise.setSuccess(null);
-                }
-                catch (Throwable e)
-                {
-                    promise.setFailure(e);
-                }
-            });
-            t.setName("property with timeout");
-            t.setDaemon(true);
             try
             {
-                t.start();
-                promise.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
+                TimeoutUtils.runBlocking(timeout, "property with timeout", 
fn::run);
             }
             catch (ExecutionException e)
             {
-                throw new ProtocolException(propertyError(this, e.getCause()));
+                throw new PropertyError(propertyError(this, e.getCause()));
             }
             catch (InterruptedException e)
             {
-                t.interrupt();
                 throw new PropertyError(propertyError(this, e));
             }
             catch (TimeoutException e)
             {
-                t.interrupt();
                 TimeoutException override = new TimeoutException("property 
test did not complete within " + this.timeout);
                 override.setStackTrace(new StackTraceElement[0]);
                 throw new PropertyError(propertyError(this, override));
@@ -167,7 +160,10 @@ public class Property
         }
         try
         {
-            return value.toString();
+            String result = value.toString();
+            if (result != null && result.length() > 100 && value instanceof Collection)
+                result = ((Collection<?>) value).stream().map(o -> "\n\t     " + o).collect(Collectors.joining(",", "[", "]"));
+            return result;
         }
         catch (Throwable t)
         {
@@ -175,7 +171,7 @@ public class Property
         }
     }
 
-    private static String propertyError(Common<?> input, Throwable cause, Object... values)
+    private static StringBuilder propertyErrorCommon(Common<?> input, Throwable cause)
     {
         StringBuilder sb = new StringBuilder();
         // return "Seed=" + seed + "\nExamples=" + examples;
@@ -193,15 +189,35 @@ public class Property
                 msg = cause.getClass().getCanonicalName();
             sb.append(msg).append('\n');
         }
+        return sb;
+    }
+
+    private static String propertyError(Common<?> input, Throwable cause, Object... values)
+    {
+        StringBuilder sb = propertyErrorCommon(input, cause);
         if (values != null)
         {
             sb.append("Values:\n");
             for (int i = 0; i < values.length; i++)
-                sb.append('\t').append(i).append(" = 
").append(normalizeValue(values[i])).append('\n');
+                sb.append('\t').append(i).append(" = 
").append(normalizeValue(values[i])).append(": ").append(values[i] == null ? 
"unknown type" : values[i].getClass().getCanonicalName()).append('\n');
         }
         return sb.toString();
     }
 
+    private static String statefulPropertyError(StatefulBuilder input, Throwable cause, Object state, List<String> history)
+    {
+        StringBuilder sb = propertyErrorCommon(input, cause);
+        sb.append("Steps: ").append(input.steps).append('\n');
+        sb.append("Values:\n");
+        String stateStr = state == null ? null : state.toString().replace("\n", "\n\t\t");
+        sb.append("\tState: ").append(stateStr).append(": ").append(state == null ? "unknown type" : state.getClass().getCanonicalName()).append('\n');
+        sb.append("\tHistory:").append('\n');
+        int idx = 0;
+        for (var event : history)
+            sb.append("\t\t").append(++idx).append(": ").append(event).append('\n');
+        return sb.toString();
+    }
+
     public interface FailingConsumer<A>
     {
         void accept(A value) throws Exception;
@@ -379,4 +395,523 @@ public class Property
     {
         return new ForBuilder();
     }
+
+    public static StatefulBuilder stateful()
+    {
+        return new StatefulBuilder();
+    }
+
+    public static class StatefulBuilder extends Common<StatefulBuilder>
+    {
+        protected int steps = 1000;
+        @Nullable
+        protected Duration stepTimeout = null;
+
+        public StatefulBuilder()
+        {
+            examples = 500;
+        }
+
+        public StatefulBuilder withSteps(int steps)
+        {
+            this.steps = steps;
+            return this;
+        }
+
+        public StatefulBuilder withStepTimeout(Duration duration)
+        {
+            stepTimeout = duration;
+            return this;
+        }
+
+        @SuppressWarnings("rawtypes")
+        public <State, SystemUnderTest> void check(Commands<State, SystemUnderTest> commands)
+        {
+            RandomSource rs = new DefaultRandom(seed);
+            for (int i = 0; i < examples; i++)
+            {
+                State state = null;
+                List<String> history = new ArrayList<>(steps);
+                try
+                {
+                    checkInterrupted();
+
+                    state = commands.genInitialState().next(rs);
+                    SystemUnderTest sut = commands.createSut(state);
+
+                    try
+                    {
+                        for (int j = 0; j < steps; j++)
+                        {
+                            Gen<Command<State, SystemUnderTest, ?>> cmdGen = commands.commands(state);
+                            Command cmd = cmdGen.next(rs);
+                            for (int a = 0; cmd.checkPreconditions(state) != PreCheckResult.Ok && a < 42; a++)
+                            {
+                                if (a == 41)
+                                    throw new IllegalArgumentException("Unable to find next command");
+                                cmd = cmdGen.next(rs);
+                            }
+                            if (cmd instanceof MultistepCommand)
+                            {
+                                for (Command<State, SystemUnderTest, ?> sub : ((MultistepCommand<State, SystemUnderTest>) cmd))
+                                {
+                                    history.add(sub.detailed(state));
+                                    process(sub, state, sut, history.size());
+                                }
+                            }
+                            else
+                            {
+                                history.add(cmd.detailed(state));
+                                process(cmd, state, sut, history.size());
+                            }
+                        }
+                        commands.destroySut(sut, null);
+                        commands.destroyState(state, null);
+                    }
+                    catch (Throwable t)
+                    {
+                        try
+                        {
+                            commands.destroySut(sut, t);
+                            commands.destroyState(state, t);
+                        }
+                        catch (Throwable t2)
+                        {
+                            t.addSuppressed(t2);
+                        }
+                        throw t;
+                    }
+                }
+                catch (Throwable t)
+                {
+                    throw new PropertyError(statefulPropertyError(this, t, state, history), t);
+                }
+                if (pure)
+                {
+                    seed = rs.nextLong();
+                    rs.setSeed(seed);
+                }
+            }
+        }
+
+        private <State, SystemUnderTest> void process(Command cmd, State state, SystemUnderTest sut, int id) throws Throwable
+        {
+            if (stepTimeout == null)
+            {
+                cmd.process(state, sut);
+                return;
+            }
+            TimeoutUtils.runBlocking(stepTimeout, "Stateful Step " + id, () -> cmd.process(state, sut));
+        }
+    }
+
+    public enum PreCheckResult { Ok, Ignore }
+    public interface Command<State, SystemUnderTest, Result>
+    {
+        default PreCheckResult checkPreconditions(State state) {return PreCheckResult.Ok;}
+        Result apply(State state) throws Throwable;
+        Result run(SystemUnderTest sut) throws Throwable;
+        default void checkPostconditions(State state, Result expected,
+                                         SystemUnderTest sut, Result actual) throws Throwable {}
+        default String detailed(State state) {return this.toString();}
+        default void process(State state, SystemUnderTest sut) throws Throwable
+        {
+            checkPostconditions(state, apply(state),
+                                sut, run(sut));
+        }
+    }
+
+    public static <State, SystemUnderTest> MultistepCommand<State, SystemUnderTest> multistep(Command<State, SystemUnderTest, ?>... cmds)
+    {
+        return multistep(Arrays.asList(cmds));
+    }
+
+    public static <State, SystemUnderTest> MultistepCommand<State, SystemUnderTest> multistep(List<Command<State, SystemUnderTest, ?>> cmds)
+    {
+        List<Command<State, SystemUnderTest, ?>> result = new ArrayList<>(cmds.size());
+        for (Command<State, SystemUnderTest, ?> c : cmds)
+        {
+            if (c instanceof MultistepCommand) result.addAll(flatten((MultistepCommand<State, SystemUnderTest>) c));
+            else                               result.add(c);
+        }
+        return result::iterator;
+    }
+
+    private static <State, SystemUnderTest> Collection<? extends Command<State, SystemUnderTest, ?>> flatten(MultistepCommand<State, SystemUnderTest> mc)
+    {
+        List<Command<State, SystemUnderTest, ?>> result = new ArrayList<>();
+        for (Command<State, SystemUnderTest, ?> c : mc)
+        {
+            if (c instanceof MultistepCommand) result.addAll(flatten((MultistepCommand<State, SystemUnderTest>) c));
+            else                               result.add(c);
+        }
+        return result;
+    }
+
+    public interface MultistepCommand<State, SystemUnderTest> extends Command<State, SystemUnderTest, Object>, Iterable<Command<State, SystemUnderTest, ?>>
+    {
+        @Override
+        default PreCheckResult checkPreconditions(State state)
+        {
+            for (Command<State, SystemUnderTest, ?> cmd : this)
+            {
+                PreCheckResult result = cmd.checkPreconditions(state);
+                if (result != PreCheckResult.Ok) return result;
+            }
+            return PreCheckResult.Ok;
+        }
+
+        @Override
+        default Object apply(State state) throws Throwable
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        default Object run(SystemUnderTest sut) throws Throwable
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        default void checkPostconditions(State state, Object expected, SystemUnderTest sut, Object actual) throws Throwable
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        default String detailed(State state)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        default void process(State state, SystemUnderTest sut) throws Throwable
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    public static <State, SystemUnderTest, Result> Command<State, SystemUnderTest, Result> ignoreCommand()
+    {
+        return new Command<>()
+        {
+            @Override
+            public PreCheckResult checkPreconditions(State state)
+            {
+                return PreCheckResult.Ignore;
+            }
+
+            @Override
+            public Result apply(State state) throws Throwable
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Result run(SystemUnderTest sut) throws Throwable
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public String detailed(State state)
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    public interface UnitCommand<State, SystemUnderTest> extends Command<State, SystemUnderTest, Void>
+    {
+        void applyUnit(State state) throws Throwable;
+        void runUnit(SystemUnderTest sut) throws Throwable;
+
+        @Override
+        default Void apply(State state) throws Throwable
+        {
+            applyUnit(state);
+            return null;
+        }
+
+        @Override
+        default Void run(SystemUnderTest sut) throws Throwable
+        {
+            runUnit(sut);
+            return null;
+        }
+    }
+
+    public interface StateOnlyCommand<State> extends UnitCommand<State, Void>
+    {
+        @Override
+        default void runUnit(Void sut) throws Throwable {}
+    }
+
+    public static class SimpleCommand<State> implements StateOnlyCommand<State>
+    {
+        private final Function<State, String> name;
+        private final Consumer<State> fn;
+
+        public SimpleCommand(String name, Consumer<State> fn)
+        {
+            this.name = ignore -> name;
+            this.fn = fn;
+        }
+
+        public SimpleCommand(Function<State, String> name, Consumer<State> fn)
+        {
+            this.name = name;
+            this.fn = fn;
+        }
+
+        @Override
+        public String detailed(State state)
+        {
+            return name.apply(state);
+        }
+
+        @Override
+        public void applyUnit(State state)
+        {
+            fn.accept(state);
+        }
+    }
+
+    public interface Commands<State, SystemUnderTest>
+    {
+        Gen<State> genInitialState() throws Throwable;
+        SystemUnderTest createSut(State state) throws Throwable;
+        default void destroyState(State state, @Nullable Throwable cause) throws Throwable {}
+        default void destroySut(SystemUnderTest sut, @Nullable Throwable cause) throws Throwable {}
+        Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws Throwable;
+    }
+
+    public static <State, SystemUnderTest> CommandsBuilder<State, SystemUnderTest> commands(Supplier<Gen<State>> stateGen, Function<State, SystemUnderTest> sutFactory)
+    {
+        return new CommandsBuilder<>(stateGen, sutFactory);
+    }
+
+    public static <State> CommandsBuilder<State, Void> commands(Supplier<Gen<State>> stateGen)
+    {
+        return new CommandsBuilder<>(stateGen, ignore -> null);
+    }
+
+    public static class CommandsBuilder<State, SystemUnderTest>
+    {
+        public interface Setup<State, SystemUnderTest>
+        {
+            Command<State, SystemUnderTest, ?> setup(RandomSource rs, State state);
+        }
+        private final Supplier<Gen<State>> stateGen;
+        private final Function<State, SystemUnderTest> sutFactory;
+        private final Map<Setup<State, SystemUnderTest>, Integer> knownWeights = new LinkedHashMap<>();
+        @Nullable
+        private Set<Setup<State, SystemUnderTest>> unknownWeights = null;
+        private Gen.IntGen unknownWeightGen = Gens.ints().between(1, 10);
+        @Nullable
+        private FailingConsumer<State> preCommands = null;
+        @Nullable
+        private FailingBiConsumer<State, Throwable> destroyState = null;
+        @Nullable
+        private FailingBiConsumer<SystemUnderTest, Throwable> destroySut = null;
+
+        public CommandsBuilder(Supplier<Gen<State>> stateGen, Function<State, SystemUnderTest> sutFactory)
+        {
+            this.stateGen = stateGen;
+            this.sutFactory = sutFactory;
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> 
preCommands(FailingConsumer<State> preCommands)
+        {
+            this.preCommands = preCommands;
+            return this;
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> 
destroyState(FailingConsumer<State> destroyState)
+        {
+            return destroyState((success, failure) -> {
+                if (failure == null)
+                    destroyState.accept(success);
+            });
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> 
destroyState(FailingBiConsumer<State, Throwable> destroyState)
+        {
+            this.destroyState = destroyState;
+            return this;
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> 
destroySut(FailingConsumer<SystemUnderTest> destroySut)
+        {
+            return destroySut((success, failure) -> {
+                if (failure == null)
+                    destroySut.accept(success);
+            });
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> 
destroySut(FailingBiConsumer<SystemUnderTest, Throwable> destroySut)
+        {
+            this.destroySut = destroySut;
+            return this;
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> add(int weight, 
Command<State, SystemUnderTest, ?> cmd)
+        {
+            return add(weight, (i1, i2) -> cmd);
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> add(int weight, 
Gen<Command<State, SystemUnderTest, ?>> cmd)
+        {
+            return add(weight, (rs, state) -> cmd.next(rs));
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> add(int weight, 
Setup<State, SystemUnderTest> cmd)
+        {
+            knownWeights.put(cmd, weight);
+            return this;
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> add(Command<State, 
SystemUnderTest, ?> cmd)
+        {
+            return add((i1, i2) -> cmd);
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> add(Gen<Command<State, 
SystemUnderTest, ?>> cmd)
+        {
+            return add((rs, state) -> cmd.next(rs));
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> add(Setup<State, 
SystemUnderTest> cmd)
+        {
+            if (unknownWeights == null)
+                unknownWeights = new LinkedHashSet<>();
+            unknownWeights.add(cmd);
+            return this;
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> 
predicate, Gen<Command<State, SystemUnderTest, ?>> cmd)
+        {
+            return add((rs, state) -> {
+                if (!predicate.test(state)) return ignoreCommand();
+                return cmd.next(rs);
+            });
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> 
predicate, Setup<State, SystemUnderTest> cmd)
+        {
+            return add((rs, state) -> {
+                if (!predicate.test(state)) return ignoreCommand();
+                return cmd.setup(rs, state);
+            });
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> 
addAllIf(Predicate<State> predicate, Consumer<IfBuilder<State, 
SystemUnderTest>> sub)
+        {
+            sub.accept(new IfBuilder<>()
+            {
+                @Override
+                public IfBuilder<State, SystemUnderTest> add(Setup<State, 
SystemUnderTest> cmd)
+                {
+                    CommandsBuilder.this.addIf(predicate, cmd);
+                    return this;
+                }
+            });
+            return this;
+        }
+
+        public interface IfBuilder<State, SystemUnderTest>
+        {
+            IfBuilder<State, SystemUnderTest> add(Setup<State, 
SystemUnderTest> cmd);
+        }
+
+        public CommandsBuilder<State, SystemUnderTest> 
unknownWeight(Gen.IntGen unknownWeightGen)
+        {
+            this.unknownWeightGen = Objects.requireNonNull(unknownWeightGen);
+            return this;
+        }
+
+        public Commands<State, SystemUnderTest> build()
+        {
+            Gen<Setup<State, SystemUnderTest>> commandsGen;
+            if (unknownWeights == null)
+            {
+                commandsGen = Gens.pick(new LinkedHashMap<>(knownWeights));
+            }
+            else
+            {
+                class DynamicWeightsGen implements Gen<Setup<State, SystemUnderTest>>, Gens.Reset
+                {
+                    Gen<Setup<State, SystemUnderTest>> gen;
+                    @Override
+                    public Setup<State, SystemUnderTest> next(RandomSource rs)
+                    {
+                        if (gen == null)
+                        {
+                            // create random weights
+                            LinkedHashMap<Setup<State, SystemUnderTest>, Integer> clone = new LinkedHashMap<>(knownWeights);
+                            for (Setup<State, SystemUnderTest> s : unknownWeights)
+                                clone.put(s, unknownWeightGen.nextInt(rs));
+                            gen = Gens.pick(clone);
+                        }
+                        return gen.next(rs);
+                    }
+
+                    @Override
+                    public void reset()
+                    {
+                        gen = null;
+                    }
+                }
+                commandsGen = new DynamicWeightsGen();
+            }
+            return new Commands<>()
+            {
+                @Override
+                public Gen<State> genInitialState() throws Throwable
+                {
+                    return stateGen.get();
+                }
+
+                @Override
+                public SystemUnderTest createSut(State state) throws Throwable
+                {
+                    return sutFactory.apply(state);
+                }
+
+                @Override
+                public Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws Throwable
+                {
+                    if (preCommands != null)
+                        preCommands.accept(state);
+                    return commandsGen.map((rs, setup) -> setup.setup(rs, state));
+                }
+
+                @Override
+                public void destroyState(State state, @Nullable Throwable cause) throws Throwable
+                {
+                    Gens.Reset.tryReset(commandsGen);
+                    if (destroyState != null)
+                        destroyState.accept(state, cause);
+                }
+
+                @Override
+                public void destroySut(SystemUnderTest sut, @Nullable Throwable cause) throws Throwable
+                {
+                    if (destroySut != null)
+                        destroySut.accept(sut, cause);
+                }
+            };
+        }
+
+        public interface FailingConsumer<T>
+        {
+            void accept(T value) throws Throwable;
+        }
+
+        public interface FailingBiConsumer<A, B>
+        {
+            void accept(A a, B b) throws Throwable;
+        }
+    }
 }
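
Taken together, the additions above give Property a stateful mode: a Commands definition supplies an initial-state generator, an optional system under test, and weighted command generators, and check() runs random command sequences while recording a per-step history that statefulPropertyError prints on failure. A rough sketch of the intended shape, modelling a simple counter with no separate SUT (the weights and commands are illustrative; assumes java.util.concurrent.atomic.AtomicInteger is imported):

    // Illustrative stateful property: State is an AtomicInteger, SystemUnderTest is Void.
    Property.stateful()
            .withSteps(50)
            .check(Property.<AtomicInteger>commands(() -> rs -> new AtomicInteger())
                           .add(2, new Property.SimpleCommand<AtomicInteger>("increment", s -> s.incrementAndGet()))
                           .add(1, new Property.SimpleCommand<AtomicInteger>(s -> "reset from " + s.get(), s -> s.set(0)))
                           .destroyState(s -> { if (s.get() < 0) throw new AssertionError("counter went negative"); })
                           .build());
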
diff --git a/test/unit/accord/utils/SeedProvider.java 
b/test/unit/accord/utils/SeedProvider.java
new file mode 100644
index 0000000000..9c7858dafe
--- /dev/null
+++ b/test/unit/accord/utils/SeedProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Utility class for creating seeds.  This class mostly matches the semantics of {@link java.util.Random} but makes the logic work
+ * for any random source.  It should be used in place of most seed methods, and should always replace {@link java.util.concurrent.ThreadLocalRandom},
+ * as that randomness has a bias towards the same seed after a restart (if you rerun randomized tests by restarting
+ * the JVM you will run with the same seed over and over again).
+ */
+public class SeedProvider
+{
+    public static final SeedProvider instance = new SeedProvider();
+    private final AtomicLong seedUniquifier = new 
AtomicLong(8682522807148012L);
+
+    private long seedUniquifier()
+    {
+        // L'Ecuyer, "Tables of Linear Congruential Generators of
+        // Different Sizes and Good Lattice Structure", 1999
+        for (; ; )
+        {
+            long current = seedUniquifier.get();
+            long next = current * 1181783497276652981L;
+            if (seedUniquifier.compareAndSet(current, next))
+                return next;
+        }
+    }
+
+    public long nextSeed()
+    {
+        return seedUniquifier() ^ System.nanoTime();
+    }
+}
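
SeedProvider is meant to replace ad-hoc ThreadLocalRandom seeding in tests so that JVM restarts do not keep replaying the same seed. A minimal sketch of the intended call pattern (the slf4j logger is assumed; logging the seed is what makes a failing run reproducible):

    // Illustrative: derive a fresh seed per run and log it so the failure can be replayed.
    long seed = SeedProvider.instance.nextSeed();
    RandomSource rs = new DefaultRandom(seed);
    logger.info("Seed: {}", seed);
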
diff --git a/test/unit/accord/utils/async/TimeoutUtils.java 
b/test/unit/accord/utils/async/TimeoutUtils.java
new file mode 100644
index 0000000000..f12c0b17e9
--- /dev/null
+++ b/test/unit/accord/utils/async/TimeoutUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+
+public class TimeoutUtils
+{
+    public interface FailingRunnable
+    {
+        void run() throws Throwable;
+    }
+
+    public static void runBlocking(Duration timeout, String threadName, 
FailingRunnable fn) throws ExecutionException, InterruptedException, 
TimeoutException
+    {
+        // MAINTENANCE: once the accord branch merges to trunk this can be dropped and AsyncChain can be used again;
+        // since this code is forked into C* (which doesn't have AsyncChain), Futures must be used for now.
+//        AsyncResult.Settable<?> promise = AsyncResults.settable();
+        AsyncPromise<?> promise = new AsyncPromise<>();
+        Thread t = new Thread(() -> {
+            try
+            {
+                fn.run();
+                promise.setSuccess(null);
+            }
+            catch (Throwable e)
+            {
+                promise.setFailure(e);
+            }
+        });
+        t.setName(threadName);
+        t.setDaemon(true);
+        t.start();
+        try
+        {
+//            AsyncChains.getBlocking(promise, timeout.toNanos(), 
TimeUnit.NANOSECONDS);
+            promise.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            t.interrupt();
+            throw e;
+        }
+        catch (TimeoutException e)
+        {
+            t.interrupt();
+            throw e;
+        }
+    }
+}
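
For illustration only (not part of the commit): a minimal sketch of calling the new TimeoutUtils.runBlocking; the thread name, timeout, and sleeping body are hypothetical stand-ins.

    import java.time.Duration;

    import accord.utils.async.TimeoutUtils;

    public class TimeoutUtilsExample
    {
        public static void main(String[] args) throws Exception
        {
            // Runs the body on a named daemon thread and blocks the caller;
            // if the body has not finished within 5 seconds the worker is
            // interrupted and a TimeoutException is thrown.
            TimeoutUtils.runBlocking(Duration.ofSeconds(5), "example-worker", () -> {
                Thread.sleep(100); // stand-in for the real blocking work
            });
        }
    }
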
diff --git 
a/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java 
b/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
index e206af390b..01540499f4 100644
--- a/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
+++ b/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
@@ -413,7 +413,7 @@ public class SimulatedExecutorFactory implements 
ExecutorFactory, Clock
             long max = TimeUnit.MILLISECONDS.toNanos(5);
             LongSupplier small = () -> rs.nextLong(0, maxSmall);
             LongSupplier large = () -> rs.nextLong(maxSmall, max);
-            this.jitterNanos = Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, 
rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() : 
small.getAsLong()).asLongSupplier(rs);
+            this.jitterNanos = Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 
11) / 100.0D, rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() : 
small.getAsLong()).asLongSupplier(rs);
         }
 
         @Override
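
For context on the rename (runs -> biasedRepeatingRuns), a sketch of the jitter pattern used at this and the other updated call sites; the maxSmall bound and the wrapping class are assumptions, only the Gens/RandomSource calls visible in the hunks are taken as given.

    import java.util.concurrent.TimeUnit;
    import java.util.function.LongSupplier;

    import accord.utils.Gens;
    import accord.utils.RandomSource;

    public class JitterSketch
    {
        static LongSupplier jitterNanos(RandomSource rs)
        {
            long maxSmall = TimeUnit.MICROSECONDS.toNanos(50); // assumed bound, not shown in the hunk
            long max = TimeUnit.MILLISECONDS.toNanos(5);
            LongSupplier small = () -> rs.nextLong(0, maxSmall);
            LongSupplier large = () -> rs.nextLong(maxSmall, max);
            // With probability 1%-10% the generator emits "true", repeated in short
            // runs, which selects the large delay; everything else stays small.
            return Gens.bools()
                       .biasedRepeatingRuns(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 15))
                       .mapToLong(b -> b ? large.getAsLong() : small.getAsLong())
                       .asLongSupplier(rs);
        }
    }
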
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 236480cea5..6d67c05ced 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -3070,7 +3070,7 @@ public abstract class CQLTester
             Map<String, Object> config = CONFIG_GEN.next(RANDOM);
             CONFIG = YamlConfigurationLoader.toYaml(config);
 
-            Config c = 
ConfigGenBuilder.santize(DatabaseDescriptor.loadConfig());
+            Config c = 
ConfigGenBuilder.sanitize(DatabaseDescriptor.loadConfig());
             YamlConfigurationLoader.updateFromMap(config, true, c);
 
             DatabaseDescriptor.unsafeDaemonInitialization(() -> c);
diff --git a/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java 
b/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
index f36d04d8a5..f0f34888a2 100644
--- a/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
+++ b/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
@@ -98,7 +98,7 @@ public class SimulatedMessageDelivery implements 
MessageDelivery
                 long max = TimeUnit.SECONDS.toNanos(5);
                 LongSupplier small = () -> rs.nextLong(min, maxSmall);
                 LongSupplier large = () -> rs.nextLong(maxSmall, max);
-                return Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, 
rs.nextInt(3, 15))
+                return Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) / 
100.0D, rs.nextInt(3, 15))
                            .mapToLong(b -> b ? large.getAsLong() : 
small.getAsLong())
                            .asLongSupplier(rs.fork());
             }).getAsLong();
diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java 
b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index 20a0e3d57f..b4aa22930f 100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@ -970,13 +970,13 @@ public abstract class FuzzTestBase extends 
CQLTester.InMemory
                     long max = TimeUnit.SECONDS.toNanos(5);
                     LongSupplier small = () -> rs.nextLong(min, maxSmall);
                     LongSupplier large = () -> rs.nextLong(maxSmall, max);
-                    return Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, 
rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() : 
small.getAsLong()).asLongSupplier(rs);
+                    return Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) 
/ 100.0D, rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() : 
small.getAsLong()).asLongSupplier(rs);
                 }).getAsLong();
             }
 
             private boolean networkDrops(InetAddressAndPort to)
             {
-                return networkDrops.computeIfAbsent(new 
Connection(broadcastAddressAndPort, to), ignore -> 
Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 
15)).asSupplier(rs)).get();
+                return networkDrops.computeIfAbsent(new 
Connection(broadcastAddressAndPort, to), ignore -> 
Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3, 
15)).asSupplier(rs)).get();
             }
 
             @Override
diff --git a/test/unit/org/apache/cassandra/utils/ConfigGenBuilder.java 
b/test/unit/org/apache/cassandra/utils/ConfigGenBuilder.java
index 6f814f809a..e0b89e767c 100644
--- a/test/unit/org/apache/cassandra/utils/ConfigGenBuilder.java
+++ b/test/unit/org/apache/cassandra/utils/ConfigGenBuilder.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nullable;
+
 import com.google.common.collect.ImmutableMap;
 
 import accord.utils.Gen;
@@ -37,6 +39,7 @@ public class ConfigGenBuilder
     public enum Memtable
     {SkipListMemtable, TrieMemtable, ShardedSkipListMemtable}
 
+    @Nullable
     Gen<IPartitioner> partitionerGen = 
Generators.toGen(CassandraGenerators.nonLocalPartitioners());
     Gen<Config.DiskAccessMode> commitLogDiskAccessModeGen = 
Gens.enums().all(Config.DiskAccessMode.class)
                                                                 .filter(m -> m 
!= Config.DiskAccessMode.standard
@@ -70,7 +73,7 @@ public class ConfigGenBuilder
     /**
       * When loading the {@link Config} from a yaml, it's possible that some configured values will conflict with the
       * configs generated here; to avoid that, reset them to a good default.
      */
-    public static Config santize(Config config)
+    public static Config sanitize(Config config)
     {
         Config defaults = new Config();
         config.commitlog_sync = defaults.commitlog_sync;
@@ -85,6 +88,12 @@ public class ConfigGenBuilder
         return this;
     }
 
+    public ConfigGenBuilder withPartitionerGen(@Nullable Gen<IPartitioner> gen)
+    {
+        this.partitionerGen = gen;
+        return this;
+    }
+
     public ConfigGenBuilder withCommitLogSync(Config.CommitLogSync 
commitLogSync)
     {
         this.commitLogSyncGen = ignore -> commitLogSync;
@@ -114,6 +123,7 @@ public class ConfigGenBuilder
 
     private void updateConfigPartitioner(RandomSource rs, Map<String, Object> 
config)
     {
+        if (partitionerGen == null) return;
         IPartitioner partitioner = partitionerGen.next(rs);
         config.put("partitioner", partitioner.getClass().getSimpleName());
     }
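
For illustration only (not part of the commit): two sketches of the new withPartitionerGen hook; the no-arg constructor and the Murmur3Partitioner pin are assumptions, while the builder method itself comes from the hunk above.

    import accord.utils.Gen;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.utils.ConfigGenBuilder;

    public class ConfigGenSketch
    {
        // Pin the partitioner instead of drawing a random one per run.
        static ConfigGenBuilder pinnedPartitioner()
        {
            Gen<IPartitioner> fixed = ignore -> Murmur3Partitioner.instance;
            return new ConfigGenBuilder().withPartitionerGen(fixed);
        }

        // Passing null makes updateConfigPartitioner a no-op, so whatever
        // partitioner the loaded yaml/defaults specify is left untouched.
        static ConfigGenBuilder noPartitionerOverride()
        {
            return new ConfigGenBuilder().withPartitionerGen(null);
        }
    }
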
diff --git a/test/unit/org/apache/cassandra/utils/ConfigGenBuilderTest.java 
b/test/unit/org/apache/cassandra/utils/ConfigGenBuilderTest.java
index 99df56c46b..0c2aaf29ad 100644
--- a/test/unit/org/apache/cassandra/utils/ConfigGenBuilderTest.java
+++ b/test/unit/org/apache/cassandra/utils/ConfigGenBuilderTest.java
@@ -62,7 +62,7 @@ public class ConfigGenBuilderTest
 
     private static Config defaultConfig()
     {
-        return ConfigGenBuilder.santize(DatabaseDescriptor.loadConfig());
+        return ConfigGenBuilder.sanitize(DatabaseDescriptor.loadConfig());
     }
 
     private static Config simpleConfig()

