This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new e501f3a9d2 TopologyMixupTestBase does not fix replication factor for
Keyspaces after reaching rf=3
e501f3a9d2 is described below
commit e501f3a9d26b60f70db4f28de73d536845631dd1
Author: David Capwell <[email protected]>
AuthorDate: Wed Oct 2 14:54:29 2024 -0700
TopologyMixupTestBase does not fix replication factor for Keyspaces after
reaching rf=3
patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-19975
---
modules/accord | 2 +-
.../cql3/statements/TransactionStatement.java | 1 -
.../apache/cassandra/schema/DistributedSchema.java | 6 +
.../BeginConsensusMigrationForTableAndRange.java | 2 +-
...beFinishConsensusMigrationForTableAndRange.java | 2 +-
.../fuzz/topology/AccordTopologyMixupTest.java | 5 +-
.../fuzz/topology/HarryTopologyMixupTest.java | 3 +-
.../fuzz/topology/TopologyMixupTestBase.java | 192 ++++++++++++++++-----
.../cassandra/harry/sut/TokenPlacementModel.java | 21 ++-
.../sut/injvm/InJVMTokenAwareVisitExecutor.java | 8 +-
.../org/apache/cassandra/utils/ASTGenerators.java | 13 +-
11 files changed, 195 insertions(+), 60 deletions(-)
diff --git a/modules/accord b/modules/accord
index d914ee6981..25f23ffec4 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit d914ee69816ebfdf88b2120ff1d8e0bc16edecbc
+Subproject commit 25f23ffec439a921387ca249908798b9cc7d4620
diff --git
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 1a6b3ee782..63bbb245c9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -91,7 +91,6 @@ import static
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
import static
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
import static org.apache.cassandra.service.accord.txn.TxnRead.createTxnRead;
import static
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_protocol;
-import static org.apache.cassandra.service.accord.txn.TxnResult.Kind.values;
public class TransactionStatement implements
CQLStatement.CompositeCQLStatement, CQLStatement.ReturningCQLStatement
{
diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java
b/src/java/org/apache/cassandra/schema/DistributedSchema.java
index c0f3a914ba..d2c3ff167a 100644
--- a/src/java/org/apache/cassandra/schema/DistributedSchema.java
+++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java
@@ -131,6 +131,12 @@ public class DistributedSchema implements
MetadataValue<DistributedSchema>
return tables.get(id);
}
+ public TableMetadata getTableMetadata(String keyspace, String cf)
+ {
+ var ks = keyspaces.getNullable(keyspace);
+ return ks == null ? null : ks.tables.getNullable(cf);
+ }
+
public static DistributedSchema fromSystemTables(Keyspaces keyspaces,
Set<String> knownDatacenters)
{
if
(!keyspaces.containsKeyspace(SchemaConstants.METADATA_KEYSPACE_NAME))
diff --git
a/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
b/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
index e74a18c81f..8bbdbc8e64 100644
---
a/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
+++
b/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
@@ -83,7 +83,7 @@ public class BeginConsensusMigrationForTableAndRange
implements Transformation
public Result execute(ClusterMetadata prev)
{
Transformer transformer = prev.transformer();
- Collection<TableMetadata> metadata =
tables.stream().map(Schema.instance::getTableMetadata).collect(Collectors.toList());
+ Collection<TableMetadata> metadata =
tables.stream().map(prev.schema::getTableMetadata).collect(Collectors.toList());
ConsensusMigrationState consensusMigrationState =
prev.consensusMigrationState.withRangesMigrating(metadata, ranges, false);
return
Transformation.success(transformer.with(consensusMigrationState),
LockedRanges.AffectedRanges.EMPTY);
}
diff --git
a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
index 15bfdda65c..3f857cce09 100644
---
a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
+++
b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
@@ -134,7 +134,7 @@ public class MaybeFinishConsensusMigrationForTableAndRange
implements Transforma
logger.info("Completed repair eligibiliy '{}' paxos repaired ranges
{}, accord repaired ranges {}", repairType, paxosRepairedRanges,
accordBarrieredRanges);
checkNotNull(metadata, "clusterMetadata should not be null");
String ksAndCF = keyspace + "." + cf;
- TableMetadata tbm = Schema.instance.getTableMetadata(keyspace, cf);
+ TableMetadata tbm = metadata.schema.getTableMetadata(keyspace, cf);
if (tbm == null)
return new Rejected(INVALID, format("Table %s is not currently
performing consensus migration", ksAndCF));
diff --git
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
index 991dbfd346..a8e6d67735 100644
---
a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
+++
b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
-import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@@ -121,10 +120,10 @@ public class AccordTopologyMixupTest extends
TopologyMixupTestBase<AccordTopolog
return new Spec(mode, enableMigration, metadata);
}
- private static BiFunction<RandomSource, State<Spec>, Command<State<Spec>,
Void, ?>> cqlOperations(Spec spec)
+ private static CommandGen<Spec> cqlOperations(Spec spec)
{
Gen<Statement> select = (Gen<Statement>) (Gen<?>) fromQT(new
ASTGenerators.SelectGenBuilder(spec.metadata).withLimit1().build());
- Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new
ASTGenerators.MutationGenBuilder(spec.metadata).withoutTimestamp().build());
+ Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new
ASTGenerators.MutationGenBuilder(spec.metadata).withoutTimestamp().withoutTtl().build());
Gen<Statement> txn = (Gen<Statement>) (Gen<?>) fromQT(new
ASTGenerators.TxnGenBuilder(spec.metadata).build());
Map<Gen<Statement>, Integer> operations = new LinkedHashMap<>();
operations.put(select, 1);
diff --git
a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
index 9fe1039e99..d8bccececd 100644
---
a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
+++
b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.fuzz.topology;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -80,7 +79,7 @@ public class HarryTopologyMixupTest extends
TopologyMixupTestBase<HarryTopologyM
return new Spec(harry);
}
- private static BiFunction<RandomSource, State<Spec>, Command<State<Spec>,
Void, ?>> cqlOperations(Spec spec)
+ private static CommandGen<Spec> cqlOperations(Spec spec)
{
class HarryCommand extends SimpleCommand<State<Spec>>
{
diff --git
a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
index 834d2a9b81..82d6d41db3 100644
---
a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
+++
b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.fuzz.topology;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
@@ -25,7 +26,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -39,9 +39,14 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.harry.sut.TokenPlacementModel.Range;
+import org.apache.cassandra.harry.sut.TokenPlacementModel.Replica;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,9 +58,6 @@ import accord.utils.Property;
import accord.utils.Property.Command;
import accord.utils.Property.SimpleCommand;
import accord.utils.RandomSource;
-import org.agrona.collections.Int2ObjectHashMap;
-import org.agrona.collections.IntArrayList;
-import org.agrona.collections.IntHashSet;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.YamlConfigurationLoader;
import org.apache.cassandra.distributed.Cluster;
@@ -68,6 +70,8 @@ import
org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
import org.apache.cassandra.distributed.impl.InstanceConfig;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.sut.injvm.InJVMTokenAwareVisitExecutor;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.TCMMetrics;
import org.apache.cassandra.schema.ReplicationParams;
@@ -131,6 +135,22 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
state ->
state.cluster.get(toCoordinate).nodetoolResult("repair",
state.schemaSpec.keyspaceName(), state.schemaSpec.name(),
"--force").asserts().success());
}
+ private static <S extends TopologyMixupTestBase.SchemaSpec>
Command<State<S>, Void, ?> repairCommand(int toCoordinate, String ks, String...
tables) {
+ return new SimpleCommand<>(state -> "nodetool repair " + ks +
(tables.length == 0 ? "" : " " + Arrays.asList(tables)) + " from node" +
toCoordinate + state.commandNamePostfix(),
+ state -> {
+ if (tables.length == 0) {
+
state.cluster.get(toCoordinate).nodetoolResult("repair", ks,
"--force").asserts().success();
+ return;
+ }
+ List<String> args = new ArrayList<>(3 + tables.length);
+ args.add("repair");
+ args.add(ks);
+ args.addAll(Arrays.asList(tables));
+ args.add("--force");
+
state.cluster.get(toCoordinate).nodetoolResult(args.toArray(String[]::new)).asserts().success();
+ });
+ }
+
private Command<State<S>, Void, ?> waitForCMSToQuiesce()
{
return new SimpleCommand<>(state -> "Waiting for CMS to Quiesce" +
state.commandNamePostfix(),
@@ -139,7 +159,7 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
private Command<State<S>, Void, ?> stopInstance(RandomSource rs, State<S>
state)
{
- int toStop = rs.pickInt(state.topologyHistory.up());
+ int toStop = rs.pickInt(state.upAndSafe());
return stopInstance(toStop, "Normal Stop");
}
@@ -184,7 +204,7 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs,
State<S> state)
{
- int toRemove = rs.pickInt(state.topologyHistory.up());
+ int toRemove = rs.pickInt(state.upAndSafe());
return new SimpleCommand<>("nodetool decommission node" + toRemove +
state.commandNamePostfix(), s2 -> {
IInvokableInstance inst = s2.cluster.get(toRemove);
TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
@@ -198,7 +218,7 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S>
state)
{
int[] up = state.topologyHistory.up();
- int toRemove = rs.pickInt(up);
+ int toRemove = rs.pickInt(state.upAndSafe());
int toCoordinate;
{
int picked;
@@ -223,14 +243,7 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs,
State<S> state)
{
- //TODO (correctness): assassinate CMS member isn't allowed
- IntHashSet up = asSet(state.topologyHistory.up());
- IntHashSet cmsGroup = asSet(state.cmsGroup);
- Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
- if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a
CMS member");
- List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
- allowed.sort(Comparator.naturalOrder());
- int toRemove = rs.pick(allowed);
+ int toRemove = rs.pickInt(state.upAndSafe());
int toCoordinate;
{
int[] upInt = state.topologyHistory.up();
@@ -274,7 +287,7 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S>
state)
{
- int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+ int nodeToReplace = rs.pickInt(state.upAndSafe());
IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
TopologyHistory.Node adding =
state.topologyHistory.replace(nodeToReplace);
TopologyHistory.Node removing =
state.topologyHistory.nodes.get(nodeToReplace);
@@ -326,6 +339,12 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
TopologyMixupTestBase.this.destroyState(state, cause);
}
})
+ .commandsTransformer((state, gen) -> {
+ for (BiFunction<State<S>,
Gen<Command<State<S>, Void, ?>>, Gen<Command<State<S>, Void, ?>>> fn :
state.commandsTransformers)
+ gen = fn.apply(state, gen);
+ return gen;
+ })
+ .onSuccess((state, sut, history) ->
logger.info("Successful for the following:\nState {}\nHistory:\n{}", state,
Property.formatList("\t\t", history)))
.build());
}
@@ -336,12 +355,14 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
// so up is enough to know the topology size
int up = state.topologyHistory.up().length;
int down = state.topologyHistory.down().length;
+ int[] upAndSafe = state.upAndSafe();
int total = up + down;
if (total < state.topologyHistory.maxNodes)
possibleTopologyChanges.add(TopologyChange.AddNode);
- if (up > state.topologyHistory.quorum())
+ if (upAndSafe.length > 0)
{
- if (up > TARGET_RF)
+ // can't remove the node if all nodes are CMS nodes
+ if (!Sets.difference(asSet(upAndSafe),
asSet(state.cmsGroup)).isEmpty())
possibleTopologyChanges.add(TopologyChange.RemoveNode);
possibleTopologyChanges.add(TopologyChange.HostReplace);
possibleTopologyChanges.add(TopologyChange.StopNode);
@@ -395,21 +416,30 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
String keyspaceName();
}
+ protected interface CommandGen<S extends TopologyMixupTestBase.SchemaSpec>
+ {
+ Command<State<S>, Void, ?> apply(RandomSource rs, State<S> state);
+ }
+
protected static class State<S extends SchemaSpec> implements AutoCloseable
{
final TopologyHistory topologyHistory;
final Cluster cluster;
final S schemaSpec;
+ final List<BiFunction<State<S>, Gen<Command<State<S>, Void, ?>>,
Gen<Command<State<S>, Void, ?>>>> commandsTransformers = new ArrayList<>();
final List<Runnable> preActions = new CopyOnWriteArrayList<>();
final AtomicLong currentEpoch = new AtomicLong();
- final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>>
statementGen;
+ final CommandGen<S> statementGen;
final Gen<RemoveType> removeTypeGen;
private final Map<String, Object> yamlConfigOverrides;
int[] cmsGroup = new int[0];
+ private TokenPlacementModel.ReplicationFactor rf;
+ TokenPlacementModel.ReplicatedRanges ring = null;
- public State(RandomSource rs, BiFunction<RandomSource, Cluster, S>
schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>,
Void, ?>>> cqlOperationsGen)
+ public State(RandomSource rs, BiFunction<RandomSource, Cluster, S>
schemaSpecGen, Function<S, CommandGen<S>> cqlOperationsGen)
{
this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+ rf = new TokenPlacementModel.SimpleReplicationFactor(2);
try
{
@@ -464,19 +494,46 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
result.asserts().success();
logger.info("CMS reconfigure: {}", result.getStdout());
}
- preActions.add(new Runnable()
- {
+ commandsTransformers.add(new BiFunction<State<S>,
Gen<Command<State<S>, Void, ?>>, Gen<Command<State<S>, Void, ?>>>() {
// in order to remove this action, an anonymous class is
needed so "this" works, lambda "this" is the parent class
@Override
- public void run()
- {
- if (topologyHistory.up().length == TARGET_RF)
- {
+ public Gen<Command<State<S>, Void, ?>> apply(State<S> state,
Gen<Command<State<S>, Void, ?>> commandGen) {
+ if (topologyHistory.up().length < TARGET_RF)
+ return commandGen;
+ SimpleCommand<State<S>> reconfig = new
SimpleCommand<>("nodetool cms reconfigure " + TARGET_RF, ignore -> {
NodeToolResult result =
cluster.get(1).nodetoolResult("cms", "reconfigure",
Integer.toString(TARGET_RF));
result.asserts().success();
logger.info("CMS reconfigure: {}", result.getStdout());
- preActions.remove(this);
- }
+ });
+ SimpleCommand<State<S>> fixDistributedSchemas = new
SimpleCommand<>("Set system distributed keyspaces to RF=" + TARGET_RF, ignore ->
+ fixDistributedSchemas(cluster));
+ SimpleCommand<State<S>> fixTestKeyspace = new
SimpleCommand<>("Set " + KEYSPACE + " keyspace to RF=" + TARGET_RF, s -> {
+ cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + "
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " +
TARGET_RF + "}");
+ rf = new
TokenPlacementModel.SimpleReplicationFactor(TARGET_RF);
+ });
+ var self = this;
+ return rs -> {
+ Command<State<S>, Void, ?> next = commandGen.next(rs);
+ if (next.checkPreconditions(state) ==
Property.PreCheckResult.Ignore)
+ return next;
+ commandsTransformers.remove(self);
+ int[] up = state.topologyHistory.up();
+ List<Command<State<S>, Void, ?>> commands = new
ArrayList<>();
+ commands.add(fixDistributedSchemas);
+ for (String ks : Arrays.asList("system_auth",
"system_traces"))
+ {
+ int coordinator = rs.pickInt(up);
+ commands.add(repairCommand(coordinator, ks));
+ }
+ commands.add(fixTestKeyspace);
+ {
+ int coordinator = rs.pickInt(up);
+ commands.add(repairCommand(coordinator, KEYSPACE));
+ }
+ commands.add(reconfig);
+ commands.add(next);
+ return multistep(commands);
+ };
}
});
preActions.add(() -> {
@@ -485,6 +542,8 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
IInvokableInstance node = cluster.get(up[up.length - 1]);
cmsGroup = HackSerialization.cmsGroup(node);
currentEpoch.set(HackSerialization.tcmEpoch(node));
+
+ ring =
InJVMTokenAwareVisitExecutor.getRing(cluster.coordinator(up[0]), rf);
});
preActions.add(() -> cluster.checkAndResetUncaughtExceptions());
this.schemaSpec = schemaSpecGen.apply(rs, cluster);
@@ -509,7 +568,49 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
protected String commandNamePostfix()
{
- return "; epoch=" + currentEpoch.get() + ", cms=" +
Arrays.toString(cmsGroup);
+ return "; epoch=" + currentEpoch.get() + ", cms=" +
Arrays.toString(cmsGroup) + ", up=" + Arrays.toString(topologyHistory.up()) +
", down=" + Arrays.toString(topologyHistory.down());
+ }
+
+ public int[] upAndSafe()
+ {
+ IntHashSet up = asSet(topologyHistory.up());
+ int quorum = topologyHistory.quorum();
+ // find what ranges are able to handle 1 node loss
+ Set<Range> safeRanges = new HashSet<>();
+ Int2ObjectHashMap<Replica> idToReplica = new Int2ObjectHashMap<>();
+ for (Map.Entry<Range, List<Replica>> e : ring.asMap().entrySet())
+ {
+ IntHashSet alive = new IntHashSet();
+ for (var replica : e.getValue())
+ {
+ //TODO (fix test api): NodeId is in the API but is always
null. Cheapest way to get the id is to assume the address has it
+ // same issue with address...
+ // /127.0.0.2
+ String harryId = replica.node().id();
+ int index = harryId.lastIndexOf('.');
+ int peer = Integer.parseInt(harryId.substring(index + 1));
+ idToReplica.put(peer, replica);
+ if (up.contains(peer))
+ alive.add(peer);
+ }
+ if (quorum < alive.size())
+ safeRanges.add(e.getKey());
+ }
+
+ // filter nodes where 100% of their ranges are "safe"
+ IntArrayList safeNodes = new IntArrayList();
+ for (int id : up)
+ {
+ Replica replica = idToReplica.get(id);
+ List<Range> ranges = ring.ranges(replica);
+
+ if (ranges.stream().allMatch(safeRanges::contains))
+ safeNodes.add(id);
+ }
+
+ int[] upAndSafe = safeNodes.toIntArray();
+ Arrays.sort(upAndSafe);
+ return upAndSafe;
}
@Override
@@ -529,17 +630,26 @@ public abstract class TopologyMixupTestBase<S extends
TopologyMixupTestBase.Sche
@Override
public void close() throws Exception
{
- epochHistory = cluster.get(cmsGroup[0]).callOnInstance(() -> {
- LogState all = ClusterMetadataService.instance()
- .processor()
- .getLogState(Epoch.EMPTY,
Epoch.create(Long.MAX_VALUE), false,
-
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
-
TCMMetrics.instance.commitRetries));
- StringBuilder sb = new StringBuilder("Epochs:");
- for (Entry e : all.entries)
- sb.append("\n").append(e.epoch.getEpoch()).append(":
").append(e.transform);
- return sb.toString();
- });
+ var cmsNodesUp = Sets.intersection(asSet(cmsGroup),
asSet(topologyHistory.up()));
+ int cmsNode = Iterables.getFirst(cmsNodesUp, null);
+ try
+ {
+ epochHistory = cluster.get(cmsNode).callOnInstance(() -> {
+ LogState all = ClusterMetadataService.instance()
+ .processor()
+
.getLogState(Epoch.EMPTY, Epoch.create(Long.MAX_VALUE), false,
+
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
+
TCMMetrics.instance.commitRetries));
+ StringBuilder sb = new StringBuilder("Epochs:");
+ for (Entry e : all.entries)
+
sb.append("\n\t\t").append(e.epoch.getEpoch()).append(": ").append(e.transform);
+ return sb.toString();
+ });
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Unable to fetch epoch history on node{}",
cmsNode, t);
+ }
cluster.close();
}
}
diff --git
a/test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java
b/test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java
index a25d1ffc6e..8c4c953283 100644
--- a/test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java
@@ -95,11 +95,17 @@ public class TokenPlacementModel
{
public final Range[] ranges;
public final NavigableMap<Range, List<Replica>> placementsForRange;
+ private final Map<Replica, List<Range>> replicaToRanges = new
HashMap<>();
public ReplicatedRanges(Range[] ranges, NavigableMap<Range,
List<Replica>> placementsForRange)
{
this.ranges = ranges;
this.placementsForRange = placementsForRange;
+ for (Map.Entry<Range, List<Replica>> e :
placementsForRange.entrySet())
+ {
+ for (Replica replica : e.getValue())
+ replicaToRanges.computeIfAbsent(replica, i -> new
ArrayList<>()).add(e.getKey());
+ }
}
public List<Replica> replicasFor(long token)
@@ -118,7 +124,7 @@ public class TokenPlacementModel
return placementsForRange.get(ranges[idx]);
}
- public NavigableMap<Range, List<Replica>> asMap()
+ public NavigableMap<Range, List<Replica>> asMap()
{
return placementsForRange;
}
@@ -143,6 +149,11 @@ public class TokenPlacementModel
}
return -(low + 1); // key not found
}
+
+ public List<Range> ranges(Replica replica)
+ {
+ return replicaToRanges.get(replica);
+ }
}
public interface CompareTo<V>
@@ -150,12 +161,12 @@ public class TokenPlacementModel
int compareTo(V v);
}
- public static void addIfUnique(List<Replica> replicas, Set<Integer> names,
Replica replica)
+ private static void addIfUnique(List<Replica> replicas, Set<String> names,
Replica replica)
{
- if (names.contains(replica.node().idx()))
+ if (names.contains(replica.node().id()))
return;
replicas.add(replica);
- names.add(replica.node().idx());
+ names.add(replica.node().id());
}
/**
@@ -589,7 +600,7 @@ public class TokenPlacementModel
Range skipped = null;
for (Range range : ranges)
{
- Set<Integer> names = new HashSet<>();
+ Set<String> names = new HashSet<>();
List<Replica> replicas = new ArrayList<>();
int idx = primaryReplica(nodes, range);
if (idx >= 0)
diff --git
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
index 5db0ee795d..119aefafad 100644
---
a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
+++
b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
@@ -117,9 +117,8 @@ public class InJVMTokenAwareVisitExecutor extends
LoggingVisitor.LoggingVisitorE
throw new IllegalStateException(String.format("Can not execute
statement %s after %d retries", statement, retries));
}
- protected TokenPlacementModel.ReplicatedRanges getRing()
+ public static TokenPlacementModel.ReplicatedRanges getRing(ICoordinator
coordinator, TokenPlacementModel.ReplicationFactor rf)
{
- ICoordinator coordinator = sut.firstAlive().coordinator();
List<TokenPlacementModel.Node> other =
peerStateToNodes(coordinator.execute("select peer, tokens, data_center, rack
from system.peers", ConsistencyLevel.ONE));
List<TokenPlacementModel.Node> self =
peerStateToNodes(coordinator.execute("select broadcast_address, tokens,
data_center, rack from system.local", ConsistencyLevel.ONE));
List<TokenPlacementModel.Node> all = new ArrayList<>();
@@ -129,6 +128,11 @@ public class InJVMTokenAwareVisitExecutor extends
LoggingVisitor.LoggingVisitorE
return rf.replicate(all);
}
+ protected TokenPlacementModel.ReplicatedRanges getRing()
+ {
+ return getRing(sut.firstAlive().coordinator(), rf);
+ }
+
protected Object[][] executeNodeLocal(String statement,
TokenPlacementModel.Node node, Object... bindings)
{
IInstance instance = sut.cluster
diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
index 20a84f49ca..844fa64cd1 100644
--- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
@@ -344,6 +344,12 @@ public class ASTGenerators
return this;
}
+ public MutationGenBuilder withoutTtl()
+ {
+ ttlGen = ignore -> OptionalInt.empty();
+ return this;
+ }
+
public MutationGenBuilder withOperators()
{
allowOperators = true;
@@ -548,9 +554,10 @@ public class ASTGenerators
builder.addReturn(selectGen.generate(rnd));
}
MutationGenBuilder mutationBuilder = new
MutationGenBuilder(metadata)
- .withoutCas()
- .withoutTimestamp()
- .withReferences(new
ArrayList<>(builder.allowedReferences()));
+ .withoutCas()
+ .withoutTimestamp()
+ .withoutTtl()
+ .withReferences(new
ArrayList<>(builder.allowedReferences()));
if (!allowReferences)
mutationBuilder.withReferences(Collections.emptyList());
Gen<Mutation> updateGen = mutationBuilder.build();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org