This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new e501f3a9d2 TopologyMixupTestBase does not fix replication factor for Keyspaces after reaching rf=3
e501f3a9d2 is described below

commit e501f3a9d26b60f70db4f28de73d536845631dd1
Author: David Capwell <[email protected]>
AuthorDate: Wed Oct 2 14:54:29 2024 -0700

    TopologyMixupTestBase does not fix replication factor for Keyspaces after reaching rf=3
    
    patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-19975
---
 modules/accord                                     |   2 +-
 .../cql3/statements/TransactionStatement.java      |   1 -
 .../apache/cassandra/schema/DistributedSchema.java |   6 +
 .../BeginConsensusMigrationForTableAndRange.java   |   2 +-
 ...beFinishConsensusMigrationForTableAndRange.java |   2 +-
 .../fuzz/topology/AccordTopologyMixupTest.java     |   5 +-
 .../fuzz/topology/HarryTopologyMixupTest.java      |   3 +-
 .../fuzz/topology/TopologyMixupTestBase.java       | 192 ++++++++++++++++-----
 .../cassandra/harry/sut/TokenPlacementModel.java   |  21 ++-
 .../sut/injvm/InJVMTokenAwareVisitExecutor.java    |   8 +-
 .../org/apache/cassandra/utils/ASTGenerators.java  |  13 +-
 11 files changed, 195 insertions(+), 60 deletions(-)

diff --git a/modules/accord b/modules/accord
index d914ee6981..25f23ffec4 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit d914ee69816ebfdf88b2120ff1d8e0bc16edecbc
+Subproject commit 25f23ffec439a921387ca249908798b9cc7d4620
diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 1a6b3ee782..63bbb245c9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -91,7 +91,6 @@ import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 import static org.apache.cassandra.service.accord.txn.TxnRead.createTxnRead;
 import static 
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_protocol;
-import static org.apache.cassandra.service.accord.txn.TxnResult.Kind.values;
 
 public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement, CQLStatement.ReturningCQLStatement
 {
diff --git a/src/java/org/apache/cassandra/schema/DistributedSchema.java b/src/java/org/apache/cassandra/schema/DistributedSchema.java
index c0f3a914ba..d2c3ff167a 100644
--- a/src/java/org/apache/cassandra/schema/DistributedSchema.java
+++ b/src/java/org/apache/cassandra/schema/DistributedSchema.java
@@ -131,6 +131,12 @@ public class DistributedSchema implements 
MetadataValue<DistributedSchema>
         return tables.get(id);
     }
 
+    public TableMetadata getTableMetadata(String keyspace, String cf)
+    {
+        var ks = keyspaces.getNullable(keyspace);
+        return ks == null ? null : ks.tables.getNullable(cf);
+    }
+
     public static DistributedSchema fromSystemTables(Keyspaces keyspaces, 
Set<String> knownDatacenters)
     {
         if 
(!keyspaces.containsKeyspace(SchemaConstants.METADATA_KEYSPACE_NAME))
diff --git a/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java b/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
index e74a18c81f..8bbdbc8e64 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/BeginConsensusMigrationForTableAndRange.java
@@ -83,7 +83,7 @@ public class BeginConsensusMigrationForTableAndRange 
implements Transformation
     public Result execute(ClusterMetadata prev)
     {
         Transformer transformer = prev.transformer();
-        Collection<TableMetadata> metadata = 
tables.stream().map(Schema.instance::getTableMetadata).collect(Collectors.toList());
+        Collection<TableMetadata> metadata = 
tables.stream().map(prev.schema::getTableMetadata).collect(Collectors.toList());
         ConsensusMigrationState consensusMigrationState = 
prev.consensusMigrationState.withRangesMigrating(metadata, ranges, false);
         return 
Transformation.success(transformer.with(consensusMigrationState), 
LockedRanges.AffectedRanges.EMPTY);
     }
diff --git a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
index 15bfdda65c..3f857cce09 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/MaybeFinishConsensusMigrationForTableAndRange.java
@@ -134,7 +134,7 @@ public class MaybeFinishConsensusMigrationForTableAndRange 
implements Transforma
         logger.info("Completed repair eligibiliy '{}' paxos repaired ranges 
{}, accord repaired ranges {}", repairType, paxosRepairedRanges, 
accordBarrieredRanges);
         checkNotNull(metadata, "clusterMetadata should not be null");
         String ksAndCF = keyspace + "." + cf;
-        TableMetadata tbm = Schema.instance.getTableMetadata(keyspace, cf);
+        TableMetadata tbm = metadata.schema.getTableMetadata(keyspace, cf);
         if (tbm == null)
             return new Rejected(INVALID, format("Table %s is not currently 
performing consensus migration", ksAndCF));
 
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
index 991dbfd346..a8e6d67735 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
-import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -121,10 +120,10 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
         return new Spec(mode, enableMigration, metadata);
     }
 
-    private static BiFunction<RandomSource, State<Spec>, Command<State<Spec>, 
Void, ?>> cqlOperations(Spec spec)
+    private static CommandGen<Spec> cqlOperations(Spec spec)
     {
         Gen<Statement> select = (Gen<Statement>) (Gen<?>) fromQT(new 
ASTGenerators.SelectGenBuilder(spec.metadata).withLimit1().build());
-        Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new 
ASTGenerators.MutationGenBuilder(spec.metadata).withoutTimestamp().build());
+        Gen<Statement> mutation = (Gen<Statement>) (Gen<?>) fromQT(new 
ASTGenerators.MutationGenBuilder(spec.metadata).withoutTimestamp().withoutTtl().build());
         Gen<Statement> txn = (Gen<Statement>) (Gen<?>) fromQT(new 
ASTGenerators.TxnGenBuilder(spec.metadata).build());
         Map<Gen<Statement>, Integer> operations = new LinkedHashMap<>();
         operations.put(select, 1);
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
index 9fe1039e99..d8bccececd 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.fuzz.topology;
 
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -80,7 +79,7 @@ public class HarryTopologyMixupTest extends 
TopologyMixupTestBase<HarryTopologyM
         return new Spec(harry);
     }
 
-    private static BiFunction<RandomSource, State<Spec>, Command<State<Spec>, 
Void, ?>> cqlOperations(Spec spec)
+    private static CommandGen<Spec> cqlOperations(Spec spec)
     {
         class HarryCommand extends SimpleCommand<State<Spec>>
         {
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
index 834d2a9b81..82d6d41db3 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.fuzz.topology;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.net.InetSocketAddress;
@@ -25,7 +26,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -39,9 +39,14 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.IntArrayList;
+import org.agrona.collections.IntHashSet;
+import org.apache.cassandra.harry.sut.TokenPlacementModel.Range;
+import org.apache.cassandra.harry.sut.TokenPlacementModel.Replica;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,9 +58,6 @@ import accord.utils.Property;
 import accord.utils.Property.Command;
 import accord.utils.Property.SimpleCommand;
 import accord.utils.RandomSource;
-import org.agrona.collections.Int2ObjectHashMap;
-import org.agrona.collections.IntArrayList;
-import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.YamlConfigurationLoader;
 import org.apache.cassandra.distributed.Cluster;
@@ -68,6 +70,8 @@ import 
org.apache.cassandra.distributed.impl.INodeProvisionStrategy;
 import org.apache.cassandra.distributed.impl.InstanceConfig;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.sut.injvm.InJVMTokenAwareVisitExecutor;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.schema.ReplicationParams;
@@ -131,6 +135,22 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
                                    state -> 
state.cluster.get(toCoordinate).nodetoolResult("repair", 
state.schemaSpec.keyspaceName(), state.schemaSpec.name(), 
"--force").asserts().success());
     }
 
+    private static <S extends TopologyMixupTestBase.SchemaSpec> 
Command<State<S>, Void, ?> repairCommand(int toCoordinate, String ks, String... 
tables) {
+        return new SimpleCommand<>(state -> "nodetool repair " + ks + 
(tables.length == 0 ? "" : " " + Arrays.asList(tables)) + " from node" + 
toCoordinate + state.commandNamePostfix(),
+                state -> {
+                    if (tables.length == 0) {
+                        
state.cluster.get(toCoordinate).nodetoolResult("repair", ks, 
"--force").asserts().success();
+                        return;
+                    }
+                    List<String> args = new ArrayList<>(3 + tables.length);
+                    args.add("repair");
+                    args.add(ks);
+                    args.addAll(Arrays.asList(tables));
+                    args.add("--force");
+                    
state.cluster.get(toCoordinate).nodetoolResult(args.toArray(String[]::new)).asserts().success();
+                });
+    }
+
     private Command<State<S>, Void, ?> waitForCMSToQuiesce()
     {
         return new SimpleCommand<>(state -> "Waiting for CMS to Quiesce" + 
state.commandNamePostfix(),
@@ -139,7 +159,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
 
     private Command<State<S>, Void, ?> stopInstance(RandomSource rs, State<S> 
state)
     {
-        int toStop = rs.pickInt(state.topologyHistory.up());
+        int toStop = rs.pickInt(state.upAndSafe());
         return stopInstance(toStop, "Normal Stop");
     }
 
@@ -184,7 +204,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
 
     private Command<State<S>, Void, ?> removeNodeDecommission(RandomSource rs, 
State<S> state)
     {
-        int toRemove = rs.pickInt(state.topologyHistory.up());
+        int toRemove = rs.pickInt(state.upAndSafe());
         return new SimpleCommand<>("nodetool decommission node" + toRemove + 
state.commandNamePostfix(), s2 -> {
             IInvokableInstance inst = s2.cluster.get(toRemove);
             TopologyHistory.Node node = s2.topologyHistory.node(toRemove);
@@ -198,7 +218,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
     private Command<State<S>, Void, ?> removeNode(RandomSource rs, State<S> 
state)
     {
         int[] up = state.topologyHistory.up();
-        int toRemove = rs.pickInt(up);
+        int toRemove = rs.pickInt(state.upAndSafe());
         int toCoordinate;
         {
             int picked;
@@ -223,14 +243,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
 
     private Command<State<S>, Void, ?> removeNodeAssassinate(RandomSource rs, 
State<S> state)
     {
-        //TODO (correctness): assassinate CMS member isn't allowed
-        IntHashSet up = asSet(state.topologyHistory.up());
-        IntHashSet cmsGroup = asSet(state.cmsGroup);
-        Sets.SetView<Integer> upAndNotInCMS = Sets.difference(up, cmsGroup);
-        if (upAndNotInCMS.isEmpty()) throw new AssertionError("Every node is a 
CMS member");
-        List<Integer> allowed = new ArrayList<>(upAndNotInCMS);
-        allowed.sort(Comparator.naturalOrder());
-        int toRemove = rs.pick(allowed);
+        int toRemove = rs.pickInt(state.upAndSafe());
         int toCoordinate;
         {
             int[] upInt = state.topologyHistory.up();
@@ -274,7 +287,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
 
     private Command<State<S>, Void, ?> hostReplace(RandomSource rs, State<S> 
state)
     {
-        int nodeToReplace = rs.pickInt(state.topologyHistory.up());
+        int nodeToReplace = rs.pickInt(state.upAndSafe());
         IInvokableInstance toReplace = state.cluster.get(nodeToReplace);
         TopologyHistory.Node adding = 
state.topologyHistory.replace(nodeToReplace);
         TopologyHistory.Node removing = 
state.topologyHistory.nodes.get(nodeToReplace);
@@ -326,6 +339,12 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
                                       
TopologyMixupTestBase.this.destroyState(state, cause);
                                   }
                               })
+                              .commandsTransformer((state, gen) -> {
+                                  for (BiFunction<State<S>, 
Gen<Command<State<S>, Void, ?>>, Gen<Command<State<S>, Void, ?>>> fn : 
state.commandsTransformers)
+                                      gen = fn.apply(state, gen);
+                                  return gen;
+                              })
+                              .onSuccess((state, sut, history) -> 
logger.info("Successful for the following:\nState {}\nHistory:\n{}", state, 
Property.formatList("\t\t", history)))
                               .build());
     }
 
@@ -336,12 +355,14 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         // so up is enough to know the topology size
         int up = state.topologyHistory.up().length;
         int down = state.topologyHistory.down().length;
+        int[] upAndSafe = state.upAndSafe();
         int total = up + down;
         if (total < state.topologyHistory.maxNodes)
             possibleTopologyChanges.add(TopologyChange.AddNode);
-        if (up > state.topologyHistory.quorum())
+        if (upAndSafe.length > 0)
         {
-            if (up > TARGET_RF)
+            // can't remove the node if all nodes are CMS nodes
+            if (!Sets.difference(asSet(upAndSafe), 
asSet(state.cmsGroup)).isEmpty())
                 possibleTopologyChanges.add(TopologyChange.RemoveNode);
             possibleTopologyChanges.add(TopologyChange.HostReplace);
             possibleTopologyChanges.add(TopologyChange.StopNode);
@@ -395,21 +416,30 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         String keyspaceName();
     }
 
+    protected interface CommandGen<S extends TopologyMixupTestBase.SchemaSpec>
+    {
+        Command<State<S>, Void, ?> apply(RandomSource rs, State<S> state);
+    }
+
     protected static class State<S extends SchemaSpec> implements AutoCloseable
     {
         final TopologyHistory topologyHistory;
         final Cluster cluster;
         final S schemaSpec;
+        final List<BiFunction<State<S>, Gen<Command<State<S>, Void, ?>>, 
Gen<Command<State<S>, Void, ?>>>> commandsTransformers = new ArrayList<>();
         final List<Runnable> preActions = new CopyOnWriteArrayList<>();
         final AtomicLong currentEpoch = new AtomicLong();
-        final BiFunction<RandomSource, State<S>, Command<State<S>, Void, ?>> 
statementGen;
+        final CommandGen<S> statementGen;
         final Gen<RemoveType> removeTypeGen;
         private final Map<String, Object> yamlConfigOverrides;
         int[] cmsGroup = new int[0];
+        private TokenPlacementModel.ReplicationFactor rf;
+        TokenPlacementModel.ReplicatedRanges ring = null;
 
-        public State(RandomSource rs, BiFunction<RandomSource, Cluster, S> 
schemaSpecGen, Function<S, BiFunction<RandomSource, State<S>, Command<State<S>, 
Void, ?>>> cqlOperationsGen)
+        public State(RandomSource rs, BiFunction<RandomSource, Cluster, S> 
schemaSpecGen, Function<S, CommandGen<S>> cqlOperationsGen)
         {
             this.topologyHistory = new TopologyHistory(rs.fork(), 2, 4);
+            rf = new TokenPlacementModel.SimpleReplicationFactor(2);
             try
             {
 
@@ -464,19 +494,46 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
                 result.asserts().success();
                 logger.info("CMS reconfigure: {}", result.getStdout());
             }
-            preActions.add(new Runnable()
-            {
+            commandsTransformers.add(new BiFunction<State<S>, 
Gen<Command<State<S>, Void, ?>>, Gen<Command<State<S>, Void, ?>>>() {
                 // in order to remove this action, an anonymous class is 
needed so "this" works, lambda "this" is the parent class
                 @Override
-                public void run()
-                {
-                    if (topologyHistory.up().length == TARGET_RF)
-                    {
+                public Gen<Command<State<S>, Void, ?>> apply(State<S> state, 
Gen<Command<State<S>, Void, ?>> commandGen) {
+                    if (topologyHistory.up().length < TARGET_RF)
+                        return commandGen;
+                    SimpleCommand<State<S>> reconfig = new 
SimpleCommand<>("nodetool cms reconfigure " + TARGET_RF, ignore -> {
                         NodeToolResult result = 
cluster.get(1).nodetoolResult("cms", "reconfigure", 
Integer.toString(TARGET_RF));
                         result.asserts().success();
                         logger.info("CMS reconfigure: {}", result.getStdout());
-                        preActions.remove(this);
-                    }
+                    });
+                    SimpleCommand<State<S>> fixDistributedSchemas = new 
SimpleCommand<>("Set system distributed keyspaces to RF=" + TARGET_RF, ignore ->
+                            fixDistributedSchemas(cluster));
+                    SimpleCommand<State<S>> fixTestKeyspace = new 
SimpleCommand<>("Set " + KEYSPACE + " keyspace to RF=" + TARGET_RF, s -> {
+                        cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + " 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 
TARGET_RF + "}");
+                        rf = new 
TokenPlacementModel.SimpleReplicationFactor(TARGET_RF);
+                    });
+                    var self = this;
+                    return rs -> {
+                        Command<State<S>, Void, ?> next = commandGen.next(rs);
+                        if (next.checkPreconditions(state) == 
Property.PreCheckResult.Ignore)
+                            return next;
+                        commandsTransformers.remove(self);
+                        int[] up = state.topologyHistory.up();
+                        List<Command<State<S>, Void, ?>> commands = new 
ArrayList<>();
+                        commands.add(fixDistributedSchemas);
+                        for (String ks : Arrays.asList("system_auth", 
"system_traces"))
+                        {
+                            int coordinator = rs.pickInt(up);
+                            commands.add(repairCommand(coordinator, ks));
+                        }
+                        commands.add(fixTestKeyspace);
+                        {
+                            int coordinator = rs.pickInt(up);
+                            commands.add(repairCommand(coordinator, KEYSPACE));
+                        }
+                        commands.add(reconfig);
+                        commands.add(next);
+                        return multistep(commands);
+                    };
                 }
             });
             preActions.add(() -> {
@@ -485,6 +542,8 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
                 IInvokableInstance node = cluster.get(up[up.length - 1]);
                 cmsGroup = HackSerialization.cmsGroup(node);
                 currentEpoch.set(HackSerialization.tcmEpoch(node));
+
+                ring = 
InJVMTokenAwareVisitExecutor.getRing(cluster.coordinator(up[0]), rf);
             });
             preActions.add(() -> cluster.checkAndResetUncaughtExceptions());
             this.schemaSpec = schemaSpecGen.apply(rs, cluster);
@@ -509,7 +568,49 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
 
         protected String commandNamePostfix()
         {
-            return "; epoch=" + currentEpoch.get() + ", cms=" + 
Arrays.toString(cmsGroup);
+            return "; epoch=" + currentEpoch.get() + ", cms=" + 
Arrays.toString(cmsGroup) + ", up=" + Arrays.toString(topologyHistory.up()) + 
", down=" + Arrays.toString(topologyHistory.down());
+        }
+
+        public int[] upAndSafe()
+        {
+            IntHashSet up = asSet(topologyHistory.up());
+            int quorum = topologyHistory.quorum();
+            // find what ranges are able to handle 1 node loss
+            Set<Range> safeRanges = new HashSet<>();
+            Int2ObjectHashMap<Replica> idToReplica = new Int2ObjectHashMap<>();
+            for (Map.Entry<Range, List<Replica>> e : ring.asMap().entrySet())
+            {
+                IntHashSet alive = new IntHashSet();
+                for (var replica : e.getValue())
+                {
+                    //TODO (fix test api): NodeId is in the API but is always 
null.  Cheapest way to get the id is to assume the address has it
+                    // same issue with address...
+                    // /127.0.0.2
+                    String harryId = replica.node().id();
+                    int index = harryId.lastIndexOf('.');
+                    int peer = Integer.parseInt(harryId.substring(index + 1));
+                    idToReplica.put(peer, replica);
+                    if (up.contains(peer))
+                        alive.add(peer);
+                }
+                if (quorum < alive.size())
+                    safeRanges.add(e.getKey());
+            }
+
+            // filter nodes where 100% of their ranges are "safe"
+            IntArrayList safeNodes = new IntArrayList();
+            for (int id : up)
+            {
+                Replica replica = idToReplica.get(id);
+                List<Range> ranges = ring.ranges(replica);
+
+                if (ranges.stream().allMatch(safeRanges::contains))
+                    safeNodes.add(id);
+            }
+
+            int[] upAndSafe = safeNodes.toIntArray();
+            Arrays.sort(upAndSafe);
+            return upAndSafe;
         }
 
         @Override
@@ -529,17 +630,26 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         @Override
         public void close() throws Exception
         {
-            epochHistory = cluster.get(cmsGroup[0]).callOnInstance(() -> {
-                LogState all = ClusterMetadataService.instance()
-                                                     .processor()
-                                                     .getLogState(Epoch.EMPTY, 
Epoch.create(Long.MAX_VALUE), false,
-                                                                  
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
-                                                                               
                    TCMMetrics.instance.commitRetries));
-                StringBuilder sb = new StringBuilder("Epochs:");
-                for (Entry e : all.entries)
-                    sb.append("\n").append(e.epoch.getEpoch()).append(": 
").append(e.transform);
-                return sb.toString();
-            });
+            var cmsNodesUp = Sets.intersection(asSet(cmsGroup), 
asSet(topologyHistory.up()));
+            int cmsNode = Iterables.getFirst(cmsNodesUp, null);
+            try
+            {
+                epochHistory = cluster.get(cmsNode).callOnInstance(() -> {
+                    LogState all = ClusterMetadataService.instance()
+                                                         .processor()
+                                                         
.getLogState(Epoch.EMPTY, Epoch.create(Long.MAX_VALUE), false,
+                                                                      
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
+                                                                               
                        TCMMetrics.instance.commitRetries));
+                    StringBuilder sb = new StringBuilder("Epochs:");
+                    for (Entry e : all.entries)
+                        
sb.append("\n\t\t").append(e.epoch.getEpoch()).append(": ").append(e.transform);
+                    return sb.toString();
+                });
+            }
+            catch (Throwable t)
+            {
+                logger.warn("Unable to fetch epoch history on node{}", 
cmsNode, t);
+            }
             cluster.close();
         }
     }
diff --git a/test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java b/test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java
index a25d1ffc6e..8c4c953283 100644
--- a/test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java
@@ -95,11 +95,17 @@ public class TokenPlacementModel
     {
         public final Range[] ranges;
         public final NavigableMap<Range, List<Replica>> placementsForRange;
+        private final Map<Replica, List<Range>> replicaToRanges = new 
HashMap<>();
 
         public ReplicatedRanges(Range[] ranges, NavigableMap<Range, 
List<Replica>> placementsForRange)
         {
             this.ranges = ranges;
             this.placementsForRange = placementsForRange;
+            for (Map.Entry<Range, List<Replica>> e : 
placementsForRange.entrySet())
+            {
+                for (Replica replica : e.getValue())
+                    replicaToRanges.computeIfAbsent(replica, i -> new 
ArrayList<>()).add(e.getKey());
+            }
         }
 
         public List<Replica> replicasFor(long token)
@@ -118,7 +124,7 @@ public class TokenPlacementModel
             return placementsForRange.get(ranges[idx]);
         }
 
-        public NavigableMap<Range, List<Replica>> asMap()
+        public NavigableMap<Range, List<Replica>>  asMap()
         {
             return placementsForRange;
         }
@@ -143,6 +149,11 @@ public class TokenPlacementModel
             }
             return -(low + 1); // key not found
         }
+
+        public List<Range> ranges(Replica replica)
+        {
+            return replicaToRanges.get(replica);
+        }
     }
 
     public interface CompareTo<V>
@@ -150,12 +161,12 @@ public class TokenPlacementModel
         int compareTo(V v);
     }
 
-    public static void addIfUnique(List<Replica> replicas, Set<Integer> names, 
Replica replica)
+    private static void addIfUnique(List<Replica> replicas, Set<String> names, 
Replica replica)
     {
-        if (names.contains(replica.node().idx()))
+        if (names.contains(replica.node().id()))
             return;
         replicas.add(replica);
-        names.add(replica.node().idx());
+        names.add(replica.node().id());
     }
 
     /**
@@ -589,7 +600,7 @@ public class TokenPlacementModel
             Range skipped = null;
             for (Range range : ranges)
             {
-                Set<Integer> names = new HashSet<>();
+                Set<String> names = new HashSet<>();
                 List<Replica> replicas = new ArrayList<>();
                 int idx = primaryReplica(nodes, range);
                 if (idx >= 0)
diff --git a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
index 5db0ee795d..119aefafad 100644
--- a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
+++ b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJVMTokenAwareVisitExecutor.java
@@ -117,9 +117,8 @@ public class InJVMTokenAwareVisitExecutor extends 
LoggingVisitor.LoggingVisitorE
         throw new IllegalStateException(String.format("Can not execute 
statement %s after %d retries", statement, retries));
     }
 
-    protected TokenPlacementModel.ReplicatedRanges getRing()
+    public static TokenPlacementModel.ReplicatedRanges getRing(ICoordinator 
coordinator, TokenPlacementModel.ReplicationFactor rf)
     {
-        ICoordinator coordinator = sut.firstAlive().coordinator();
         List<TokenPlacementModel.Node> other = 
peerStateToNodes(coordinator.execute("select peer, tokens, data_center, rack 
from system.peers", ConsistencyLevel.ONE));
         List<TokenPlacementModel.Node> self = 
peerStateToNodes(coordinator.execute("select broadcast_address, tokens, 
data_center, rack from system.local", ConsistencyLevel.ONE));
         List<TokenPlacementModel.Node> all = new ArrayList<>();
@@ -129,6 +128,11 @@ public class InJVMTokenAwareVisitExecutor extends 
LoggingVisitor.LoggingVisitorE
         return rf.replicate(all);
     }
 
+    protected TokenPlacementModel.ReplicatedRanges getRing()
+    {
+        return getRing(sut.firstAlive().coordinator(), rf);
+    }
+
     protected Object[][] executeNodeLocal(String statement, 
TokenPlacementModel.Node node, Object... bindings)
     {
         IInstance instance = sut.cluster
diff --git a/test/unit/org/apache/cassandra/utils/ASTGenerators.java b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
index 20a84f49ca..844fa64cd1 100644
--- a/test/unit/org/apache/cassandra/utils/ASTGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/ASTGenerators.java
@@ -344,6 +344,12 @@ public class ASTGenerators
             return this;
         }
 
+        public MutationGenBuilder withoutTtl()
+        {
+            ttlGen = ignore -> OptionalInt.empty();
+            return this;
+        }
+
         public MutationGenBuilder withOperators()
         {
             allowOperators = true;
@@ -548,9 +554,10 @@ public class ASTGenerators
                         builder.addReturn(selectGen.generate(rnd));
                     }
                     MutationGenBuilder mutationBuilder = new 
MutationGenBuilder(metadata)
-                                                          .withoutCas()
-                                                          .withoutTimestamp()
-                                                          .withReferences(new 
ArrayList<>(builder.allowedReferences()));
+                                                         .withoutCas()
+                                                         .withoutTimestamp()
+                                                         .withoutTtl()
+                                                         .withReferences(new 
ArrayList<>(builder.allowedReferences()));
                     if (!allowReferences)
                         
mutationBuilder.withReferences(Collections.emptyList());
                     Gen<Mutation> updateGen = mutationBuilder.build();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to