This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new fdabc1f977 Create a fuzz test that randomizes topology changes,
cluster actions, and CQL operations
fdabc1f977 is described below
commit fdabc1f9774b1c06d68a0acbf8a470e45c787eec
Author: David Capwell <[email protected]>
AuthorDate: Wed Aug 21 10:03:07 2024 -0700
Create a fuzz test that randomizes topology changes, cluster actions, and
CQL operations
patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-19847
---
.../distributed/impl/AbstractCluster.java | 61 +-
.../distributed/impl/INodeProvisionStrategy.java | 161 +++--
.../cassandra/distributed/impl/InstanceConfig.java | 5 +-
.../cassandra/distributed/shared/ClusterUtils.java | 35 +-
.../fuzz/topology/HarryTopologyMixupTest.java | 159 +++++
.../fuzz/topology/TopologyMixupTestBase.java | 697 +++++++++++++++++++++
.../org/apache/cassandra/harry/HarryHelper.java | 6 +-
.../sut/injvm/InJVMTokenAwareVisitExecutor.java | 6 +-
.../cassandra/harry/sut/injvm/InJvmSutBase.java | 13 +-
.../sut/injvm/QuiescentLocalStateChecker.java | 7 +-
test/unit/accord/utils/Gen.java | 66 +-
test/unit/accord/utils/Gens.java | 332 +++++++++-
test/unit/accord/utils/Property.java | 591 ++++++++++++++++-
test/unit/accord/utils/SeedProvider.java | 51 ++
test/unit/accord/utils/async/TimeoutUtils.java | 70 +++
.../concurrent/SimulatedExecutorFactory.java | 2 +-
test/unit/org/apache/cassandra/cql3/CQLTester.java | 2 +-
.../cassandra/net/SimulatedMessageDelivery.java | 2 +-
.../org/apache/cassandra/repair/FuzzTestBase.java | 4 +-
.../apache/cassandra/utils/ConfigGenBuilder.java | 12 +-
.../cassandra/utils/ConfigGenBuilderTest.java | 2 +-
21 files changed, 2102 insertions(+), 182 deletions(-)
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 2c3e446d3c..54f90e09f7 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -29,7 +29,6 @@ import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -51,11 +50,11 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import org.junit.Assume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,7 +162,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
// mutated by user-facing API
private final MessageFilters filters;
- private final INodeProvisionStrategy.Strategy nodeProvisionStrategy;
+ private final INodeProvisionStrategy.Factory nodeProvisionStrategy;
private final IInstanceInitializer instanceInitializer;
private final int datadirCount;
private volatile Thread.UncaughtExceptionHandler previousHandler = null;
@@ -181,7 +180,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
public static abstract class AbstractBuilder<I extends IInstance, C
extends ICluster, B extends AbstractBuilder<I, C, B>>
extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C,
B>
{
- private INodeProvisionStrategy.Strategy nodeProvisionStrategy =
INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
+ private INodeProvisionStrategy.Factory nodeProvisionStrategy =
INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
private ShutdownExecutor shutdownExecutor = DEFAULT_SHUTDOWN_EXECUTOR;
private boolean dynamicPortAllocation = false;
@@ -206,7 +205,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
return (B) this;
}
- public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy
nodeProvisionStrategy)
+ public B withNodeProvisionStrategy(INodeProvisionStrategy.Factory
nodeProvisionStrategy)
{
this.nodeProvisionStrategy = nodeProvisionStrategy;
return self();
@@ -280,7 +279,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
protected IInvokableInstance delegate()
{
if (delegate == null)
- throw new IllegalStateException("Can't use shutdown instances,
delegate is null");
+ throw new IllegalStateException("Can't use shutdown node" +
config.num() + ", delegate is null");
return delegate;
}
@@ -1101,7 +1100,12 @@ public abstract class AbstractCluster<I extends
IInstance> implements ICluster<I
}
catch (Throwable t)
{
- checkForThreadLeaks();
+ IllegalStateException leak = checkForThreadLeaks();
+ if (leak != null)
+ {
+ leak.initCause(t);
+ throw leak;
+ }
throw t;
}
instances.clear();
@@ -1128,36 +1132,33 @@ public abstract class AbstractCluster<I extends
IInstance> implements ICluster<I
return true;
});
if (!drain.isEmpty())
- throw new ShutdownException(drain);
+ {
+ ShutdownException shutdownException = new ShutdownException(drain);
+ // also log as java will truncate log lists
+ logger.error("Unexpected errors", shutdownException);
+ throw shutdownException;
+ }
}
- private void checkForThreadLeaks()
+ @Nullable
+ private IllegalStateException checkForThreadLeaks()
{
//This is an alternate version of the thread leak check that just
checks to see if any threads are still alive
// with the context classloader.
- Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
- threadSet.stream().filter(t->t.getContextClassLoader() instanceof
InstanceClassLoader).forEach(t->{
- t.setContextClassLoader(null);
- throw new RuntimeException("Unterminated thread detected " +
t.getName() + " in group " + t.getThreadGroup().getName());
- });
- }
-
- // We do not want this check to run every time until we fix problems with
tread stops
- private void withThreadLeakCheck(List<Future<?>> futures)
- {
- FBUtilities.waitOnFutures(futures);
-
- Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
- threadSet = Sets.difference(threadSet,
Collections.singletonMap(Thread.currentThread(), null).keySet());
- if (!threadSet.isEmpty())
+ Map<Thread, StackTraceElement[]> allThreads =
Thread.getAllStackTraces();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<Thread, StackTraceElement[]> e : allThreads.entrySet())
{
- for (Thread thread : threadSet)
- {
- System.out.println(thread);
- System.out.println(Arrays.toString(thread.getStackTrace()));
- }
- throw new RuntimeException(String.format("Not all threads have
shut down. %d threads are still running: %s", threadSet.size(), threadSet));
+
+ if (!(e.getKey().getContextClassLoader() instanceof
InstanceClassLoader)) continue;
+ e.getKey().setContextClassLoader(null);
+ sb.append(e.getKey().getName()).append(":\n");
+ for (StackTraceElement s : e.getValue())
+ sb.append("\t").append(s).append("\n");
}
+ return sb.length() > 0
+ ? new IllegalStateException("Unterminated threads detected;
active threads:\n" + sb)
+ : null;
}
public List<Token> tokens()
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
index f2c088c0e5..0c10ee64e1 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
@@ -29,62 +29,51 @@ import static
org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
@Shared(inner = INTERFACES)
public interface INodeProvisionStrategy
{
- enum Strategy
+ @FunctionalInterface
+ interface Factory
+ {
+ INodeProvisionStrategy create(int subnet, @Nullable Map<String,
Integer> portMap);
+ }
+
+ enum Strategy implements Factory
{
OneNetworkInterface
{
@Override
- INodeProvisionStrategy create(int subnet, @Nullable Map<String,
Integer> portMap)
+ public INodeProvisionStrategy create(int subnet, @Nullable
Map<String, Integer> portMap)
{
String ipAdress = "127.0." + subnet + ".1";
- return new INodeProvisionStrategy()
+ return new AbstractNodeProvisionStrategy(portMap)
{
- @Override
- public String seedIp()
- {
- return ipAdress;
- }
@Override
- public int seedPort()
+ protected int computeStoragePort(int nodeNum)
{
- return storagePort(1);
+ return 7011 + nodeNum;
}
@Override
- public String ipAddress(int nodeNum)
+ protected int computeNativeTransportPort(int nodeNum)
{
- return ipAdress;
+ return 9041 + nodeNum;
}
@Override
- public int storagePort(int nodeNum)
+ protected int computeJmxPort(int nodeNum)
{
- if (portMap != null)
- {
- return portMap.computeIfAbsent("storagePort@node"
+ nodeNum, key -> SocketUtils.findAvailablePort(seedIp(), 7011 + nodeNum));
- }
- return 7011 + nodeNum;
+ return 7199 + nodeNum;
}
@Override
- public int nativeTransportPort(int nodeNum)
+ public int seedNodeNum()
{
- if (portMap != null)
- {
- return
portMap.computeIfAbsent("nativeTransportPort@node" + nodeNum, key ->
SocketUtils.findAvailablePort(seedIp(), 9041 + nodeNum));
- }
- return 9041 + nodeNum;
+ return 1;
}
@Override
- public int jmxPort(int nodeNum)
+ public String ipAddress(int nodeNum)
{
- if (portMap != null)
- {
- return portMap.computeIfAbsent("jmxPort@node" +
nodeNum, key -> SocketUtils.findAvailablePort(seedIp(), 7199 + nodeNum));
- }
- return 7199 + nodeNum;
+ return ipAdress;
}
};
}
@@ -92,22 +81,15 @@ public interface INodeProvisionStrategy
MultipleNetworkInterfaces
{
@Override
- INodeProvisionStrategy create(int subnet, @Nullable Map<String,
Integer> portMap)
+ public INodeProvisionStrategy create(int subnet, @Nullable
Map<String, Integer> portMap)
{
String ipPrefix = "127.0." + subnet + '.';
- return new INodeProvisionStrategy()
+ return new AbstractNodeProvisionStrategy(portMap)
{
-
@Override
- public String seedIp()
+ public int seedNodeNum()
{
- return ipPrefix + '1';
- }
-
- @Override
- public int seedPort()
- {
- return storagePort(1);
+ return 1;
}
@Override
@@ -115,51 +97,12 @@ public interface INodeProvisionStrategy
{
return ipPrefix + nodeNum;
}
-
- @Override
- public int storagePort(int nodeNum)
- {
- if (portMap != null)
- {
- return portMap.computeIfAbsent("storagePort@node"
+ nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 7012));
- }
- return 7012;
- }
-
- @Override
- public int nativeTransportPort(int nodeNum)
- {
- if (portMap != null)
- {
- return
portMap.computeIfAbsent("nativeTransportPort@node" + nodeNum, key ->
SocketUtils.findAvailablePort(ipAddress(nodeNum), 9042));
- }
- return 9042;
- }
-
- @Override
- public int jmxPort(int nodeNum)
- {
- if (portMap != null)
- {
- return portMap.computeIfAbsent("jmxPort@node" +
nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum), 7199));
- }
- return 7199;
- }
};
}
};
-
- INodeProvisionStrategy create(int subnet)
- {
- return create(subnet, null);
- }
-
- abstract INodeProvisionStrategy create(int subnet, @Nullable
Map<String, Integer> portMap);
}
- String seedIp();
-
- int seedPort();
+ int seedNodeNum();
String ipAddress(int nodeNum);
@@ -168,4 +111,60 @@ public interface INodeProvisionStrategy
int nativeTransportPort(int nodeNum);
int jmxPort(int nodeNum);
+
+ abstract class AbstractNodeProvisionStrategy implements
INodeProvisionStrategy
+ {
+ @Nullable
+ private final Map<String, Integer> portMap;
+
+ protected AbstractNodeProvisionStrategy(@Nullable Map<String, Integer>
portMap)
+ {
+ this.portMap = portMap;
+ }
+
+ protected int computeStoragePort(int nodeNum)
+ {
+ return 7012;
+ }
+
+ protected int computeNativeTransportPort(int nodeNum)
+ {
+ return 9042;
+ }
+
+ protected int computeJmxPort(int nodeNum)
+ {
+ return 7199;
+ }
+
+ @Override
+ public int storagePort(int nodeNum)
+ {
+ if (portMap != null)
+ {
+ return portMap.computeIfAbsent("storagePort@node" + nodeNum,
key -> SocketUtils.findAvailablePort(ipAddress(nodeNum),
computeStoragePort(nodeNum)));
+ }
+ return computeStoragePort(nodeNum);
+ }
+
+ @Override
+ public int nativeTransportPort(int nodeNum)
+ {
+ if (portMap != null)
+ {
+ return portMap.computeIfAbsent("nativeTransportPort@node" +
nodeNum, key -> SocketUtils.findAvailablePort(ipAddress(nodeNum),
computeNativeTransportPort(nodeNum)));
+ }
+ return computeNativeTransportPort(nodeNum);
+ }
+
+ @Override
+ public int jmxPort(int nodeNum)
+ {
+ if (portMap != null)
+ {
+ return portMap.computeIfAbsent("jmxPort@node" + nodeNum, key
-> SocketUtils.findAvailablePort(ipAddress(nodeNum), computeJmxPort(nodeNum)));
+ }
+ return computeJmxPort(nodeNum);
+ }
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 603b10ca7e..cc66a59732 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -305,14 +305,15 @@ public class InstanceConfig implements IInstanceConfig
Collection<String> tokens,
int datadirCount)
{
+ int seedNode = provisionStrategy.seedNodeNum();
return new InstanceConfig(nodeNum,
networkTopology,
provisionStrategy.ipAddress(nodeNum),
provisionStrategy.ipAddress(nodeNum),
provisionStrategy.ipAddress(nodeNum),
provisionStrategy.ipAddress(nodeNum),
- provisionStrategy.seedIp(),
- provisionStrategy.seedPort(),
+ provisionStrategy.ipAddress(seedNode),
+ provisionStrategy.storagePort(seedNode),
String.format("%s/node%d/saved_caches",
root, nodeNum),
datadirs(datadirCount, root, nodeNum),
String.format("%s/node%d/commitlog", root,
nodeNum),
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index f6628d3047..f37d634a5d 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -303,7 +303,9 @@ public class ClusterUtils
properties.set(BOOTSTRAP_SCHEMA_DELAY_MS,
TimeUnit.SECONDS.toMillis(10));
// state which node to replace
- properties.set(REPLACE_ADDRESS_FIRST_BOOT,
toReplace.config().broadcastAddress().getAddress().getHostAddress());
+ InetSocketAddress address = toReplace.config().broadcastAddress();
+ // when port isn't defined we use the default port, but in
jvm-dtest the port might change!
+ properties.set(REPLACE_ADDRESS_FIRST_BOOT,
address.getAddress().getHostAddress() + ":" + address.getPort());
fn.accept(inst, properties);
});
@@ -572,13 +574,11 @@ public class ClusterUtils
{
public final int node;
public final Epoch epoch;
- public final Epoch replicated;
- private ClusterMetadataVersion(int node, Epoch epoch, Epoch replicated)
+ private ClusterMetadataVersion(int node, Epoch epoch)
{
this.node = node;
this.epoch = epoch;
- this.replicated = replicated;
}
public String toString()
@@ -586,11 +586,32 @@ public class ClusterUtils
return "Version{" +
"node=" + node +
", epoch=" + epoch +
- ", replicated=" + replicated +
'}';
}
}
+ public static void waitForCMSToQuiesce(ICluster<IInvokableInstance>
cluster, int[] cmsNodes)
+ {
+ // first step; find the largest epoch
+ waitForCMSToQuiesce(cluster, maxEpoch(cluster, cmsNodes));
+ }
+
+ private static Epoch maxEpoch(ICluster<IInvokableInstance> cluster, int[]
cmsNodes)
+ { // returns the highest epoch reported by the running instances listed in cmsNodes
+ Epoch max = null;
+ for (int id : cmsNodes)
+ {
+ IInvokableInstance inst = cluster.get(id);
+ if (inst.isShutdown()) continue; // shut down instances are not queried
+ Epoch version = getClusterMetadataVersion(inst);
+ if (max == null || version.getEpoch() > max.getEpoch())
+ max = version;
+ }
+ if (max == null)
+ throw new AssertionError("Unable to find max epoch from " +
java.util.Arrays.toString(cmsNodes)); // concatenating the int[] directly would print its identity string, not the node ids
+ return max;
+ }
+
public static void waitForCMSToQuiesce(ICluster<IInvokableInstance>
cluster, Epoch awaitedEpoch, int...ignored)
{
List<ClusterMetadataVersion> notMatching = new ArrayList<>();
@@ -611,8 +632,8 @@ public class ClusterUtils
if (cluster.get(j).isShutdown())
continue;
Epoch version = getClusterMetadataVersion(cluster.get(j));
- if (!awaitedEpoch.equals(version))
- notMatching.add(new ClusterMetadataVersion(j, version,
getClusterMetadataVersion(cluster.get(j))));
+ if (version.getEpoch() < awaitedEpoch.getEpoch())
+ notMatching.add(new ClusterMetadataVersion(j, version));
}
if (notMatching.isEmpty())
return;
diff --git
a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
new file mode 100644
index 0000000000..11fce664a1
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import javax.annotation.Nullable;
+
+import accord.utils.Gen;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.PreCheckResult;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.RandomSource;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.harry.HarryHelper;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.sut.SystemUnderTest;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.sut.injvm.InJvmSut;
+
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce;
+
+public class HarryTopologyMixupTest extends
TopologyMixupTestBase<HarryTopologyMixupTest.Spec>
+{
+ @Override
+ protected Gen<State<Spec>> stateGen()
+ {
+ return HarryState::new;
+ }
+
+ @Override
+ protected void preCheck(Property.StatefulBuilder builder)
+ {
+ // if a failing seed is detected, populate here
+ // Example: builder.withSeed(42L);
+ }
+
+ @Override
+ protected void destroyState(State<Spec> state, @Nullable Throwable cause)
+ {
+ if (cause != null) return;
+ if (((HarryState) state).numInserts > 0)
+ {
+ // do one last read just to make sure we validate the data...
+ var harry = state.schemaSpec.harry;
+ harry.validateAll(harry.quiescentLocalChecker());
+ }
+ }
+
+ private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
+ {
+ ReplayingHistoryBuilder harry = HarryHelper.dataGen(rs.nextLong(),
+ new
InJvmSut(cluster),
+ new
TokenPlacementModel.SimpleReplicationFactor(3),
+
SystemUnderTest.ConsistencyLevel.ALL);
+ cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};",
HarryHelper.KEYSPACE));
+ var schema = harry.schema();
+ cluster.schemaChange(schema.compile().cql());
+ waitForCMSToQuiesce(cluster, cluster.get(1));
+ return new Spec(harry);
+ }
+
+ private static BiFunction<RandomSource, State<Spec>, Command<State<Spec>,
Void, ?>> cqlOperations(Spec spec)
+ {
+ class HarryCommand extends SimpleCommand<State<Spec>>
+ {
+ HarryCommand(Function<State<Spec>, String> name,
Consumer<State<Spec>> fn)
+ {
+ super(name, fn);
+ }
+
+ @Override
+ public PreCheckResult checkPreconditions(State<Spec> state)
+ {
+ int clusterSize = state.topologyHistory.up().length;
+ return clusterSize >= 3 ? PreCheckResult.Ok :
PreCheckResult.Ignore;
+ }
+ }
+ Command<State<Spec>, Void, ?> insert = new HarryCommand(state ->
"Harry Insert" + state.commandNamePostfix(), state -> {
+ spec.harry.insert();
+ ((HarryState) state).numInserts++;
+ });
+ Command<State<Spec>, Void, ?> validateAll = new HarryCommand(state ->
"Harry Validate All" + state.commandNamePostfix(), state -> {
+ spec.harry.validateAll(spec.harry.quiescentLocalChecker());
+ ((HarryState) state).numInserts = 0;
+ });
+ return (rs, state) -> {
+ HarryState harryState = (HarryState) state;
+ TopologyHistory history = state.topologyHistory;
+ // if any topology change happened, then always validate all
+ if (harryState.generation != history.generation())
+ {
+ harryState.generation = history.generation();
+ return validateAll;
+ }
+ if ((harryState.numInserts > 0 && rs.decide(0.2))) // 20% of the
time do reads
+ return validateAll;
+ return insert;
+ };
+ }
+
+ public static class Spec implements TopologyMixupTestBase.SchemaSpec
+ {
+ private final ReplayingHistoryBuilder harry;
+
+ public Spec(ReplayingHistoryBuilder harry)
+ {
+ this.harry = harry;
+ }
+
+ @Override
+ public String name()
+ {
+ return harry.schema().table;
+ }
+
+ @Override
+ public String keyspaceName()
+ {
+ return HarryHelper.KEYSPACE;
+ }
+ }
+
+ public static class HarryState extends State<Spec>
+ {
+ private long generation;
+ private int numInserts = 0;
+ public HarryState(RandomSource rs)
+ {
+ super(rs, HarryTopologyMixupTest::createSchemaSpec,
HarryTopologyMixupTest::cqlOperations);
+ }
+
+ @Override
+ protected void onConfigure(IInstanceConfig config)
+ {
+ config.set("metadata_snapshot_frequency", 5);
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
new file mode 100644
index 0000000000..8e8c57b132
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.topology;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.Sets;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import accord.utils.Property;
+import accord.utils.Property.Command;
+import accord.utils.Property.SimpleCommand;
+import accord.utils.RandomSource;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.YamlConfigurationLoader;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.utils.ConfigGenBuilder;
+
+import static accord.utils.Property.commands;
+import static accord.utils.Property.ignoreCommand;
+import static accord.utils.Property.multistep;
+import static accord.utils.Property.stateful;
+
+/**
+ * These tests can create many instances, so mac users may need to run the
following to avoid address bind failures
+ * <p>
+ * {@code for id in $(seq 0 15); do sudo ifconfig lo0 alias "127.0.0.$id";
done;}
+ */
+public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.SchemaSpec> extends TestBaseImpl
+{
+ private static final Logger logger =
LoggerFactory.getLogger(TopologyMixupTestBase.class);
+
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ }
+
+ private enum TopologyChange
+ {
+ AddNode,
+ RemoveNode,
+ HostReplace,
+ //TODO (coverage): add the following states once supported
+// StopNode,
+// StartNode,
+// MoveToken
+ //TODO (coverage): node migrate to another rack or dc (unsupported on
trunk as of this writing, but planned work for TCM)
+// MoveNodeToNewRack,
+// MoveNodeToNewDC,
+ }
+
+ private enum RemoveType
+ {Decommission, RemoveNode, Assassinate}
+
+ private static final Gen.IntGen MURMUR_TOKEN_GEN = rs ->
rs.nextInt(Integer.MIN_VALUE + 1, Integer.MAX_VALUE);
+ private static final int TARGET_RF = 3;
+ private static final Gen<Gen<RemoveType>> REMOVE_TYPE_DISTRIBUTION =
Gens.enums().allMixedDistribution(RemoveType.class);
+ private static final Gen<Map<String, Object>> CONF_GEN = new
ConfigGenBuilder()
+ // jvm-dtest hard
codes this partitioner in its APIs, so overriding will break the test
+
.withPartitionerGen(null)
+ .build();
+
+ // common commands
+ private Command<State<S>, Void, ?> repairCommand(int toCoordinate)
+ {
+ return new SimpleCommand<>(state -> "nodetool repair " +
state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node"
+ toCoordinate + state.commandNamePostfix(),
+ state ->
state.cluster.get(toCoordinate).nodetoolResult("repair",
state.schemaSpec.keyspaceName(), state.schemaSpec.name()).asserts().success());
+ }
+
+ private Command<State<S>, Void, ?> waitForCMSToQuiesce()
+ {
+ return new SimpleCommand<>(state -> "Waiting for CMS to Quiesce" +
state.commandNamePostfix(),
+ state ->
ClusterUtils.waitForCMSToQuiesce(state.cluster, state.cmsGroup));
+ }
+
+ private Command<State<S>, Void, ?> stopInstance(int toRemove)
+ {
+ return new SimpleCommand<>(state -> "Stop Node" + toRemove + " for
Removal" + state.commandNamePostfix(), // generic label: this step is shared by the removenode and assassinate flows
+ state -> {
+ IInvokableInstance inst =
state.cluster.get(toRemove);
+ TopologyHistory.Node node =
state.topologyHistory.node(toRemove);
+ ClusterUtils.stopUnchecked(inst);
+ node.down();
+ });
+ }
+
+ private Command<State<S>, Void, ?> addNode()
+ {
+ return new SimpleCommand<>(state -> "Add Node" +
(state.topologyHistory.uniqueInstances + 1) + state.commandNamePostfix(),
+ state -> {
+ TopologyHistory.Node n =
state.topologyHistory.addNode();
+ IInvokableInstance newInstance =
ClusterUtils.addInstance(state.cluster, n.dc, n.rack, c ->
c.set("auto_bootstrap", true));
+ newInstance.startup(state.cluster);
+ n.up();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs,
State<S> state)
+ {
+ int toRemove = rs.pickInt(state.topologyHistory.up());
+ return new SimpleCommand<>("nodetool decommission node" + toRemove +
state.commandNamePostfix(), s2 -> {
+ IInvokableInstance inst = s2.cluster.get(toRemove);
+ TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
+ node.status = TopologyHistory.Node.Status.BeingDecommissioned;
+ inst.nodetoolResult("decommission").asserts().success();
+ ClusterUtils.stopUnchecked(inst);
+ node.removed();
+ });
+ }
+
+ private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S>
state)
+ {
+ int[] up = state.topologyHistory.up();
+ int toRemove = rs.pickInt(up);
+ int toCoordinate;
+ {
+ int picked;
+ do
+ {
+ picked = rs.pickInt(up);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(toRemove),
+ new SimpleCommand<>("nodetool removenode node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingRemoved;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ coordinator.nodetoolResult("removenode",
Integer.toString(toRemove), "--force").asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(toCoordinate));
+ }
+
+ private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs,
State<S> state)
+ {
+ //TODO (correctness): assassinate CMS member isn't allowed
+ IntHashSet up = asSet(state.topologyHistory.up());
+ IntHashSet cmsGroup = asSet(state.cmsGroup);
+ Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
+ if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a
CMS member");
+ List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
+ allowed.sort(Comparator.naturalOrder());
+ int toRemove = rs.pick(allowed);
+ int toCoordinate;
+ {
+ int[] upInt = state.topologyHistory.up();
+ int picked;
+ do
+ {
+ picked = rs.pickInt(upInt);
+ }
+ while (picked == toRemove);
+ toCoordinate = picked;
+ }
+ return multistep(stopInstance(toRemove),
+ new SimpleCommand<>("nodetool assassinate node" +
toRemove + " from node" + toCoordinate + state.commandNamePostfix(), s2 -> {
+ TopologyHistory.Node node =
s2.topologyHistory.node(toRemove);
+ node.status =
TopologyHistory.Node.Status.BeingAssassinated;
+ IInvokableInstance coordinator =
s2.cluster.get(toCoordinate);
+ InetSocketAddress address =
s2.cluster.get(toRemove).config().broadcastAddress();
+ coordinator.nodetoolResult("assassinate",
address.getAddress().getHostAddress() + ":" +
address.getPort()).asserts().success();
+ node.removed();
+
s2.currentEpoch.set(HackSerialization.tcmEpoch(coordinator));
+ }),
+ repairCommand(toCoordinate)
+ );
+ }
+
+ private Command<State<S>, Void, ?>
removeNodeRandomizedDispatch(RandomSource rs, State<S> state)
+ {
+ RemoveType type = state.removeTypeGen.next(rs);
+ switch (type)
+ {
+ case Decommission:
+ return removeNodeDecommission(rs, state);
+ case RemoveNode:
+ return removeNode(rs, state);
+ case Assassinate:
+ return removeNodeAssassinate(rs, state);
+ default:
+ throw new UnsupportedOperationException("Unknown remove type:
" + type);
+ }
+ }
+
+ /**
+ * Builds the command sequence that replaces a randomly picked up node with a new node.
+ * <p>
+ * The replacement node is registered in the topology history when this method runs (not when the
+ * returned command executes).  The returned command stops the old node and marks it down, starts the
+ * replacement via {@code ClusterUtils.replaceHostAndStart} and records the new epoch, then runs
+ * {@code nodetool cms reconfigure} with {@code TARGET_RF} on the new node.
+ *
+ * @param rs randomness used to pick the node to replace
+ * @param state current test state
+ * @return multi-step command: stop, host replace, CMS reconfigure
+ */
+ private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S>
state)
+ {
+ int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+ IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
+ // registers the replacement node in the history now, so its id is known for the command names below
+ TopologyHistory.Node adding =
state.topologyHistory.replace(nodeToReplace);
+ TopologyHistory.Node removing =
state.topologyHistory.nodes.get(nodeToReplace);
+
+ return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + "
for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ ClusterUtils.stopUnchecked(toReplace);
+ removing.down();
+ }),
+ new SimpleCommand<>("Host Replace Node" +
nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
+ logger.info("node{} starting host replacement;
epoch={}", adding.id,
HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
+ removing.status =
TopologyHistory.Node.Status.BeingReplaced;
+ IInvokableInstance inst =
ClusterUtils.replaceHostAndStart(s2.cluster, toReplace);
+ s2.topologyHistory.replaced(removing, adding);
+ long epoch = HackSerialization.tcmEpoch(inst);
+ s2.currentEpoch.set(epoch);
+ logger.info("{} completed host replacement in
epoch={}", inst, epoch);
+ }),
+ //TODO (remove after rebase to trunk): https://issues.apache.org/jira/browse/CASSANDRA-19705 After the rebase to trunk this is not needed. The issue is that the CMS placement removes the node, it does not promote another node, this causes rf=3 to become rf=2
+ new SimpleCommand<>("CMS reconfigure on Node" +
adding.id + state.commandNamePostfix(), s2 ->
s2.cluster.get(adding.id).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF)).asserts().success())
+ );
+ }
+
+ /**
+ * Supplies the generator of test {@link State}; {@link #test()} passes it to {@code commands(...)}.
+ */
+ protected abstract Gen<State<S>> stateGen();
+
+ /**
+ * Hook called by {@link #test()} with the configured builder before {@code check} runs.
+ * The default implementation does nothing.
+ *
+ * @param statefulBuilder the builder that {@link #test()} is about to run
+ */
+ protected void preCheck(Property.StatefulBuilder statefulBuilder)
+ {
+
+ }
+
+ /**
+ * Hook called from the {@code destroyState} callback registered in {@link #test()}, inside the
+ * try-with-resources block that closes {@code state} afterwards.  The default implementation does nothing.
+ *
+ * @param state the state being torn down
+ * @param cause the failure handed to the callback, may be null
+ * @throws Throwable whatever an override chooses to propagate
+ */
+ protected void destroyState(State<S> state, @Nullable Throwable cause)
throws Throwable
+ {
+
+ }
+
+ /**
+ * Runs the stateful property test: 1 example of 20 steps, each step limited to 2 minutes.
+ * <p>
+ * Before every command the state's {@code preActions} are run.  Commands are drawn with weights
+ * 2 (topology change, or an ignored command when none is possible), 1 (repair from a random up node)
+ * and 7 (statement produced by {@code state.statementGen}).
+ */
+ @Test
+ public void test()
+ {
+ Property.StatefulBuilder statefulBuilder =
stateful().withSteps(20).withStepTimeout(Duration.ofMinutes(2)).withExamples(1);
+ preCheck(statefulBuilder);
+ statefulBuilder.check(commands(this::stateGen)
+ .preCommands(state ->
state.preActions.forEach(Runnable::run))
+ .add(2, (rs, state) -> {
+ EnumSet<TopologyChange>
possibleTopologyChanges = possibleTopologyChanges(state);
+ if (possibleTopologyChanges.isEmpty())
return ignoreCommand();
+ return topologyCommand(state,
possibleTopologyChanges).next(rs);
+ })
+ .add(1, (rs, state) ->
repairCommand(rs.pickInt(state.topologyHistory.up())))
+ .add(7, (rs, state) ->
state.statementGen.apply(rs, state))
+ .destroyState((state, cause) -> {
+ // try-with-resources closes the state (and its cluster) even if the hook throws
+ try (state)
+ {
+
TopologyMixupTestBase.this.destroyState(state, cause);
+ }
+ })
+ .build());
+ }
+
+    /**
+     * Computes which topology changes are allowed for the current number of up nodes.
+     * <ul>
+     *   <li>{@code AddNode} while below {@code maxNodes}</li>
+     *   <li>{@code HostReplace} while above quorum</li>
+     *   <li>{@code RemoveNode} while above quorum and above {@code TARGET_RF}</li>
+     * </ul>
+     *
+     * @param state current test state
+     * @return the allowed changes, possibly empty
+     */
+    private EnumSet<TopologyChange> possibleTopologyChanges(State<S> state)
+    {
+        // up or down is logically more correct, but since this runs sequentially and after the topology changes
+        // are complete, we don't have downed nodes at this point so up is enough to know the topology size
+        int liveNodes = state.topologyHistory.up().length;
+        EnumSet<TopologyChange> allowed = EnumSet.noneOf(TopologyChange.class);
+        if (liveNodes < state.topologyHistory.maxNodes)
+            allowed.add(TopologyChange.AddNode);
+        if (liveNodes <= state.topologyHistory.quorum())
+            return allowed;
+        allowed.add(TopologyChange.HostReplace);
+        if (liveNodes > TARGET_RF)
+            allowed.add(TopologyChange.RemoveNode);
+        return allowed;
+    }
+
+    /**
+     * Builds a generator that picks, with equal weight, one of the given topology changes and produces its
+     * command followed by a wait for the CMS to quiesce.
+     *
+     * @param state                   current test state, captured by the remove and host-replace generators
+     * @param possibleTopologyChanges the changes to choose from
+     * @return generator over the matching multi-step commands
+     * @throws UnsupportedOperationException if a change has no matching command
+     */
+    private Gen<Command<State<S>, Void, ?>> topologyCommand(State<S> state, EnumSet<TopologyChange> possibleTopologyChanges)
+    {
+        // insertion-ordered so the weighted pick is deterministic
+        Map<Gen<Command<State<S>, Void, ?>>, Integer> weighted = new LinkedHashMap<>();
+        for (TopologyChange change : possibleTopologyChanges)
+        {
+            Gen<Command<State<S>, Void, ?>> gen;
+            switch (change)
+            {
+                case AddNode:
+                    gen = ignore -> multistep(addNode(), waitForCMSToQuiesce());
+                    break;
+                case RemoveNode:
+                    gen = rs -> multistep(removeNodeRandomizedDispatch(rs, state), waitForCMSToQuiesce());
+                    break;
+                case HostReplace:
+                    gen = rs -> multistep(hostReplace(rs, state), waitForCMSToQuiesce());
+                    break;
+                default:
+                    throw new UnsupportedOperationException(change.name());
+            }
+            weighted.put(gen, 1);
+        }
+        return Gens.oneOf(weighted);
+    }
+
+    /**
+     * Copies the given ints into a new {@code IntHashSet}.
+     *
+     * @param array values to add
+     * @return a set containing every value of {@code array}
+     */
+    private static IntHashSet asSet(int[] array)
+    {
+        IntHashSet result = new IntHashSet(array.length);
+        for (int idx = 0; idx < array.length; idx++)
+            result.add(array[idx]);
+        return result;
+    }
+
+ /**
+ * Minimal description of the schema a test runs against; it is the bound of the {@code S} type
+ * parameter of {@link State}.
+ */
+ public interface SchemaSpec
+ {
+ /** @return the name of this schema element — presumably the table name; confirm against implementations */
+ String name();
+
+ /** @return the name of the keyspace */
+ String keyspaceName();
+ }
+
+ protected static class State<S extends SchemaSpec> implements AutoCloseable
+ {
+ final TopologyHistory topologyHistory;
+ final Cluster cluster;
+ final S schemaSpec;
+ final List<Runnable> preActions = new CopyOnWriteArrayList<>();
+ final AtomicLong currentEpoch = new AtomicLong();
+ final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>
statementGen;
+ final Gen<RemoveType> removeTypeGen;
+ private final Map<String, Object> yamlConfigOverrides;
+ int[] cmsGroup = new int[0];
+
+ /**
+ * Starts the cluster and prepares everything the commands need.
+ * <p>
+ * Steps, in order: create a {@link TopologyHistory} allowing 2 to 4 nodes; draw yaml overrides from
+ * {@code CONF_GEN}; build and start a cluster of {@code topologyHistory.minNodes} instances with all
+ * features enabled; fix distributed schemas and init with {@code TARGET_RF}; reconfigure CMS to 2;
+ * register the pre-command actions; create the schema spec and statement generator; pick the remove
+ * type generator; record the current TCM epoch and call {@link #onStartupComplete(long)}.
+ * <p>
+ * NOTE(review): {@link #onConfigure(IInstanceConfig)} and {@link #onStartupComplete(long)} are
+ * overridable and are invoked from this constructor, so overrides run before subclass fields are set.
+ *
+ * @param rs randomness for the topology, config overrides, seed selection and schema
+ * @param schemaSpecGen creates the schema spec once the cluster is running
+ * @param cqlOperationsGen creates the statement generator for the schema spec
+ * @throws UncheckedIOException if building or starting the cluster fails with an {@link IOException}
+ */
+ public State(RandomSource rs, BiFunction<RandomSource, Cluster, S>
schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>,
Void, ?>>> cqlOperationsGen)
+ {
+ this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+ try
+ {
+
+ this.yamlConfigOverrides = CONF_GEN.next(rs);
+ cluster = Cluster.build(topologyHistory.minNodes)
+ .withTokenSupplier(topologyHistory)
+ .withConfig(c -> {
+ c.with(Feature.values())
+ .set("write_request_timeout", "10s");
+ //TODO (maintenance): where to put this?
Anything touching ConfigGenBuilder with jvm-dtest needs this...
+ ((InstanceConfig)
c).remove("commitlog_sync_period_in_ms");
+ for (Map.Entry<String, Object> e :
yamlConfigOverrides.entrySet())
+ c.set(e.getKey(), e.getValue());
+ onConfigure(c);
+ })
+ //TODO (maintenance): should TopologyHistory
also be a INodeProvisionStrategy.Factory so address information is stored in
the Node?
+ //TODO (maintenance): AbstractCluster's
Map<Integer, NetworkTopology.DcAndRack> nodeIdTopology makes playing with
dc/rack annoying, if this becomes an interface then TopologyHistory could own
+ .withNodeProvisionStrategy((subnet, portMap)
-> new INodeProvisionStrategy.AbstractNodeProvisionStrategy(portMap)
+ {
+ {
+ Invariants.checkArgument(subnet == 0,
"Unexpected subnet detected: %d", subnet);
+ }
+
+ private final String ipPrefix = "127.0."
+ subnet + '.';
+
+ @Override
+ public int seedNodeNum()
+ {
+ int[] up = topologyHistory.up();
+ if (Arrays.equals(up, new int[]{ 1, 2
}))
+ return 1;
+ return rs.pickInt(up);
+ }
+
+ @Override
+ public String ipAddress(int nodeNum)
+ {
+ return ipPrefix + nodeNum;
+ }
+ })
+ .start();
+ }
+ catch (IOException e)
+ {
+ throw new UncheckedIOException(e);
+ }
+ fixDistributedSchemas(cluster);
+ init(cluster, TARGET_RF);
+ // fix TCM
+ {
+ NodeToolResult result = cluster.get(1).nodetoolResult("cms",
"reconfigure", "2");
+ result.asserts().success();
+ logger.info("CMS reconfigure: {}", result.getStdout());
+ }
+ // one-shot action: once TARGET_RF nodes are up, reconfigure CMS to TARGET_RF and unregister itself
+ preActions.add(new Runnable()
+ {
+ // in order to remove this action, an anonymous class is
needed so "this" works, lambda "this" is the parent class
+ @Override
+ public void run()
+ {
+ if (topologyHistory.up().length == TARGET_RF)
+ {
+ NodeToolResult result =
cluster.get(1).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF));
+ result.asserts().success();
+ logger.info("CMS reconfigure: {}", result.getStdout());
+ preActions.remove(this);
+ }
+ }
+ });
+ // refreshes the cached CMS group and epoch before every command
+ preActions.add(() -> {
+ int[] up = topologyHistory.up();
+ // use the most recent node just in case the cluster isn't
in-sync
+ IInvokableInstance node = cluster.get(up[up.length - 1]);
+ cmsGroup = HackSerialization.cmsGroup(node);
+ currentEpoch.set(HackSerialization.tcmEpoch(node));
+ });
+ preActions.add(() -> cluster.checkAndResetUncaughtExceptions());
+ this.schemaSpec = schemaSpecGen.apply(rs, cluster);
+ statementGen = cqlOperationsGen.apply(schemaSpec);
+
+ removeTypeGen = REMOVE_TYPE_DISTRIBUTION.next(rs);
+
+ long waitForEpoch = HackSerialization.tcmEpoch(cluster.get(1));
+ currentEpoch.set(waitForEpoch);
+ onStartupComplete(waitForEpoch);
+ }
+
+ /**
+ * Hook called at the end of the constructor with the TCM epoch read from node 1.
+ * The default implementation does nothing.
+ *
+ * @param tcmEpoch the epoch stored in {@code currentEpoch} at the end of startup
+ */
+ protected void onStartupComplete(long tcmEpoch)
+ {
+
+ }
+
+ /**
+ * Hook called from the cluster's config callback, after the yaml overrides have been applied.
+ * The default implementation does nothing.
+ *
+ * @param config the instance config being prepared
+ */
+ protected void onConfigure(IInstanceConfig config)
+ {
+
+ }
+
+        /**
+         * @return a suffix for command names containing the cached epoch and CMS group,
+         *         e.g. {@code "; epoch=5, cms=[1, 2]"}
+         */
+        protected String commandNamePostfix()
+        {
+            long epoch = currentEpoch.get();
+            String cms = Arrays.toString(cmsGroup);
+            return "; epoch=" + epoch + ", cms=" + cms;
+        }
+
+        /**
+         * @return a multi-line description: the yaml overrides, the topology history and the CMS group
+         */
+        @Override
+        public String toString()
+        {
+            return "Yaml Config:\n" + YamlConfigurationLoader.toYaml(this.yamlConfigOverrides)
+                   + "\nTopology:\n" + topologyHistory
+                   + "\nCMS Voting Group: " + Arrays.toString(cmsGroup);
+        }
+
+ /**
+ * Closes the cluster owned by this state.
+ *
+ * @throws Exception if closing the cluster fails
+ */
+ @Override
+ public void close() throws Exception
+ {
+ cluster.close();
+ }
+ }
+
+ public static class TopologyHistory implements TokenSupplier
+ {
+ private final RandomSource rs;
+ private final int tokensPerNode;
+ private final int minNodes, maxNodes;
+
+ private final Int2ObjectHashMap<Node> nodes = new
Int2ObjectHashMap<>();
+ private final Set<String> activeTokens = new HashSet<>();
+ private int uniqueInstances = 0;
+ /**
+ * Tracks how many topology change events were performed
+ */
+ private int generation = 0;
+
+        /**
+         * Creates a history pre-populated with {@code minNodes} nodes, all marked {@code Up}.
+         *
+         * @param rs       randomness used to generate tokens
+         * @param minNodes number of nodes created up front
+         * @param maxNodes upper bound on cluster size, read by the test when deciding whether to add nodes
+         */
+        public TopologyHistory(RandomSource rs, int minNodes, int maxNodes)
+        {
+            this.rs = rs;
+            this.minNodes = minNodes;
+            this.maxNodes = maxNodes;
+            // must be known before the first addNode() call, which sizes the token list with it
+            this.tokensPerNode = Cluster.build(1).getTokenCount();
+            for (int remaining = minNodes; remaining > 0; remaining--)
+                addNode();
+            // addNode() creates nodes as Down; the initial nodes start with the cluster so flip them to Up
+            for (Node initial : nodes.values())
+                initial.status = Node.Status.Up;
+        }
+
+ /** @return the number of topology change events recorded so far */
+ public long generation()
+ {
+ return generation;
+ }
+
+        /**
+         * @return one line per node, in id order starting at 1, showing its status and tokens
+         */
+        @Override
+        public String toString()
+        {
+            StringBuilder out = new StringBuilder();
+            int count = nodes.size();
+            for (int id = 1; id <= count; id++)
+            {
+                Node current = nodes.get(id);
+                out.append("\n\tNode").append(id);
+                out.append(": status=").append(current.status);
+                out.append(", tokens=").append(current.tokens);
+            }
+            return out.toString();
+        }
+
+ /** @return the majority size for {@code TARGET_RF}, i.e. {@code TARGET_RF / 2 + 1} */
+ public int quorum()
+ {
+ return (TARGET_RF / 2) + 1;
+ }
+
+        /**
+         * @param i node id
+         * @return the tokens assigned to node {@code i}
+         * @throws IllegalArgumentException if no node with that id was ever created
+         */
+        @Override
+        public Collection<String> tokens(int i)
+        {
+            Node known = nodes.get(i);
+            if (known != null)
+                return known.tokens;
+            throw new IllegalArgumentException("Unknown node" + i);
+        }
+
+ /**
+ * @return a freshly allocated, ascending array with the ids of all nodes whose status is {@code Up}
+ */
+ public int[] up()
+ {
+ IntArrayList up = new IntArrayList(nodes.size(), -1);
+ for (Map.Entry<Integer, Node> n : nodes.entrySet())
+ {
+ if (n.getValue().status == Node.Status.Up)
+ up.add(n.getKey());
+ }
+ int[] ints = up.toIntArray();
+ // the map has no useful iteration order, so sort for a deterministic result
+ Arrays.sort(ints);
+ return ints;
+ }
+
+ /** @return the number of nodes ever registered in this history, whatever their status */
+ public int size()
+ {
+ return nodes.size();
+ }
+
+ /**
+ * Registers a new node with the next unused id, {@code tokensPerNode} unique tokens that are not
+ * currently active, and the fixed location {@code datacenter0}/{@code rack0}.
+ *
+ * @return the new node, with status {@code Down}
+ */
+ public Node addNode()
+ {
+ int id = ++uniqueInstances;
+ List<String> instTokens = Gens.lists(MURMUR_TOKEN_GEN
+ .filterAsInt(t ->
!activeTokens.contains(Integer.toString(t))))
+ .unique()
+ .ofSize(tokensPerNode)
+ .next(rs).stream()
+ .map(Object::toString)
+ .collect(Collectors.toList());
+ activeTokens.addAll(instTokens);
+ Node node = new Node(this, id, instTokens, "datacenter0", "rack0");
+ node.status = Node.Status.Down;
+ nodes.put(id, node);
+ return node;
+ }
+
+        /**
+         * Registers a new node that will take over the tokens, datacenter and rack of an existing node.
+         * <p>
+         * The new node starts with the default {@code Down} status and its {@code replacing} field holds the
+         * id of the node it replaces until {@link #replaced(Node, Node)} clears it.
+         *
+         * @param toReplace id of the node being replaced
+         * @return the newly registered replacement node
+         * @throws NullPointerException if {@code toReplace} is not a known node
+         */
+        public Node replace(int toReplace)
+        {
+            int id = ++uniqueInstances;
+            Node original = Objects.requireNonNull(nodes.get(toReplace));
+            Node replacement = new Node(this, id, original.tokens, original.dc, original.rack);
+            // record the node being replaced; previously this stored the replacement's own id
+            replacement.replacing = original.id;
+            nodes.put(id, replacement);
+            return replacement;
+        }
+
+        /**
+         * Marks a host replacement as finished and counts it as one topology change.
+         *
+         * @param removing the node that was replaced; becomes {@code Removed}
+         * @param adding   the replacement; becomes {@code Up} and no longer reports a {@code replacing} id
+         */
+        public void replaced(Node removing, Node adding)
+        {
+            adding.status = Node.Status.Up;
+            removing.status = Node.Status.Removed;
+            adding.replacing = null;
+            generation += 1;
+        }
+
+        /**
+         * @param id node id
+         * @return the node registered under {@code id}
+         * @throws NoSuchElementException if no such node exists
+         */
+        public Node node(int id)
+        {
+            if (nodes.containsKey(id))
+                return nodes.get(id);
+            throw new NoSuchElementException("Unknown node" + id);
+        }
+
+        /**
+         * One node of the simulated topology: its identity, tokens, location and current status.
+         * Status transitions made through {@link #up()}, {@link #down()} and {@link #removed()} each
+         * increment the parent's generation counter.
+         */
+        private static class Node
+        {
+            enum Status
+            {
+                Up,
+                Down,
+                BeingReplaced,
+                BeingDecommissioned,
+                BeingRemoved,
+                BeingAssassinated,
+                Removed
+            }
+
+            final TopologyHistory parent;
+            final int id;
+            final List<String> tokens;
+            final String dc, rack;
+            Status status = Status.Down;
+            // id recorded while this node is in the middle of a host replacement, otherwise null
+            Integer replacing = null;
+
+            private Node(TopologyHistory parent, int id, List<String> tokens, String dc, String rack)
+            {
+                this.parent = parent;
+                this.id = id;
+                this.tokens = tokens;
+                this.dc = dc;
+                this.rack = rack;
+            }
+
+            /** Marks the node as up. */
+            public void up()
+            {
+                status = Status.Up;
+                parent.generation += 1;
+            }
+
+            /** Marks the node as down. */
+            public void down()
+            {
+                status = Status.Down;
+                parent.generation += 1;
+            }
+
+            /** Marks the node as removed and releases its tokens so new nodes may reuse them. */
+            public void removed()
+            {
+                status = Status.Removed;
+                parent.activeTokens.removeAll(tokens);
+                parent.generation += 1;
+            }
+
+            @Override
+            public String toString()
+            {
+                StringBuilder sb = new StringBuilder("Node{");
+                sb.append("status=").append(status);
+                if (replacing != null)
+                    sb.append(", replacing=").append(replacing);
+                sb.append(", tokens=").append(tokens);
+                sb.append('}');
+                return sb.toString();
+            }
+        }
+ }
+
+ /**
+ * Static helpers whose lambdas are executed on a cluster instance via {@code callOnInstance}.
+ * NOTE(review): presumably kept in a separate static class so the lambdas do not capture the test
+ * instance when shipped to the node — confirm.
+ */
+ public static class HackSerialization
+ {
+ /** @return the epoch of the instance's current {@code ClusterMetadata} */
+ private static long tcmEpoch(IInvokableInstance inst)
+ {
+ return inst.callOnInstance(() ->
ClusterMetadata.current().epoch.getEpoch());
+ }
+
+ /** @return the epoch reported by the instance after {@code waitForHighestConsecutive()} on its metadata log */
+ private static long tcmEpochAndSync(IInvokableInstance inst)
+ {
+ return inst.callOnInstance(() ->
ClusterMetadataService.instance().log().waitForHighestConsecutive().epoch.getEpoch());
+ }
+
+ /**
+ * @return the sorted node ids of the endpoints in the write placements of the meta keyspace,
+ * as seen by {@code inst}
+ */
+ public static int[] cmsGroup(IInvokableInstance inst)
+ {
+ return inst.callOnInstance(() -> {
+ ClusterMetadata current = ClusterMetadata.current();
+ Set<InetAddressAndPort> members =
current.placements.get(ReplicationParams.meta(current)).writes.byEndpoint().keySet();
+ // Why not just use 'current.fullCMSMembers()'? That uses the "read" replicas, so "could" have fewer endpoints
+ // It would be more consistent to use fullCMSMembers but thought process is knowing the full set is better
+ // than the coordination set.
+ int[] array =
members.stream().mapToInt(HackSerialization::addressToNodeId).toArray();
+ Arrays.sort(array);
+ return array;
+ });
+ }
+
+ /**
+ * Maps an address to a node id by parsing the last octet of its dotted IPv4 host address.
+ */
+ private static int addressToNodeId(InetAddressAndPort addressAndPort)
+ {
+ String address = addressAndPort.getAddress().getHostAddress();
+ String[] parts = address.split("\\.");
+ Invariants.checkState(parts.length == 4, "Unable to parse address
%s", address);
+ return Integer.parseInt(parts[3]);
+ }
+ }
+}
diff --git a/test/harry/main/org/apache/cassandra/harry/HarryHelper.java
b/test/harry/main/org/apache/cassandra/harry/HarryHelper.java
index 09cc574082..9dc0fcfaf4 100644
--- a/test/harry/main/org/apache/cassandra/harry/HarryHelper.java
+++ b/test/harry/main/org/apache/cassandra/harry/HarryHelper.java
@@ -131,7 +131,11 @@ public class HarryHelper
public static ReplayingHistoryBuilder dataGen(SystemUnderTest sut,
TokenPlacementModel.ReplicationFactor rf, SystemUnderTest.ConsistencyLevel
writeCl)
{
- long seed = 1L;
+ return dataGen(1, sut, rf, writeCl);
+ }
+
+ public static ReplayingHistoryBuilder dataGen(long seed, SystemUnderTest
sut, TokenPlacementModel.ReplicationFactor rf, SystemUnderTest.ConsistencyLevel
writeCl)
+ {
SchemaSpec schema = schemaSpecGen("harry", "tbl_").inflate(seed);
return new ReplayingHistoryBuilder(seed, 100, 1, new
DefaultDataTracker(), sut, schema, rf, writeCl);
}
diff --git
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
index 5943b77b65..5db0ee795d 100644
---
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
+++
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.harry.core.Run;
import org.apache.cassandra.harry.ddl.SchemaSpec;
import org.apache.cassandra.harry.sut.SystemUnderTest;
@@ -118,8 +119,9 @@ public class InJVMTokenAwareVisitExecutor extends
LoggingVisitor.LoggingVisitorE
protected TokenPlacementModel.ReplicatedRanges getRing()
{
- List<TokenPlacementModel.Node> other =
peerStateToNodes(sut.cluster.coordinator(1).execute("select peer, tokens,
data_center, rack from system.peers", ConsistencyLevel.ONE));
- List<TokenPlacementModel.Node> self =
peerStateToNodes(sut.cluster.coordinator(1).execute("select broadcast_address,
tokens, data_center, rack from system.local", ConsistencyLevel.ONE));
+ ICoordinator coordinator = sut.firstAlive().coordinator();
+ List<TokenPlacementModel.Node> other =
peerStateToNodes(coordinator.execute("select peer, tokens, data_center, rack
from system.peers", ConsistencyLevel.ONE));
+ List<TokenPlacementModel.Node> self =
peerStateToNodes(coordinator.execute("select broadcast_address, tokens,
data_center, rack from system.local", ConsistencyLevel.ONE));
List<TokenPlacementModel.Node> all = new ArrayList<>();
all.addAll(self);
all.addAll(other);
diff --git
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
index db119acc02..4c19e67294 100644
--- a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
+++ b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
@@ -79,7 +79,13 @@ public class InJvmSutBase<NODE extends IInstance, CLUSTER
extends ICluster<NODE>
public Integer get()
{
- return (int) (cnt.getAndIncrement() % cluster.size() + 1);
+ for (int i = 0; i < 42; i++)
+ {
+ int selected = (int) (cnt.getAndIncrement() %
cluster.size() + 1);
+ if (!cluster.get(selected).isShutdown())
+ return selected;
+ }
+ throw new IllegalStateException("Unable to find an alive
instance");
}
};
}
@@ -129,6 +135,11 @@ public class InJvmSutBase<NODE extends IInstance, CLUSTER
extends ICluster<NODE>
cluster.schemaChange(statement);
}
+    /**
+     * Finds the first instance of the cluster that is not shut down.
+     *
+     * @return the first instance, in cluster stream order, for which {@code isShutdown()} is false
+     * @throws java.util.NoSuchElementException if every instance is shut down
+     */
+    public IInstance firstAlive()
+    {
+        return cluster.stream()
+                      .filter(i -> !i.isShutdown())
+                      .findFirst()
+                      // same exception type as the bare Optional.get() it replaces, but with a message
+                      .orElseThrow(() -> new java.util.NoSuchElementException("Unable to find an alive instance"));
+    }
+
public Object[][] execute(String statement, ConsistencyLevel cl, int
pageSize, Object... bindings)
{
return execute(statement, cl, loadBalancingStrategy.get(), pageSize,
bindings);
diff --git
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/QuiescentLocalStateChecker.java
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/QuiescentLocalStateChecker.java
index c8d24f28a1..f9ee0d0125 100644
---
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/QuiescentLocalStateChecker.java
+++
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/QuiescentLocalStateChecker.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.harry.sut.injvm;
import java.util.ArrayList;
import java.util.List;
+import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.harry.core.Run;
import org.apache.cassandra.harry.ddl.SchemaSpec;
import org.apache.cassandra.harry.model.OpSelectors;
@@ -60,8 +61,10 @@ public class QuiescentLocalStateChecker extends
QuiescentLocalStateCheckerBase
@Override
protected TokenPlacementModel.ReplicatedRanges getRing()
{
- List<TokenPlacementModel.Node> other =
TokenPlacementModel.peerStateToNodes(((InJvmSutBase<?, ?>)
sut).cluster.coordinator(1).execute("select peer, tokens, data_center, rack
from system.peers", ConsistencyLevel.ONE));
- List<TokenPlacementModel.Node> self =
TokenPlacementModel.peerStateToNodes(((InJvmSutBase<?, ?>)
sut).cluster.coordinator(1).execute("select broadcast_address, tokens,
data_center, rack from system.local", ConsistencyLevel.ONE));
+ IInstance node = ((InJvmSutBase<?, ?>) sut).firstAlive();
+ ICoordinator coordinator = node.coordinator();
+ List<TokenPlacementModel.Node> other =
TokenPlacementModel.peerStateToNodes(coordinator.execute("select peer, tokens,
data_center, rack from system.peers", ConsistencyLevel.ONE));
+ List<TokenPlacementModel.Node> self =
TokenPlacementModel.peerStateToNodes(coordinator.execute("select
broadcast_address, tokens, data_center, rack from system.local",
ConsistencyLevel.ONE));
List<TokenPlacementModel.Node> all = new ArrayList<>();
all.addAll(self);
all.addAll(other);
diff --git a/test/unit/accord/utils/Gen.java b/test/unit/accord/utils/Gen.java
index 7563e8c29b..523812ccf4 100644
--- a/test/unit/accord/utils/Gen.java
+++ b/test/unit/accord/utils/Gen.java
@@ -22,8 +22,10 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.IntSupplier;
+import java.util.function.IntUnaryOperator;
import java.util.function.LongPredicate;
import java.util.function.LongSupplier;
+import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
@@ -87,6 +89,22 @@ public interface Gen<A> {
};
}
+    /**
+     * Returns a generator that retries this generator until a value passes {@code fn}, giving up after
+     * {@code maxAttempts} draws.
+     *
+     * @param maxAttempts  maximum number of draws per generated value; must be positive
+     * @param defaultValue value returned when no draw passed the predicate
+     * @param fn           predicate a value must satisfy
+     * @return the filtering generator
+     */
+    default Gen<A> filter(int maxAttempts, A defaultValue, Predicate<A> fn)
+    {
+        Invariants.checkArgument(maxAttempts > 0, "Max attempts must be positive; given %d", maxAttempts);
+        return random -> {
+            int remaining = maxAttempts;
+            while (remaining-- > 0)
+            {
+                A candidate = this.next(random);
+                if (fn.test(candidate))
+                    return candidate;
+            }
+            return defaultValue;
+        };
+    }
+
default Supplier<A> asSupplier(RandomSource rs)
{
return () -> next(rs);
@@ -97,6 +115,21 @@ public interface Gen<A> {
return Stream.generate(() -> next(rs));
}
+ /** Maps a generated {@code int} to another {@code int}, with access to the random source. */
+ interface Int2IntMapFunction
+ {
+ int applyAsInt(RandomSource rs, int value);
+ }
+
+ /** Maps a generated {@code int} to a {@code long}, with access to the random source. */
+ interface Int2LongMapFunction
+ {
+ long applyAsLong(RandomSource rs, int value);
+ }
+
+ /** Maps a generated {@code long} to another {@code long}, with access to the random source. */
+ interface Long2LongMapFunction
+ {
+ long applyAsLong(RandomSource rs, long value);
+ }
+
interface IntGen extends Gen<Integer>
{
int nextInt(RandomSource random);
@@ -107,7 +140,22 @@ public interface Gen<A> {
return nextInt(random);
}
- default Gen.IntGen filterInt(IntPredicate fn)
+        /**
+         * @param fn transformation applied to every generated int
+         * @return a generator producing {@code fn} applied to this generator's values, without boxing
+         */
+        default IntGen mapAsInt(IntUnaryOperator fn)
+        {
+            return random -> {
+                int generated = nextInt(random);
+                return fn.applyAsInt(generated);
+            };
+        }
+
+        /**
+         * @param fn transformation that also receives the random source used for the draw
+         * @return a generator producing {@code fn} applied to this generator's values, without boxing
+         */
+        default IntGen mapAsInt(Int2IntMapFunction fn)
+        {
+            return random -> {
+                int generated = nextInt(random);
+                return fn.applyAsInt(random, generated);
+            };
+        }
+
+        /**
+         * @param fn transformation from int to long that also receives the random source used for the draw
+         * @return a long generator producing {@code fn} applied to this generator's values
+         */
+        default LongGen mapAsLong(Int2LongMapFunction fn)
+        {
+            return random -> {
+                int generated = nextInt(random);
+                return fn.applyAsLong(random, generated);
+            };
+        }
+
+ default Gen.IntGen filterAsInt(IntPredicate fn)
{
return rs -> {
int value;
@@ -123,7 +171,7 @@ public interface Gen<A> {
@Override
default Gen.IntGen filter(Predicate<Integer> fn)
{
- return filterInt(i -> fn.test(i));
+ return filterAsInt(i -> fn.test(i));
}
default IntSupplier asIntSupplier(RandomSource rs)
@@ -147,7 +195,17 @@ public interface Gen<A> {
return nextLong(random);
}
- default Gen.LongGen filterLong(LongPredicate fn)
+        /**
+         * @param fn transformation applied to every generated long
+         * @return a generator producing {@code fn} applied to this generator's values, without boxing
+         */
+        default LongGen mapAsLong(LongUnaryOperator fn)
+        {
+            return random -> {
+                long generated = nextLong(random);
+                return fn.applyAsLong(generated);
+            };
+        }
+
+        /**
+         * @param fn transformation that also receives the random source used for the draw
+         * @return a generator producing {@code fn} applied to this generator's values, without boxing
+         */
+        default LongGen mapAsLong(Long2LongMapFunction fn)
+        {
+            return random -> {
+                long generated = nextLong(random);
+                return fn.applyAsLong(random, generated);
+            };
+        }
+
+ default Gen.LongGen filterAsLong(LongPredicate fn)
{
return rs -> {
long value;
@@ -163,7 +221,7 @@ public interface Gen<A> {
@Override
default Gen.LongGen filter(Predicate<Long> fn)
{
- return filterLong(i -> fn.test(i));
+ return filterAsLong(i -> fn.test(i));
}
default LongSupplier asLongSupplier(RandomSource rs)
diff --git a/test/unit/accord/utils/Gens.java b/test/unit/accord/utils/Gens.java
index 1fdff2258b..975aee9f4f 100644
--- a/test/unit/accord/utils/Gens.java
+++ b/test/unit/accord/utils/Gens.java
@@ -19,26 +19,39 @@
package accord.utils;
import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
+import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import java.util.stream.Stream;
+import com.google.common.collect.Iterables;
public class Gens {
private Gens() {
}
+    /**
+     * Collapses a generator of generators: each draw first draws an inner generator, then draws a value
+     * from it using the same random source.
+     *
+     * @param gen generator producing generators
+     * @return generator producing the inner generators' values
+     */
+    public static <T> Gen<T> flatten(Gen<Gen<T>> gen)
+    {
+        return rs -> {
+            Gen<T> inner = gen.next(rs);
+            return inner.next(rs);
+        };
+    }
+
public static <T> Gen<T> constant(T constant)
{
return ignore -> constant;
@@ -49,6 +62,37 @@ public class Gens {
return ignore -> constant.get();
}
+    /**
+     * Returns a generator that delegates each draw to one of the given generators, picked by the
+     * random source.
+     *
+     * @param gens candidate generators; must not be empty
+     * @return the single generator itself when only one is given, otherwise a picking generator
+     * @throws IllegalArgumentException if no generator is given
+     */
+    public static <T> Gen<T> oneOf(Gen<? extends T>... gens)
+    {
+        if (gens.length == 0)
+            throw new IllegalArgumentException("Unable to select oneOf an empty list");
+        if (gens.length == 1)
+            return (Gen<T>) gens[0];
+        return oneOf(Arrays.asList(gens));
+    }
+
+    /**
+     * Returns a generator that delegates each draw to one of the given generators, picked by the
+     * random source.
+     *
+     * @param gens candidate generators; must not be empty
+     * @return the single generator itself when only one is given, otherwise a picking generator
+     * @throws IllegalArgumentException if the list is empty
+     */
+    public static <T> Gen<T> oneOf(List<Gen<? extends T>> gens)
+    {
+        int count = gens.size();
+        if (count == 0)
+            throw new IllegalArgumentException("Unable to select oneOf an empty list");
+        if (count == 1)
+            return (Gen<T>) gens.get(0);
+        return rs -> {
+            Gen<? extends T> chosen = rs.pick(gens);
+            return chosen.next(rs);
+        };
+    }
+
+    /**
+     * Returns a generator that picks one of the given generators according to its weight and then draws
+     * from it.
+     *
+     * @param values generators mapped to their weights; validated by {@code pick(Map)}
+     * @return the weighted generator
+     */
+    public static <T> Gen<T> oneOf(Map<Gen<T>, Integer> values)
+    {
+        // pick(values) is evaluated here so an invalid map is rejected up front
+        return flatten(pick(values));
+    }
+
+    /**
+     * Returns a generator that picks uniformly among the given ints.
+     *
+     * @param ts candidate values; must not be null or empty
+     * @return generator over {@code ts}
+     * @throws IllegalArgumentException if {@code ts} is null or empty
+     */
+    public static Gen.IntGen pickInt(int... ts)
+    {
+        // reject bad input when the generator is built, as pickZipf does, rather than on the first draw
+        if (ts == null || ts.length == 0)
+            throw new IllegalArgumentException("Empty array given");
+        return rs -> ts[rs.nextInt(0, ts.length)];
+    }
+
public static <T> Gen<T> pick(T... ts)
{
return pick(Arrays.asList(ts));
@@ -74,8 +118,20 @@ public class Gens {
{
if (values == null || values.isEmpty())
throw new IllegalArgumentException("values is empty");
+ // if 2 values have the same weight we need some way to tie-break, but
that isn't always possible...
+ // this method relies on the map having some order and will reject any
map that doesn't define a deterministic order
+ if (!(values instanceof EnumMap || values instanceof LinkedHashMap))
+ throw new IllegalArgumentException("pick(Map) requires a map with
deterministic iteration; given " + values.getClass());
+ if (values.size() == 1)
+ return
constant(Objects.requireNonNull(Iterables.getFirst(values.keySet(), null)));
double totalWeight =
values.values().stream().mapToDouble(Integer::intValue).sum();
- List<Weight<T>> list = values.entrySet().stream().map(e -> new
Weight<>(e.getKey(), e.getValue())).collect(Collectors.toList());
+ List<Weight<T>> list = new ArrayList<>(values.size());
+ Iterator<Map.Entry<T, Integer>> it = values.entrySet().iterator();
+ for (int i = 0; it.hasNext(); i++)
+ {
+ Map.Entry<T, Integer> e = it.next();
+ list.add(new Weight<>(e.getKey(), e.getValue(), i));
+ }
Collections.sort(list);
return rs -> {
double value = rs.nextDouble() * totalWeight;
@@ -89,6 +145,215 @@ public class Gens {
};
}
+    /**
+     * Returns a generator that picks from {@code array} with Zipf-like weights: the element at index
+     * {@code i} has weight {@code base / (i + 1)}, so earlier elements are picked more often.
+     *
+     * @param array candidate values; must not be null or empty
+     * @return weighted generator over {@code array}
+     * @throws IllegalArgumentException if {@code array} is null or empty
+     */
+    public static Gen.IntGen pickZipf(int[] array)
+    {
+        if (array == null || array.length == 0)
+            throw new IllegalArgumentException("Empty array given");
+        if (array.length == 1)
+            return ignore -> array[0];
+        int size = array.length;
+        BigDecimal base = BigDecimal.valueOf(Math.pow(2, size));
+        BigDecimal[] weights = new BigDecimal[size];
+        BigDecimal sum = BigDecimal.ZERO;
+        for (int rank = 1; rank <= size; rank++)
+        {
+            BigDecimal weight = rank == 1 ? base : base.divide(BigDecimal.valueOf(rank), RoundingMode.UP);
+            weights[rank - 1] = weight;
+            sum = sum.add(weight);
+        }
+        BigDecimal totalWeights = sum;
+
+        return rs -> {
+            // walk the weights until the scaled draw is used up
+            BigDecimal remaining = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+            for (int i = 0; i < weights.length; i++)
+            {
+                remaining = remaining.subtract(weights[i]);
+                if (remaining.signum() <= 0)
+                    return array[i];
+            }
+            return array[array.length - 1];
+        };
+    }
+
+    /**
+     * Returns a generator that picks from {@code array} with Zipf-like weights: the element at index
+     * {@code i} has weight {@code base / (i + 1)}, so earlier elements are picked more often.
+     *
+     * @param array candidate values; must not be null or empty
+     * @return weighted generator over {@code array}
+     * @throws IllegalArgumentException if {@code array} is null or empty
+     */
+    public static Gen.LongGen pickZipf(long[] array)
+    {
+        if (array == null || array.length == 0)
+            throw new IllegalArgumentException("Empty array given");
+        if (array.length == 1)
+            return ignore -> array[0];
+        int size = array.length;
+        BigDecimal base = BigDecimal.valueOf(Math.pow(2, size));
+        BigDecimal[] weights = new BigDecimal[size];
+        BigDecimal sum = BigDecimal.ZERO;
+        for (int rank = 1; rank <= size; rank++)
+        {
+            BigDecimal weight = rank == 1 ? base : base.divide(BigDecimal.valueOf(rank), RoundingMode.UP);
+            weights[rank - 1] = weight;
+            sum = sum.add(weight);
+        }
+        BigDecimal totalWeights = sum;
+
+        return rs -> {
+            // walk the weights until the scaled draw is used up
+            BigDecimal remaining = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+            for (int i = 0; i < weights.length; i++)
+            {
+                remaining = remaining.subtract(weights[i]);
+                if (remaining.signum() <= 0)
+                    return array[i];
+            }
+            return array[array.length - 1];
+        };
+    }
+
+    /**
+     * Varargs convenience for {@link #pickZipf(List)}.
+     *
+     * @param array candidate values, most likely first
+     * @return weighted generator over {@code array}
+     */
+    public static <T> Gen<T> pickZipf(T... array)
+    {
+        List<T> values = Arrays.asList(array);
+        return pickZipf(values);
+    }
+
+    /**
+     * Returns a generator that picks from {@code array} with Zipf-like weights: the element at index
+     * {@code i} has weight {@code base / (i + 1)}, so earlier elements are picked more often.
+     *
+     * @param array candidate values; must not be null or empty
+     * @return weighted generator over {@code array}
+     * @throws IllegalArgumentException if {@code array} is null or empty
+     */
+    public static <T> Gen<T> pickZipf(List<T> array)
+    {
+        if (array == null || array.isEmpty())
+            throw new IllegalArgumentException("Empty array given");
+        if (array.size() == 1)
+            return ignore -> array.get(0);
+        int size = array.size();
+        BigDecimal base = BigDecimal.valueOf(Math.pow(2, size));
+        BigDecimal[] weights = new BigDecimal[size];
+        BigDecimal sum = BigDecimal.ZERO;
+        for (int rank = 1; rank <= size; rank++)
+        {
+            BigDecimal weight = rank == 1 ? base : base.divide(BigDecimal.valueOf(rank), RoundingMode.UP);
+            weights[rank - 1] = weight;
+            sum = sum.add(weight);
+        }
+        BigDecimal totalWeights = sum;
+
+        return rs -> {
+            // walk the weights until the scaled draw is used up
+            BigDecimal remaining = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+            for (int i = 0; i < weights.length; i++)
+            {
+                remaining = remaining.subtract(weights[i]);
+                if (remaining.signum() <= 0)
+                    return array.get(i);
+            }
+            return array.get(array.size() - 1);
+        };
+    }
+
+ /**
+ * Returns a generator of int generators over {@code [minInclusive, maxExclusive)}: each draw yields
+ * either a uniform generator or a Zipf-weighted one, chosen with equal probability.
+ * <p>
+ * For domains of at most 200 values the Zipf generator weights the individual values; for larger
+ * domains it weights 10 buckets and then draws uniformly inside the chosen bucket.  The Zipf order is
+ * reversed with probability 1/2.
+ * NOTE(review): {@code domainSize} adds 1 although the upper bound is exclusive — confirm intended.
+ *
+ * @throws IllegalArgumentException if the computed domain size overflows to a negative value
+ */
+ public static Gen<Gen.IntGen> mixedDistribution(int minInclusive, int
maxExclusive)
+ {
+ int domainSize = (maxExclusive - minInclusive + 1);
+ if (domainSize < 0)
+ throw new IllegalArgumentException("Range is too large; min=" +
minInclusive + ", max=" + maxExclusive);
+ int[] array, indexes;
+ if (domainSize > 200) // randomly selected
+ {
+ int numBuckets = 10;
+ int delta = domainSize / numBuckets;
+ array = new int[numBuckets];
+ for (int i = 0; i < numBuckets; i++)
+ array[i] = minInclusive + i * delta;
+ indexes = IntStream.range(0, array.length).toArray();
+ }
+ else
+ {
+ array = IntStream.range(minInclusive, maxExclusive).toArray();
+ indexes = null;
+ }
+ return rs -> {
+ switch (rs.nextInt(0, 2))
+ {
+ case 0: // uniform
+ return r -> r.nextInt(minInclusive, maxExclusive);
+ case 1: // zipf
+ if (indexes == null)
+ return Gens.pickZipf(rs.nextBoolean() ?
reverseAndCopy(array) : array);
+ return Gens.pickZipf(rs.nextBoolean() ?
reverseAndCopy(indexes) : indexes).mapAsInt((r, index) -> {
+ int start = array[index];
+ int end = index == array.length - 1 ? maxExclusive :
array[index + 1];
+ return r.nextInt(start, end);
+ });
+ default:
+ throw new AssertionError();
+ }
+ };
+ }
+
+    /**
+     * @param array source values; not modified
+     * @return a new array holding the values of {@code array} in reverse order
+     */
+    private static int[] reverseAndCopy(int[] array)
+    {
+        int length = array.length;
+        int[] reversed = new int[length];
+        for (int i = 0; i < length; i++)
+            reversed[i] = array[length - 1 - i];
+        return reversed;
+    }
+
+ /**
+ * Returns a generator of long generators over {@code [minInclusive, maxExclusive)}: each draw yields
+ * either a uniform generator or a Zipf-weighted one, chosen with equal probability.
+ * <p>
+ * For domains of at most 200 values the Zipf generator weights the individual values; for larger
+ * domains it weights 10 buckets and then draws uniformly inside the chosen bucket.  The Zipf order is
+ * reversed with probability 1/2.
+ * NOTE(review): {@code domainSize} adds 1 although the upper bound is exclusive — confirm intended.
+ *
+ * @throws IllegalArgumentException if the computed domain size overflows to a negative value
+ */
+ public static Gen<Gen.LongGen> mixedDistribution(long minInclusive, long
maxExclusive)
+ {
+ long domainSize = (maxExclusive - minInclusive + 1);
+ if (domainSize < 0)
+ throw new IllegalArgumentException("Range is too large; min=" +
minInclusive + ", max=" + maxExclusive);
+ long[] array;
+ int[] indexes;
+ if (domainSize > 200) // randomly selected
+ {
+ int numBuckets = 10;
+ long delta = domainSize / numBuckets;
+ array = new long[numBuckets];
+ for (int i = 0; i < numBuckets; i++)
+ array[i] = minInclusive + i * delta;
+ indexes = IntStream.range(0, array.length).toArray();
+ }
+ else
+ {
+ array = LongStream.range(minInclusive, maxExclusive).toArray();
+ indexes = null;
+ }
+ return rs -> {
+ switch (rs.nextInt(0, 2))
+ {
+ case 0: // uniform
+ return r -> r.nextLong(minInclusive, maxExclusive);
+ case 1: // zipf
+ if (indexes == null)
+ return Gens.pickZipf(rs.nextBoolean() ?
reverseAndCopy(array) : array);
+ return Gens.pickZipf(rs.nextBoolean() ?
reverseAndCopy(indexes) : indexes).mapAsLong((r, index) -> {
+ long start = array[index];
+ long end = index == array.length - 1 ? maxExclusive :
array[index + 1];
+ return r.nextLong(start, end);
+ });
+ default:
+ throw new AssertionError();
+ }
+ };
+ }
+
+    /**
+     * @param array source values; not modified
+     * @return a new array holding the values of {@code array} in reverse order
+     */
+    private static long[] reverseAndCopy(long[] array)
+    {
+        int length = array.length;
+        long[] reversed = new long[length];
+        for (int i = 0; i < length; i++)
+            reversed[i] = array[length - 1 - i];
+        return reversed;
+    }
+
+    /**
+     * Varargs convenience for {@link #mixedDistribution(List)}.
+     *
+     * @param list candidate values
+     * @return generator of generators over {@code list}
+     */
+    public static <T> Gen<Gen<T>> mixedDistribution(T... list)
+    {
+        List<T> values = Arrays.asList(list);
+        return mixedDistribution(values);
+    }
+
+    /**
+     * Returns a generator of generators over {@code list}: each draw yields either a uniform generator or
+     * a Zipf-weighted one (in list order or reversed), each choice made with the outer random source.
+     * <p>
+     * The returned inner generators draw only from the random source passed to them.
+     *
+     * @param list candidate values
+     * @return generator producing a distribution over {@code list}
+     */
+    public static <T> Gen<Gen<T>> mixedDistribution(List<T> list)
+    {
+        return rs -> {
+            switch (rs.nextInt(0, 2))
+            {
+                case 0: // uniform
+                    // use the source given to the inner generator; the outer 'rs' only selects the distribution
+                    return r -> list.get(r.nextInt(0, list.size()));
+                case 1: // zipf
+                    List<T> array = list;
+                    if (rs.nextBoolean())
+                    {
+                        array = new ArrayList<>(list);
+                        Collections.reverse(array);
+                    }
+                    return pickZipf(array);
+                default:
+                    throw new AssertionError();
+            }
+        };
+    }
+
public static Gen<char[]> charArray(Gen.IntGen sizes, char[] domain)
{
return charArray(sizes, domain, (a, b) -> true);
@@ -163,6 +428,11 @@ public class Gens {
return new StringDSL();
}
+    /** Adapts {@code gen} to a {@link BooleanSupplier} that draws every value from {@code rs}. */
+    public static BooleanSupplier supplier(Gen<Boolean> gen, RandomSource rs)
+    {
+        return new BooleanSupplier()
+        {
+            @Override
+            public boolean getAsBoolean()
+            {
+                return gen.next(rs);
+            }
+        };
+    }
+
public static class BooleanDSL
{
public Gen<Boolean> all()
@@ -170,12 +440,12 @@ public class Gens {
return RandomSource::nextBoolean;
}
- public Gen<Boolean> runs(double ratio, int maxRuns)
+ public Gen<Boolean> biasedRepeatingRuns(double ratio, int maxRuns)
{
Invariants.checkArgument(ratio > 0 && ratio <= 1, "Expected %d to
be larger than 0 and <= 1", ratio);
double lower = ratio * .8;
double upper = ratio * 1.2;
- return new Gen<>() {
+ return new Gen<Boolean>() {
// run represents how many consecutive true values should be
returned; -1 implies no active "run" exists
private int run = -1;
private long falseCount = 0, trueCount = 0;
@@ -213,6 +483,28 @@ public class Gens {
}
};
}
+
+        /**
+         * Creates a generator of boolean generators; each produced {@link Gen} is one of: uniform 50/50, a
+         * randomly chosen fixed frequency, a constant result, or biased repeating runs.
+         */
+        public Gen<Gen<Boolean>> mixedDistribution()
+        {
+            return rs -> {
+                int selection = rs.nextInt(0, 4);
+                if (selection == 0) // uniform 50/50
+                    return RandomSource::nextBoolean;
+                if (selection == 1) // variable frequency
+                {
+                    var frequency = rs.nextFloat();
+                    return r -> r.decide(frequency);
+                }
+                if (selection == 2) // fixed result
+                {
+                    boolean constant = rs.nextBoolean();
+                    return ignore -> constant;
+                }
+                if (selection == 3) // biased repeating runs
+                {
+                    // NOTE(review): biasedRepeatingRuns rejects a ratio of 0; confirm rs.nextDouble() never returns exactly 0
+                    return biasedRepeatingRuns(rs.nextDouble(), rs.nextInt(1, 100));
+                }
+                throw new IllegalStateException("Unexpected int for bool selection: " + selection);
+            };
+        }
}
public static class IntDSL
@@ -238,6 +530,11 @@ public class Gens {
return r -> r.nextInt(min, max);
return r -> r.nextInt(min, max + 1);
}
+
+        /** DSL shortcut that delegates to the static {@code Gens.mixedDistribution} for the given int bounds. */
+        public Gen<Gen.IntGen> mixedDistribution(int minInclusive, int maxExclusive)
+        {
+            Gen<Gen.IntGen> distributions = Gens.mixedDistribution(minInclusive, maxExclusive);
+            return distributions;
+        }
}
public static class LongDSL {
@@ -269,6 +566,11 @@ public class Gens {
return pick(klass.getEnumConstants());
}
+ public <T extends Enum<T>> Gen<Gen<T>> allMixedDistribution(Class<T>
klass)
+ {
+ return mixedDistribution(klass.getEnumConstants());
+ }
+
public <T extends Enum<T>> Gen<T> allWithWeights(Class<T> klass,
int... weights)
{
T[] constants = klass.getEnumConstants();
@@ -433,16 +735,7 @@ public class Gens {
int size = sizeGen.nextInt(r);
T[] list = (T[]) Array.newInstance(type, size);
for (int i = 0; i < size; i++)
- {
- try
- {
- list[i] = fn.next(r);
- }
- catch (IgnoreGenResult e)
- {
- return Arrays.copyOf(list, i);
- }
- }
+ list[i] = fn.next(r);
return list;
};
}
@@ -513,7 +806,7 @@ public class Gens {
}
}
- private interface Reset {
+ protected interface Reset {
static void tryReset(Object o)
{
if (o instanceof Reset)
@@ -612,15 +905,20 @@ public class Gens {
{
private final T value;
private final double weight;
+ private final int index;
- private Weight(T value, double weight) {
+ private Weight(T value, double weight, int index) {
this.value = value;
this.weight = weight;
+ this.index = index;
}
@Override
public int compareTo(Weight<T> o) {
- return Double.compare(weight, o.weight);
+ int rc = Double.compare(weight, o.weight);
+ if (rc == 0)
+ rc = Integer.compare(index, o.index);
+ return rc;
}
}
}
diff --git a/test/unit/accord/utils/Property.java
b/test/unit/accord/utils/Property.java
index b90472c862..e45642c1f1 100644
--- a/test/unit/accord/utils/Property.java
+++ b/test/unit/accord/utils/Property.java
@@ -18,23 +18,32 @@
package accord.utils;
+import accord.utils.async.TimeoutUtils;
+
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import org.apache.cassandra.transport.ProtocolException;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
-
public class Property
{
public static abstract class Common<T extends Common<T>>
{
- protected long seed = ThreadLocalRandom.current().nextLong();
+ protected long seed = SeedProvider.instance.nextSeed();
protected int examples = 1000;
protected boolean pure = true;
@@ -74,42 +83,26 @@ public class Property
public T withTimeout(Duration timeout)
{
this.timeout = timeout;
+ this.pure = false;
return (T) this;
}
protected void checkWithTimeout(Runnable fn)
{
- AsyncPromise<?> promise = new AsyncPromise<>();
- Thread t = new Thread(() -> {
- try
- {
- fn.run();
- promise.setSuccess(null);
- }
- catch (Throwable e)
- {
- promise.setFailure(e);
- }
- });
- t.setName("property with timeout");
- t.setDaemon(true);
try
{
- t.start();
- promise.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
+ TimeoutUtils.runBlocking(timeout, "property with timeout",
fn::run);
}
catch (ExecutionException e)
{
- throw new ProtocolException(propertyError(this, e.getCause()));
+ throw new PropertyError(propertyError(this, e.getCause()));
}
catch (InterruptedException e)
{
- t.interrupt();
throw new PropertyError(propertyError(this, e));
}
catch (TimeoutException e)
{
- t.interrupt();
TimeoutException override = new TimeoutException("property
test did not complete within " + this.timeout);
override.setStackTrace(new StackTraceElement[0]);
throw new PropertyError(propertyError(this, override));
@@ -167,7 +160,10 @@ public class Property
}
try
{
- return value.toString();
+ String result = value.toString();
+ if (result != null && result.length() > 100 && value instanceof
Collection)
+ result = ((Collection<?>) value).stream().map(o -> "\n\t "
+ o).collect(Collectors.joining(",", "[", "]"));
+ return result;
}
catch (Throwable t)
{
@@ -175,7 +171,7 @@ public class Property
}
}
- private static String propertyError(Common<?> input, Throwable cause,
Object... values)
+ private static StringBuilder propertyErrorCommon(Common<?> input,
Throwable cause)
{
StringBuilder sb = new StringBuilder();
// return "Seed=" + seed + "\nExamples=" + examples;
@@ -193,15 +189,35 @@ public class Property
msg = cause.getClass().getCanonicalName();
sb.append(msg).append('\n');
}
+ return sb;
+ }
+
+ private static String propertyError(Common<?> input, Throwable cause,
Object... values)
+ {
+ StringBuilder sb = propertyErrorCommon(input, cause);
if (values != null)
{
sb.append("Values:\n");
for (int i = 0; i < values.length; i++)
- sb.append('\t').append(i).append(" =
").append(normalizeValue(values[i])).append('\n');
+ sb.append('\t').append(i).append(" =
").append(normalizeValue(values[i])).append(": ").append(values[i] == null ?
"unknown type" : values[i].getClass().getCanonicalName()).append('\n');
}
return sb.toString();
}
+    /**
+     * Builds the failure report for a stateful property: the common header, the configured number of steps,
+     * the model state (with its type) and the numbered history of commands recorded so far.
+     */
+    private static String statefulPropertyError(StatefulBuilder input, Throwable cause, Object state, List<String> history)
+    {
+        StringBuilder sb = propertyErrorCommon(input, cause);
+        sb.append("Steps: ").append(input.steps).append('\n');
+        sb.append("Values:\n");
+        String stateText = null;
+        String stateType = "unknown type";
+        if (state != null)
+        {
+            // indent continuation lines so a multi-line state stays nested under the "State:" entry
+            stateText = state.toString().replace("\n", "\n\t\t");
+            stateType = state.getClass().getCanonicalName();
+        }
+        sb.append("\tState: ").append(stateText).append(": ").append(stateType).append('\n');
+        sb.append("\tHistory:").append('\n');
+        // history entries are numbered starting from 1
+        for (int i = 0; i < history.size(); i++)
+            sb.append("\t\t").append(i + 1).append(": ").append(history.get(i)).append('\n');
+        return sb.toString();
+    }
+
public interface FailingConsumer<A>
{
void accept(A value) throws Exception;
@@ -379,4 +395,523 @@ public class Property
{
return new ForBuilder();
}
+
+    /** Entry point for stateful property tests; returns a new builder with its default settings. */
+    public static StatefulBuilder stateful()
+    {
+        StatefulBuilder builder = new StatefulBuilder();
+        return builder;
+    }
+
+    /**
+     * Builder and runner for stateful property tests: for every example a fresh model state and system under
+     * test (SUT) are created, then {@link #steps} commands are generated and processed against both.
+     */
+    public static class StatefulBuilder extends Common<StatefulBuilder>
+    {
+        /** Number of precondition checks attempted per step before giving up on finding a runnable command. */
+        private static final int MAX_PRECONDITION_CHECKS = 42;
+
+        protected int steps = 1000;
+        @Nullable
+        protected Duration stepTimeout = null;
+
+        public StatefulBuilder()
+        {
+            examples = 500;
+        }
+
+        /** Sets how many commands are generated per example. */
+        public StatefulBuilder withSteps(int steps)
+        {
+            this.steps = steps;
+            return this;
+        }
+
+        /** Sets the maximum time a single command may take; when unset commands run on the calling thread. */
+        public StatefulBuilder withStepTimeout(Duration duration)
+        {
+            stepTimeout = duration;
+            return this;
+        }
+
+        /**
+         * Runs the property: for each example generates the initial state, creates the SUT, and processes
+         * {@link #steps} commands; the SUT and state are destroyed at the end of the example.
+         *
+         * @throws PropertyError wrapping any failure, with the state and command history in the message
+         */
+        @SuppressWarnings("rawtypes")
+        public <State, SystemUnderTest> void check(Commands<State, SystemUnderTest> commands)
+        {
+            RandomSource rs = new DefaultRandom(seed);
+            for (int i = 0; i < examples; i++)
+            {
+                State state = null;
+                List<String> history = new ArrayList<>(steps);
+                try
+                {
+                    checkInterrupted();
+
+                    state = commands.genInitialState().next(rs);
+                    SystemUnderTest sut = commands.createSut(state);
+
+                    try
+                    {
+                        for (int j = 0; j < steps; j++)
+                        {
+                            Gen<Command<State, SystemUnderTest, ?>> cmdGen = commands.commands(state);
+                            Command cmd = cmdGen.next(rs);
+                            // keep generating until a command accepts the current state, up to a fixed limit
+                            for (int a = 0; cmd.checkPreconditions(state) != PreCheckResult.Ok && a < MAX_PRECONDITION_CHECKS; a++)
+                            {
+                                if (a == MAX_PRECONDITION_CHECKS - 1)
+                                    throw new IllegalArgumentException("Unable to find next command");
+                                cmd = cmdGen.next(rs);
+                            }
+                            if (cmd instanceof MultistepCommand)
+                            {
+                                for (Command<State, SystemUnderTest, ?> sub : ((MultistepCommand<State, SystemUnderTest>) cmd))
+                                {
+                                    history.add(sub.detailed(state));
+                                    process(sub, state, sut, history.size());
+                                }
+                            }
+                            else
+                            {
+                                history.add(cmd.detailed(state));
+                                process(cmd, state, sut, history.size());
+                            }
+                        }
+                        commands.destroySut(sut, null);
+                        commands.destroyState(state, null);
+                    }
+                    catch (Throwable t)
+                    {
+                        // destroy the SUT and the state independently, so a failure while destroying the SUT
+                        // does not prevent the state from being destroyed
+                        try
+                        {
+                            commands.destroySut(sut, t);
+                        }
+                        catch (Throwable t2)
+                        {
+                            if (t2 != t) t.addSuppressed(t2);
+                        }
+                        try
+                        {
+                            commands.destroyState(state, t);
+                        }
+                        catch (Throwable t2)
+                        {
+                            if (t2 != t) t.addSuppressed(t2);
+                        }
+                        throw t;
+                    }
+                }
+                catch (Throwable t)
+                {
+                    throw new PropertyError(statefulPropertyError(this, t, state, history), t);
+                }
+                if (pure)
+                {
+                    seed = rs.nextLong();
+                    rs.setSeed(seed);
+                }
+            }
+        }
+
+        /**
+         * Processes a single command, either directly or (when a step timeout is set) through
+         * {@link TimeoutUtils#runBlocking}.
+         */
+        private <State, SystemUnderTest> void process(Command cmd, State state, SystemUnderTest sut, int id) throws Throwable
+        {
+            if (stepTimeout == null)
+            {
+                cmd.process(state, sut);
+                return;
+            }
+            try
+            {
+                TimeoutUtils.runBlocking(stepTimeout, "Stateful Step " + id, () -> cmd.process(state, sut));
+            }
+            catch (ExecutionException e)
+            {
+                // surface the command's own failure rather than the ExecutionException wrapper, the same way
+                // checkWithTimeout reports e.getCause()
+                Throwable cause = e.getCause();
+                throw cause != null ? cause : e;
+            }
+        }
+    }
+
+ public enum PreCheckResult { Ok, Ignore }
+    /**
+     * A single step of a stateful property test, applied to both the model {@code State} and the
+     * {@code SystemUnderTest}.
+     */
+    public interface Command<State, SystemUnderTest, Result>
+    {
+        /** Whether this command may run against {@code state}; defaults to {@link PreCheckResult#Ok}. */
+        default PreCheckResult checkPreconditions(State state)
+        {
+            return PreCheckResult.Ok;
+        }
+
+        /** Applies the command to the model; the returned value is passed to the post-condition check as the expected result. */
+        Result apply(State state) throws Throwable;
+
+        /** Runs the command against the system under test; the returned value is passed to the post-condition check as the actual result. */
+        Result run(SystemUnderTest sut) throws Throwable;
+
+        /** Hook to compare the expected and actual results; does nothing by default. */
+        default void checkPostconditions(State state, Result expected,
+                                         SystemUnderTest sut, Result actual) throws Throwable
+        {
+        }
+
+        /** Description of this command; defaults to {@link #toString()}. */
+        default String detailed(State state)
+        {
+            return this.toString();
+        }
+
+        /** Applies the command to the model first, then to the system under test, then checks the post-conditions. */
+        default void process(State state, SystemUnderTest sut) throws Throwable
+        {
+            Result expected = apply(state);
+            Result actual = run(sut);
+            checkPostconditions(state, expected, sut, actual);
+        }
+    }
+
+    /**
+     * Varargs convenience form of {@link #multistep(List)}.
+     * <p>
+     * The array is only wrapped with {@link Arrays#asList} and never written to here, hence {@code @SafeVarargs}.
+     */
+    @SafeVarargs
+    public static <State, SystemUnderTest> MultistepCommand<State, SystemUnderTest> multistep(Command<State, SystemUnderTest, ?>... cmds)
+    {
+        return multistep(Arrays.asList(cmds));
+    }
+
+ public static <State, SystemUnderTest> MultistepCommand<State,
SystemUnderTest> multistep(List<Command<State, SystemUnderTest, ?>> cmds)
+ {
+ List<Command<State, SystemUnderTest, ?>> result = new
ArrayList<>(cmds.size());
+ for (Command<State, SystemUnderTest, ?> c : cmds)
+ {
+ if (c instanceof MultistepCommand)
result.addAll(flatten((MultistepCommand<State, SystemUnderTest>) c));
+ else result.add(c);
+ }
+ return result::iterator;
+ }
+
+    /** Recursively expands {@code mc} into the plain (non-multistep) commands it contains, in iteration order. */
+    private static <State, SystemUnderTest> Collection<? extends Command<State, SystemUnderTest, ?>> flatten(MultistepCommand<State, SystemUnderTest> mc)
+    {
+        List<Command<State, SystemUnderTest, ?>> leaves = new ArrayList<>();
+        for (Command<State, SystemUnderTest, ?> child : mc)
+        {
+            boolean nested = child instanceof MultistepCommand;
+            if (nested)
+                leaves.addAll(flatten((MultistepCommand<State, SystemUnderTest>) child));
+            else
+                leaves.add(child);
+        }
+        return leaves;
+    }
+
+    /**
+     * A command made of several sub-commands, exposed through {@link Iterable}.  Only
+     * {@link #checkPreconditions} is supported on the composite itself; every other {@link Command} method
+     * throws {@link UnsupportedOperationException}.
+     */
+    public interface MultistepCommand<State, SystemUnderTest> extends Command<State, SystemUnderTest, Object>, Iterable<Command<State, SystemUnderTest, ?>>
+    {
+        /** Returns the first non-{@code Ok} result among the sub-commands, or {@code Ok} when all accept the state. */
+        @Override
+        default PreCheckResult checkPreconditions(State state)
+        {
+            for (Command<State, SystemUnderTest, ?> sub : this)
+            {
+                PreCheckResult outcome = sub.checkPreconditions(state);
+                if (outcome == PreCheckResult.Ok)
+                    continue;
+                return outcome;
+            }
+            return PreCheckResult.Ok;
+        }
+
+        @Override
+        default Object apply(State state) throws Throwable
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        default Object run(SystemUnderTest sut) throws Throwable
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        default void checkPostconditions(State state, Object expected, SystemUnderTest sut, Object actual) throws Throwable
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        default String detailed(State state)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        default void process(State state, SystemUnderTest sut) throws Throwable
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Returns a placeholder command whose preconditions always yield {@link PreCheckResult#Ignore}; its
+     * {@code apply}, {@code run} and {@code detailed} methods throw {@link UnsupportedOperationException}.
+     */
+    public static <State, SystemUnderTest, Result> Command<State, SystemUnderTest, Result> ignoreCommand()
+    {
+        return new Command<State, SystemUnderTest, Result>()
+        {
+            @Override
+            public PreCheckResult checkPreconditions(State state)
+            {
+                return PreCheckResult.Ignore;
+            }
+
+            @Override
+            public Result apply(State state) throws Throwable
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Result run(SystemUnderTest sut) throws Throwable
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public String detailed(State state)
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+
+    /** A {@link Command} without a result: implementors provide the {@code *Unit} methods and {@code null} is used as the result. */
+    public interface UnitCommand<State, SystemUnderTest> extends Command<State, SystemUnderTest, Void>
+    {
+        /** Applies the command to the model state. */
+        void applyUnit(State state) throws Throwable;
+
+        /** Runs the command against the system under test. */
+        void runUnit(SystemUnderTest sut) throws Throwable;
+
+        @Override
+        default Void run(SystemUnderTest sut) throws Throwable
+        {
+            runUnit(sut);
+            return null;
+        }
+
+        @Override
+        default Void apply(State state) throws Throwable
+        {
+            applyUnit(state);
+            return null;
+        }
+    }
+
+ public interface StateOnlyCommand<State> extends UnitCommand<State, Void>
+ {
+ @Override
+ default void runUnit(Void sut) throws Throwable {}
+ }
+
+    /** A {@link StateOnlyCommand} defined by a description (fixed, or derived from the state) and an action on the state. */
+    public static class SimpleCommand<State> implements StateOnlyCommand<State>
+    {
+        private final Function<State, String> name;
+        private final Consumer<State> fn;
+
+        /** Creates a command with a description computed from the state. */
+        public SimpleCommand(Function<State, String> name, Consumer<State> fn)
+        {
+            this.name = name;
+            this.fn = fn;
+        }
+
+        /** Creates a command with a fixed description. */
+        public SimpleCommand(String name, Consumer<State> fn)
+        {
+            this(ignore -> name, fn);
+        }
+
+        @Override
+        public void applyUnit(State state)
+        {
+            fn.accept(state);
+        }
+
+        @Override
+        public String detailed(State state)
+        {
+            return name.apply(state);
+        }
+    }
+
+ public interface Commands<State, SystemUnderTest>
+ {
+ Gen<State> genInitialState() throws Throwable;
+ SystemUnderTest createSut(State state) throws Throwable;
+ default void destroyState(State state, @Nullable Throwable cause)
throws Throwable {}
+ default void destroySut(SystemUnderTest sut, @Nullable Throwable
cause) throws Throwable {}
+ Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws
Throwable;
+ }
+
+    /** Starts a {@link CommandsBuilder} from a supplier of state generators and a factory for the system under test. */
+    public static <State, SystemUnderTest> CommandsBuilder<State, SystemUnderTest> commands(Supplier<Gen<State>> stateGen, Function<State, SystemUnderTest> sutFactory)
+    {
+        return new CommandsBuilder<State, SystemUnderTest>(stateGen, sutFactory);
+    }
+
+    /** Starts a {@link CommandsBuilder} for state-only properties; the system under test is always {@code null}. */
+    public static <State> CommandsBuilder<State, Void> commands(Supplier<Gen<State>> stateGen)
+    {
+        Function<State, Void> noSut = ignore -> null;
+        return new CommandsBuilder<>(stateGen, noSut);
+    }
+
+    /**
+     * Fluent builder for {@link Commands}: registers command factories with either an explicit weight or a
+     * weight that is generated at random, plus optional lifecycle hooks.
+     */
+    public static class CommandsBuilder<State, SystemUnderTest>
+    {
+        /** Factory that creates the next command from the random source and the current state. */
+        public interface Setup<State, SystemUnderTest>
+        {
+            Command<State, SystemUnderTest, ?> setup(RandomSource rs, State state);
+        }
+
+        /** Restricted view of the builder handed to {@link #addAllIf}. */
+        public interface IfBuilder<State, SystemUnderTest>
+        {
+            IfBuilder<State, SystemUnderTest> add(Setup<State, SystemUnderTest> cmd);
+        }
+
+        /** Consumer that is allowed to throw. */
+        public interface FailingConsumer<T>
+        {
+            void accept(T value) throws Throwable;
+        }
+
+        /** Two-argument consumer that is allowed to throw. */
+        public interface FailingBiConsumer<A, B>
+        {
+            void accept(A a, B b) throws Throwable;
+        }
+
+        private final Supplier<Gen<State>> stateGen;
+        private final Function<State, SystemUnderTest> sutFactory;
+        // commands registered with an explicit weight, kept in insertion order
+        private final Map<Setup<State, SystemUnderTest>, Integer> knownWeights = new LinkedHashMap<>();
+        // commands registered without a weight; stays null until the first one is added
+        @Nullable
+        private Set<Setup<State, SystemUnderTest>> unknownWeights = null;
+        private Gen.IntGen unknownWeightGen = Gens.ints().between(1, 10);
+        @Nullable
+        private FailingConsumer<State> preCommands = null;
+        @Nullable
+        private FailingBiConsumer<State, Throwable> destroyState = null;
+        @Nullable
+        private FailingBiConsumer<SystemUnderTest, Throwable> destroySut = null;
+
+        public CommandsBuilder(Supplier<Gen<State>> stateGen, Function<State, SystemUnderTest> sutFactory)
+        {
+            this.stateGen = stateGen;
+            this.sutFactory = sutFactory;
+        }
+
+        /** Registers a hook that runs on the state every time the command generator is requested. */
+        public CommandsBuilder<State, SystemUnderTest> preCommands(FailingConsumer<State> preCommands)
+        {
+            this.preCommands = preCommands;
+            return this;
+        }
+
+        /** Registers a state cleanup hook that only runs when the example finished without a failure. */
+        public CommandsBuilder<State, SystemUnderTest> destroyState(FailingConsumer<State> destroyState)
+        {
+            return destroyState((state, failure) -> {
+                if (failure != null)
+                    return;
+                destroyState.accept(state);
+            });
+        }
+
+        /** Registers a state cleanup hook that also receives the failure (or null). */
+        public CommandsBuilder<State, SystemUnderTest> destroyState(FailingBiConsumer<State, Throwable> destroyState)
+        {
+            this.destroyState = destroyState;
+            return this;
+        }
+
+        /** Registers a SUT cleanup hook that only runs when the example finished without a failure. */
+        public CommandsBuilder<State, SystemUnderTest> destroySut(FailingConsumer<SystemUnderTest> destroySut)
+        {
+            return destroySut((sut, failure) -> {
+                if (failure != null)
+                    return;
+                destroySut.accept(sut);
+            });
+        }
+
+        /** Registers a SUT cleanup hook that also receives the failure (or null). */
+        public CommandsBuilder<State, SystemUnderTest> destroySut(FailingBiConsumer<SystemUnderTest, Throwable> destroySut)
+        {
+            this.destroySut = destroySut;
+            return this;
+        }
+
+        /** Adds a fixed command with an explicit weight. */
+        public CommandsBuilder<State, SystemUnderTest> add(int weight, Command<State, SystemUnderTest, ?> cmd)
+        {
+            Setup<State, SystemUnderTest> constant = (ignoredRs, ignoredState) -> cmd;
+            return add(weight, constant);
+        }
+
+        /** Adds a command generator with an explicit weight. */
+        public CommandsBuilder<State, SystemUnderTest> add(int weight, Gen<Command<State, SystemUnderTest, ?>> cmd)
+        {
+            Setup<State, SystemUnderTest> generated = (rs, ignoredState) -> cmd.next(rs);
+            return add(weight, generated);
+        }
+
+        /** Adds a command factory with an explicit weight. */
+        public CommandsBuilder<State, SystemUnderTest> add(int weight, Setup<State, SystemUnderTest> cmd)
+        {
+            knownWeights.put(cmd, weight);
+            return this;
+        }
+
+        /** Adds a fixed command whose weight is generated when the commands are built/reset. */
+        public CommandsBuilder<State, SystemUnderTest> add(Command<State, SystemUnderTest, ?> cmd)
+        {
+            Setup<State, SystemUnderTest> constant = (ignoredRs, ignoredState) -> cmd;
+            return add(constant);
+        }
+
+        /** Adds a command generator whose weight is generated when the commands are built/reset. */
+        public CommandsBuilder<State, SystemUnderTest> add(Gen<Command<State, SystemUnderTest, ?>> cmd)
+        {
+            Setup<State, SystemUnderTest> generated = (rs, ignoredState) -> cmd.next(rs);
+            return add(generated);
+        }
+
+        /** Adds a command factory whose weight is generated when the commands are built/reset. */
+        public CommandsBuilder<State, SystemUnderTest> add(Setup<State, SystemUnderTest> cmd)
+        {
+            if (unknownWeights == null)
+                unknownWeights = new LinkedHashSet<>();
+            unknownWeights.add(cmd);
+            return this;
+        }
+
+        /** Adds a command generator that yields {@link Property#ignoreCommand()} whenever {@code predicate} rejects the state. */
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> predicate, Gen<Command<State, SystemUnderTest, ?>> cmd)
+        {
+            Setup<State, SystemUnderTest> guarded = (rs, state) -> {
+                if (predicate.test(state))
+                    return cmd.next(rs);
+                return ignoreCommand();
+            };
+            return add(guarded);
+        }
+
+        /** Adds a command factory that yields {@link Property#ignoreCommand()} whenever {@code predicate} rejects the state. */
+        public CommandsBuilder<State, SystemUnderTest> addIf(Predicate<State> predicate, Setup<State, SystemUnderTest> cmd)
+        {
+            Setup<State, SystemUnderTest> guarded = (rs, state) -> {
+                if (predicate.test(state))
+                    return cmd.setup(rs, state);
+                return ignoreCommand();
+            };
+            return add(guarded);
+        }
+
+        /** Lets {@code sub} register several command factories that all share the same {@code predicate}. */
+        public CommandsBuilder<State, SystemUnderTest> addAllIf(Predicate<State> predicate, Consumer<IfBuilder<State, SystemUnderTest>> sub)
+        {
+            IfBuilder<State, SystemUnderTest> guardedBuilder = new IfBuilder<State, SystemUnderTest>()
+            {
+                @Override
+                public IfBuilder<State, SystemUnderTest> add(Setup<State, SystemUnderTest> cmd)
+                {
+                    CommandsBuilder.this.addIf(predicate, cmd);
+                    return this;
+                }
+            };
+            sub.accept(guardedBuilder);
+            return this;
+        }
+
+        /** Replaces the generator used to pick weights for commands added without one. */
+        public CommandsBuilder<State, SystemUnderTest> unknownWeight(Gen.IntGen unknownWeightGen)
+        {
+            this.unknownWeightGen = Objects.requireNonNull(unknownWeightGen);
+            return this;
+        }
+
+        /**
+         * Builds the {@link Commands}.  When every command has an explicit weight the selection generator is
+         * created once; otherwise the missing weights are generated lazily on first use and regenerated after
+         * each {@code destroyState} call.
+         */
+        public Commands<State, SystemUnderTest> build()
+        {
+            class DynamicWeightsGen implements Gen<Setup<State, SystemUnderTest>>, Gens.Reset
+            {
+                Gen<Setup<State, SystemUnderTest>> gen;
+
+                @Override
+                public Setup<State, SystemUnderTest> next(RandomSource rs)
+                {
+                    if (gen == null)
+                    {
+                        // create random weights
+                        LinkedHashMap<Setup<State, SystemUnderTest>, Integer> weights = new LinkedHashMap<>(knownWeights);
+                        for (Setup<State, SystemUnderTest> unweighted : unknownWeights)
+                            weights.put(unweighted, unknownWeightGen.nextInt(rs));
+                        gen = Gens.pick(weights);
+                    }
+                    return gen.next(rs);
+                }
+
+                @Override
+                public void reset()
+                {
+                    gen = null;
+                }
+            }
+
+            Gen<Setup<State, SystemUnderTest>> commandsGen;
+            if (unknownWeights != null)
+            {
+                commandsGen = new DynamicWeightsGen();
+            }
+            else
+            {
+                commandsGen = Gens.pick(new LinkedHashMap<>(knownWeights));
+            }
+            return new Commands<State, SystemUnderTest>()
+            {
+                @Override
+                public Gen<State> genInitialState() throws Throwable
+                {
+                    return stateGen.get();
+                }
+
+                @Override
+                public SystemUnderTest createSut(State state) throws Throwable
+                {
+                    return sutFactory.apply(state);
+                }
+
+                @Override
+                public Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws Throwable
+                {
+                    if (preCommands != null)
+                        preCommands.accept(state);
+                    return commandsGen.map((rs, setup) -> setup.setup(rs, state));
+                }
+
+                @Override
+                public void destroyState(State state, @Nullable Throwable cause) throws Throwable
+                {
+                    // drop any generated weights so the next example picks fresh ones
+                    Gens.Reset.tryReset(commandsGen);
+                    if (destroyState != null)
+                        destroyState.accept(state, cause);
+                }
+
+                @Override
+                public void destroySut(SystemUnderTest sut, @Nullable Throwable cause) throws Throwable
+                {
+                    if (destroySut != null)
+                        destroySut.accept(sut, cause);
+                }
+            };
+        }
+    }
}
diff --git a/test/unit/accord/utils/SeedProvider.java
b/test/unit/accord/utils/SeedProvider.java
new file mode 100644
index 0000000000..9c7858dafe
--- /dev/null
+++ b/test/unit/accord/utils/SeedProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Utility class for creating seeds. This class mostly matches the semantics
of {@link java.util.Random} but makes the logic work
+ * for any random source. This class should be used in replacement of most
seed methods, and should always replace {@link
java.util.concurrent.ThreadLocalRandom}
+ * as that randomness will have a bias towards the same seed after a restart
(if you rerun randomized tests by restarting
+ * the JVM you will run with the same seed over and over again).
+ */
+public class SeedProvider
+{
+    // L'Ecuyer, "Tables of Linear Congruential Generators of
+    // Different Sizes and Good Lattice Structure", 1999
+    private static final long MULTIPLIER = 1181783497276652981L;
+
+    public static final SeedProvider instance = new SeedProvider();
+    private final AtomicLong seedUniquifier = new AtomicLong(8682522807148012L);
+
+    /** Atomically advances the uniquifier by multiplying it with {@link #MULTIPLIER} and returns the new value. */
+    private long seedUniquifier()
+    {
+        return seedUniquifier.updateAndGet(current -> current * MULTIPLIER);
+    }
+
+    /** Returns a new seed: the next uniquifier value XORed with {@link System#nanoTime()}. */
+    public long nextSeed()
+    {
+        long unique = seedUniquifier();
+        return unique ^ System.nanoTime();
+    }
+}
diff --git a/test/unit/accord/utils/async/TimeoutUtils.java
b/test/unit/accord/utils/async/TimeoutUtils.java
new file mode 100644
index 0000000000..f12c0b17e9
--- /dev/null
+++ b/test/unit/accord/utils/async/TimeoutUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils.async;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+
+public class TimeoutUtils
+{
+ public interface FailingRunnable
+ {
+ void run() throws Throwable;
+ }
+
+ public static void runBlocking(Duration timeout, String threadName,
FailingRunnable fn) throws ExecutionException, InterruptedException,
TimeoutException
+ {
+ // MAINTENANCE: Once the accord branch merges to trunk this can be
dropped and will be AsyncChain again, but since this is forked into C* (that
doesn't have AsyncChain) need to use Futures
+// AsyncResult.Settable<?> promise = AsyncResults.settable();
+ AsyncPromise<?> promise = new AsyncPromise<>();
+ Thread t = new Thread(() -> {
+ try
+ {
+ fn.run();
+ promise.setSuccess(null);
+ }
+ catch (Throwable e)
+ {
+ promise.setFailure(e);
+ }
+ });
+ t.setName(threadName);
+ t.setDaemon(true);
+ t.start();
+ try
+ {
+// AsyncChains.getBlocking(promise, timeout.toNanos(),
TimeUnit.NANOSECONDS);
+ promise.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ t.interrupt();
+ throw e;
+ }
+ catch (TimeoutException e)
+ {
+ t.interrupt();
+ throw e;
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
b/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
index e206af390b..01540499f4 100644
--- a/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
+++ b/test/unit/org/apache/cassandra/concurrent/SimulatedExecutorFactory.java
@@ -413,7 +413,7 @@ public class SimulatedExecutorFactory implements
ExecutorFactory, Clock
long max = TimeUnit.MILLISECONDS.toNanos(5);
LongSupplier small = () -> rs.nextLong(0, maxSmall);
LongSupplier large = () -> rs.nextLong(maxSmall, max);
- this.jitterNanos = Gens.bools().runs(rs.nextInt(1, 11) / 100.0D,
rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() :
small.getAsLong()).asLongSupplier(rs);
+ this.jitterNanos = Gens.bools().biasedRepeatingRuns(rs.nextInt(1,
11) / 100.0D, rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() :
small.getAsLong()).asLongSupplier(rs);
}
@Override
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 236480cea5..6d67c05ced 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -3070,7 +3070,7 @@ public abstract class CQLTester
Map<String, Object> config = CONFIG_GEN.next(RANDOM);
CONFIG = YamlConfigurationLoader.toYaml(config);
- Config c =
ConfigGenBuilder.santize(DatabaseDescriptor.loadConfig());
+ Config c =
ConfigGenBuilder.sanitize(DatabaseDescriptor.loadConfig());
YamlConfigurationLoader.updateFromMap(config, true, c);
DatabaseDescriptor.unsafeDaemonInitialization(() -> c);
diff --git a/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
b/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
index f36d04d8a5..f0f34888a2 100644
--- a/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
+++ b/test/unit/org/apache/cassandra/net/SimulatedMessageDelivery.java
@@ -98,7 +98,7 @@ public class SimulatedMessageDelivery implements
MessageDelivery
long max = TimeUnit.SECONDS.toNanos(5);
LongSupplier small = () -> rs.nextLong(min, maxSmall);
LongSupplier large = () -> rs.nextLong(maxSmall, max);
- return Gens.bools().runs(rs.nextInt(1, 11) / 100.0D,
rs.nextInt(3, 15))
+ return Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) /
100.0D, rs.nextInt(3, 15))
.mapToLong(b -> b ? large.getAsLong() :
small.getAsLong())
.asLongSupplier(rs.fork());
}).getAsLong();
diff --git a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
index 20a0e3d57f..b4aa22930f 100644
--- a/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
+++ b/test/unit/org/apache/cassandra/repair/FuzzTestBase.java
@@ -970,13 +970,13 @@ public abstract class FuzzTestBase extends
CQLTester.InMemory
long max = TimeUnit.SECONDS.toNanos(5);
LongSupplier small = () -> rs.nextLong(min, maxSmall);
LongSupplier large = () -> rs.nextLong(maxSmall, max);
- return Gens.bools().runs(rs.nextInt(1, 11) / 100.0D,
rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() :
small.getAsLong()).asLongSupplier(rs);
+ return Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11)
/ 100.0D, rs.nextInt(3, 15)).mapToLong(b -> b ? large.getAsLong() :
small.getAsLong()).asLongSupplier(rs);
}).getAsLong();
}
private boolean networkDrops(InetAddressAndPort to)
{
- return networkDrops.computeIfAbsent(new
Connection(broadcastAddressAndPort, to), ignore ->
Gens.bools().runs(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3,
15)).asSupplier(rs)).get();
+ return networkDrops.computeIfAbsent(new
Connection(broadcastAddressAndPort, to), ignore ->
Gens.bools().biasedRepeatingRuns(rs.nextInt(1, 11) / 100.0D, rs.nextInt(3,
15)).asSupplier(rs)).get();
}
@Override
diff --git a/test/unit/org/apache/cassandra/utils/ConfigGenBuilder.java
b/test/unit/org/apache/cassandra/utils/ConfigGenBuilder.java
index 6f814f809a..e0b89e767c 100644
--- a/test/unit/org/apache/cassandra/utils/ConfigGenBuilder.java
+++ b/test/unit/org/apache/cassandra/utils/ConfigGenBuilder.java
@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
import com.google.common.collect.ImmutableMap;
import accord.utils.Gen;
@@ -37,6 +39,7 @@ public class ConfigGenBuilder
public enum Memtable
{SkipListMemtable, TrieMemtable, ShardedSkipListMemtable}
+ @Nullable
Gen<IPartitioner> partitionerGen =
Generators.toGen(CassandraGenerators.nonLocalPartitioners());
Gen<Config.DiskAccessMode> commitLogDiskAccessModeGen =
Gens.enums().all(Config.DiskAccessMode.class)
.filter(m -> m
!= Config.DiskAccessMode.standard
@@ -70,7 +73,7 @@ public class ConfigGenBuilder
/**
* When loading the {@link Config} from a yaml its possible that some
configs set will conflict with the configs that get generated here, to avoid
that set them to a good default
*/
- public static Config santize(Config config)
+ public static Config sanitize(Config config)
{
Config defaults = new Config();
config.commitlog_sync = defaults.commitlog_sync;
@@ -85,6 +88,12 @@ public class ConfigGenBuilder
return this;
}
+ public ConfigGenBuilder withPartitionerGen(@Nullable Gen<IPartitioner> gen)
+ {
+ this.partitionerGen = gen;
+ return this;
+ }
+
public ConfigGenBuilder withCommitLogSync(Config.CommitLogSync
commitLogSync)
{
this.commitLogSyncGen = ignore -> commitLogSync;
@@ -114,6 +123,7 @@ public class ConfigGenBuilder
private void updateConfigPartitioner(RandomSource rs, Map<String, Object> config)
{
+ // no partitioner generator configured: leave "partitioner" out of the generated config
+ if (partitionerGen == null) return;
IPartitioner partitioner = partitionerGen.next(rs);
config.put("partitioner", partitioner.getClass().getSimpleName());
}
diff --git a/test/unit/org/apache/cassandra/utils/ConfigGenBuilderTest.java
b/test/unit/org/apache/cassandra/utils/ConfigGenBuilderTest.java
index 99df56c46b..0c2aaf29ad 100644
--- a/test/unit/org/apache/cassandra/utils/ConfigGenBuilderTest.java
+++ b/test/unit/org/apache/cassandra/utils/ConfigGenBuilderTest.java
@@ -62,7 +62,7 @@ public class ConfigGenBuilderTest
private static Config defaultConfig()
{
- return ConfigGenBuilder.santize(DatabaseDescriptor.loadConfig());
+ return ConfigGenBuilder.sanitize(DatabaseDescriptor.loadConfig());
}
private static Config simpleConfig()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]