This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 090f7c3fb4eb82661f0c41a8d16e44366e1ea4e0
Author: Alex Petrov <[email protected]>
AuthorDate: Fri Mar 31 10:12:20 2023 +0200

    [CEP-21] Implement multi-dc placement simulator for NTS

    patch by Alex Petrov; reviewed by Marcus Eriksson and Sam Tunnicliffe for CASSANDRA-18417
---
 .../org/apache/cassandra/service/paxos/Paxos.java  |   2 +-
 .../distributed/test/log/CMSTestBase.java          |   8 +-
 .../distributed/test/log/CoordinatorPathTest.java  |   6 +-
 .../test/log/CoordinatorPathTestBase.java          |  68 ++--
 .../test/log/MetadataChangeSimulationTest.java     | 162 ++++------
 .../distributed/test/log/PlacementSimulator.java   | 346 ++++++++++++++++++---
 .../test/log/PlacementSimulatorTest.java           |  55 ++--
 .../UniformRangePlacementIntegrationTest.java      | 103 ++++++
 8 files changed, 546 insertions(+), 204 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index f105918356..781b365074 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -105,9 +105,9 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.cassandra.config.Config.PaxosVariant.v2_without_linearizable_reads_or_rejected_writes;
 import static org.apache.cassandra.db.Keyspace.openAndGetStore;
 import static org.apache.cassandra.exceptions.RequestFailureReason.TIMEOUT;
-import static org.apache.cassandra.gms.ApplicationState.RELEASE_VERSION;
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
 import static org.apache.cassandra.db.ConsistencyLevel.*;
+import static org.apache.cassandra.gms.ApplicationState.RELEASE_VERSION;
 import static org.apache.cassandra.locator.InetAddressAndPort.Serializer.inetAddressAndPortSerializer;
 import static org.apache.cassandra.locator.ReplicaLayout.forTokenWriteLiveAndDown;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;

diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
index 912d0a7aa5..7342c7be39 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/CMSTestBase.java
@@ -56,7 +56,7 @@ public class CMSTestBase
         DatabaseDescriptor.setEndpointSnitch(new IEndpointSnitch()
         {
             public String getRack(InetAddressAndPort endpoint) {return "rack1";}
-            public String getDatacenter(InetAddressAndPort endpoint) {return "dc1";}
+            public String getDatacenter(InetAddressAndPort endpoint) {return "datacenter1";}
             public <C extends ReplicaCollection<?
extends C>> C sortedByProximity(InetAddressAndPort address, C addresses) {return null;} public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2) {return 0;} public void gossiperStarting() {} @@ -103,9 +103,9 @@ public class CMSTestBase service.commit(new Initialize(ClusterMetadata.current())); service.commit(new AlterSchema((cm, schema) -> { return schema.with(Keyspaces.of(KeyspaceMetadata.create("test", KeyspaceParams.simple(rf)), - KeyspaceMetadata.create("test_nts", KeyspaceParams.nts("datacenter1", 3, - "datacenter2", 3, - "datacenter3", 3)))); + KeyspaceMetadata.create("test_nts", KeyspaceParams.nts("datacenter1", rf, + "datacenter2", rf, + "datacenter3", rf)))); }, schemaProvider)); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTest.java index 71c9eeefdc..962faf2268 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTest.java @@ -78,7 +78,7 @@ public class CoordinatorPathTest extends CoordinatorPathTestBase // At most 2 replicas should respond, so that when the pending node is added, results would be insufficient for recomputed blockFor BooleanSupplier shouldRespond = atMostResponses(simulatedCluster.state.get().isWriteTargetFor(token(pk), simulatedCluster.node(1).matcher) ? 1 : 2); List<WaitingAction<?,?>> waiting = simulatedCluster - .filter((n) -> replicas.stream().anyMatch(n.matcher) && n.id != 1) + .filter((n) -> replicas.stream().anyMatch(n.matcher) && n.node.idx() != 1) .map((nodeToBlockOn) -> nodeToBlockOn.blockOnReplica((node) -> new MutationAction(node, shouldRespond))) .collect(Collectors.toList()); @@ -139,8 +139,8 @@ public class CoordinatorPathTest extends CoordinatorPathTestBase List<Node> replicas = simulatedCluster.state.get().readReplicasFor(token(pk)); Function<Integer, BooleanSupplier> shouldRespond = respondFrom(1, 4); List<WaitingAction<?,?>> waiting = simulatedCluster - .filter((n) -> replicas.stream().anyMatch(n.matcher) && n.id != 1) - .map((nodeToBlockOn) -> nodeToBlockOn.blockOnReplica((node) -> new ReadAction(node, shouldRespond.apply(nodeToBlockOn.id)))) + .filter((n) -> replicas.stream().anyMatch(n.matcher) && n.node.idx() != 1) + .map((nodeToBlockOn) -> nodeToBlockOn.blockOnReplica((node) -> new ReadAction(node, shouldRespond.apply(nodeToBlockOn.node.idx())))) .collect(Collectors.toList()); Future<?> readQuery = async(() -> cluster.coordinator(1).execute("select * from distributed_test_keyspace.tbl where pk = ?", ConsistencyLevel.QUORUM, pk)); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java index 2b6f647679..db295413b3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java @@ -220,9 +220,9 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase protected final EnumMap<Verb, SimulatedAction<?, ?>> actions; protected final SimulatedCluster cluster; - private RealSimulatedNode(SimulatedCluster simulatedCluster, int idx, String name, long token) + private RealSimulatedNode(SimulatedCluster simulatedCluster, Node node) { - super(simulatedCluster.state, idx, name, token); + 
super(simulatedCluster.state, node); this.entryIdGen = new Entry.DefaultEntryIdGen(); this.actions = new EnumMap<>(Verb.class); @@ -246,9 +246,8 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase public String toString() { return "RealSimulatedNode{" + - "id=" + id + - ", name='" + name + '\'' + - ", token=" + token + + "id=" + node.idx() + + ", token=" + node.token() + '}'; } @@ -256,34 +255,34 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase public void register() { super.register(); - ClusterMetadataTestHelper.register(id); + ClusterMetadataTestHelper.register(node.idx()); } @Override public void join() { super.join(); - ClusterMetadataTestHelper.join(id, token); + ClusterMetadataTestHelper.join(node.idx(), node.token()); } @Override public void leave() { super.leave(); - ClusterMetadataTestHelper.leave(id); + ClusterMetadataTestHelper.leave(node.idx()); } - public void replace() + public void replace(int replaced) { super.replace(); - ClusterMetadataTestHelper.replace(id, this.id); + ClusterMetadataTestHelper.replace(replaced, node.idx()); } @Override public ClusterMetadataTestHelper.JoinProcess lazyJoin() { ClusterMetadataTestHelper.JoinProcess virtual = super.lazyJoin(); - ClusterMetadataTestHelper.JoinProcess real = ClusterMetadataTestHelper.lazyJoin(id, token); + ClusterMetadataTestHelper.JoinProcess real = ClusterMetadataTestHelper.lazyJoin(node.idx(), node.token()); return new ClusterMetadataTestHelper.JoinProcess() { public ClusterMetadataTestHelper.JoinProcess prepareJoin() @@ -320,7 +319,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase public ClusterMetadataTestHelper.LeaveProcess lazyLeave() { ClusterMetadataTestHelper.LeaveProcess virtual = super.lazyLeave(); - ClusterMetadataTestHelper.LeaveProcess real = ClusterMetadataTestHelper.lazyLeave(id); + ClusterMetadataTestHelper.LeaveProcess real = ClusterMetadataTestHelper.lazyLeave(node.idx()); return new ClusterMetadataTestHelper.LeaveProcess() { public ClusterMetadataTestHelper.LeaveProcess prepareLeave() @@ -383,7 +382,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase } }, () -> { - sendFrom(id, payload); + sendFrom(node.idx(), payload); try { return resFuture.get(10, TimeUnit.SECONDS); @@ -433,6 +432,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase try { cluster.realCluster.deliverMessage(cluster.realCluster.get(1).broadcastAddress(), + // todo: use addr from the node! 
Instance.serializeMessage(ClusterMetadataTestHelper.addr(from), ClusterMetadataTestHelper.addr(1), message)); @@ -488,19 +488,15 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase */ protected static class VirtualSimulatedNode implements ClusterMetadataTestHelper.NodeOperations { - public final int id; - public final String name; - public final long token; + public final Node node; public final Predicate<Node> matcher; protected final SimulatedPlacementHolder ref; - public VirtualSimulatedNode(SimulatedPlacementHolder ref, int id, String name, long token) + public VirtualSimulatedNode(SimulatedPlacementHolder ref, Node node) { this.ref = ref; - this.id = id; - this.name = name; - this.token = token; - this.matcher = (node) -> node.id.equals(name); + this.node = node; + this.matcher = (o) -> node.idx() == o.idx(); } @Override @@ -547,7 +543,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase assert idx == -1; assert steps == null; - ModelChecker.Pair<SimulatedPlacements, PlacementSimulator.Transformations> res = bootstrap_diffBased(ref.get(), name, token); + ModelChecker.Pair<SimulatedPlacements, PlacementSimulator.Transformations> res = bootstrap_diffBased(ref.get(), node.idx(), node.token()); ref.set(res.l); steps = res.r; ref.apply(steps); @@ -594,7 +590,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase assert idx == -1; assert steps == null; - ModelChecker.Pair<SimulatedPlacements, Transformations> res = leave_diffBased(ref.get(), token); + ModelChecker.Pair<SimulatedPlacements, Transformations> res = leave_diffBased(ref.get(), node.token()); ref.set(res.l); steps = res.r; idx++; @@ -638,7 +634,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase assert idx == -1; assert steps == null; - ModelChecker.Pair<SimulatedPlacements, PlacementSimulator.Transformations> res = replace_directly(ref.get(), token, name); + ModelChecker.Pair<SimulatedPlacements, PlacementSimulator.Transformations> res = replace_directly(ref.get(), node.token(), node.idx()); ref.set(res.l); steps = res.r; ref.apply(steps); @@ -689,7 +685,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase this.tokenSupplier = tokenSupplier; InetAddressAndPort nodeUnderTestAddr = ClusterMetadataTestHelper.addr(1); - Node nodeUnderTest = new Node(tokenSupplier.token(1), nodeUnderTestAddr.toString()); + Node nodeUnderTest = new Node(tokenSupplier.token(1), nodeUnderTestAddr.addressBytes[3]); List<Node> orig = Collections.singletonList(nodeUnderTest); this.state = new RefSimulatedPlacementHolder(new SimulatedPlacements(3, Collections.singletonList(nodeUnderTest), @@ -700,7 +696,8 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase this.nodes = new HashMap<>(); // We would like all messages directed to the node under test to be delivered it. 
- this.nodes.put(nodeUnderTestAddr, new RealSimulatedNode(this, 1, nodeUnderTestAddr.toString(), tokenSupplier.token(1)) { + this.nodes.put(nodeUnderTestAddr, new RealSimulatedNode(this, new Node(tokenSupplier.token(1), 1)) { + @Override public boolean test(Message<?> message) { realCluster.get(1).receiveMessage(Instance.serializeMessage(message.from(), nodeUnderTestAddr, message)); @@ -865,9 +862,9 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase public RealSimulatedNode createNode() { int idx = this.nodes.size() + 1; - RealSimulatedNode node = new RealSimulatedNode(this, idx, ClusterMetadataTestHelper.addr(idx).toString(), tokenSupplier.token(idx)); + RealSimulatedNode node = new RealSimulatedNode(this, new Node(tokenSupplier.token(idx), idx)); node.initializeDefaultHandlers(); - nodes.put(ClusterMetadataTestHelper.addr(node.id), node); + nodes.put(node.node.addr(), node); return node; } @@ -924,16 +921,13 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase this.tokenSupplier = tokenSupplier; this.nodes = new ArrayList<>(); for (Node node : state.get().nodes) - { - int idx = nodes.size() + 1; - this.nodes.add(new VirtualSimulatedNode(state, idx++, node.id, node.token)); - } + this.nodes.add(new VirtualSimulatedNode(state, node)); } public VirtualSimulatedNode createNode() { int idx = nodes.size() + 1; - VirtualSimulatedNode node = new VirtualSimulatedNode(state, idx, ClusterMetadataTestHelper.addr(idx).toString(), tokenSupplier.token(idx)); + VirtualSimulatedNode node = new VirtualSimulatedNode(state, new Node(tokenSupplier.token(idx), idx)); nodes.add(node); return node; } @@ -944,7 +938,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase } } - public static interface SimulatedAction<IN, OUT> + public interface SimulatedAction<IN, OUT> { Verb verb(); void validate(Message<IN> request); @@ -1080,7 +1074,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase { ReadCommand command = request.payload; return Message.remoteResponseForTests(request.id(), - ClusterMetadataTestHelper.addr(node.id), + node.node.addr(), request.verb().responseVerb, ReadResponse.createDataResponse(EmptyIterators.unfilteredPartition(command.metadata()), command)); @@ -1149,13 +1143,13 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase Mutation command = request.payload; Murmur3Partitioner.LongToken requestToken = (Murmur3Partitioner.LongToken) command.key().getToken(); assert node.cluster.state.get().isWriteTargetFor(requestToken.token, node.matcher) : String.format("Node %s is not a write target for %s. 
Write placements: %s", - node.id, requestToken, node.cluster.state.get().writePlacementsFor(requestToken.token)); + node.node.idx(), requestToken, node.cluster.state.get().writePlacementsFor(requestToken.token)); } public Message<NoPayload> respondTo(Message<Mutation> request) { if (shouldRespond.getAsBoolean()) - return Message.remoteResponseForTests(request.id(), ClusterMetadataTestHelper.addr(node.id), request.verb().responseVerb, NoPayload.noPayload); + return Message.remoteResponseForTests(request.id(), node.node.addr(), request.verb().responseVerb, NoPayload.noPayload); return null; } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java index 511a8e67b7..4d0605b0d6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java @@ -20,7 +20,6 @@ package org.apache.cassandra.distributed.test.log; import java.net.InetAddress; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -31,6 +30,7 @@ import org.junit.Test; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.distributed.test.log.PlacementSimulator.SimulatedPlacements; import org.apache.cassandra.distributed.test.log.PlacementSimulator.Transformations; import org.apache.cassandra.locator.EndpointsForRange; @@ -40,7 +40,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.AtomicLongBackedProcessor; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.membership.Location; -import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.ownership.DataPlacements; @@ -61,6 +61,7 @@ import org.apache.cassandra.tcm.transformations.SealPeriod; import org.apache.cassandra.tcm.transformations.UnsafeJoin; import org.apache.cassandra.schema.ReplicationParams; +import static org.apache.cassandra.distributed.test.log.PlacementSimulator.*; import static org.apache.cassandra.distributed.test.log.PlacementSimulator.bootstrap_diffBased; import static org.apache.cassandra.distributed.test.log.PlacementSimulator.leave_diffBased; import static org.apache.cassandra.distributed.test.log.PlacementSimulator.move_diffBased; @@ -80,7 +81,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase { for (int rf : new int[]{ 2, 3, 5 }) { - simulate(50, rf, concurrency, "Diff"); + simulate(50, rf, concurrency); } } } @@ -97,9 +98,9 @@ public class MetadataChangeSimulationTest extends CMSTestBase testMoveReal(3, 5, 350, 10); } - public static PlacementSimulator.Node n(int id, long token) + public static Node n(int idx, long token) { - return new PlacementSimulator.Node(token, "127.0.0." 
+ id); + return new Node(token, idx); } public void testMoveReal(int rf, int moveNodeId, long moveToken, int numberOfNodes) throws Throwable @@ -111,16 +112,16 @@ public class MetadataChangeSimulationTest extends CMSTestBase for (int i = 0; i < numberOfNodes; i++) { LongToken token = new Murmur3Partitioner.LongToken((i + 1) * 100L); - ModelChecker.Pair<MetadataChangeSimulationTest.ModelState, MetadataChangeSimulationTest.Node> registration = registerNewNode(modelState, sut, () -> token); + ModelChecker.Pair<MetadataChangeSimulationTest.ModelState, Node> registration = registerNewNode(modelState, sut, () -> token); if (moveNodeId == i) movingNode = registration.r; modelState = registration.l; modelState = joinWithoutBootstrap(registration.r, modelState, sut).l; } - Node movingTo = new Node(movingNode.nodeId, movingNode.addr, new Murmur3Partitioner.LongToken(moveToken)); - Move move = prepareMove(sut, movingNode, movingTo.token).get(); - modelState = scheduleMoveEvents(modelState, sut, movingNode, movingTo.token, move).l; + Node movingTo = movingNode.withNewToken(moveToken); + Move move = prepareMove(sut, movingNode, movingTo.longToken()).get(); + modelState = scheduleMoveEvents(modelState, sut, movingNode, movingTo.longToken(), move).l; while (modelState.operationStates.get(0).remaining.hasNext()) { @@ -162,7 +163,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase ClusterMetadata actualMetadata = sut.service.metadata(); ReplicationParams replication = actualMetadata.schema.getKeyspaces().get("test").get().params.replication; - Assert.assertEquals(modelState.simulatedPlacements.nodes.stream().map(t -> t.token).collect(Collectors.toSet()), + Assert.assertEquals(modelState.simulatedPlacements.nodes.stream().map(Node::token).collect(Collectors.toSet()), actualMetadata.tokenMap.tokens().stream().map(t -> ((LongToken) t).token).collect(Collectors.toSet())); for (Map.Entry<ReplicationParams, DataPlacement> e : actualMetadata.placements.asMap().entrySet()) @@ -195,7 +196,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase for (int i = 0; i < 10; i++) { LongToken token = new Murmur3Partitioner.LongToken((i + 1) * 100); - ModelChecker.Pair<MetadataChangeSimulationTest.ModelState, MetadataChangeSimulationTest.Node> registration = registerNewNode(state, sut, () -> token); + ModelChecker.Pair<MetadataChangeSimulationTest.ModelState, Node> registration = registerNewNode(state, sut, () -> token); if (decomNodeId == i) decomNode = registration.r; state = registration.l; @@ -247,7 +248,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase for (int i = 0; i < 10; i++) { LongToken token = new Murmur3Partitioner.LongToken((i + 1) * 100); - ModelChecker.Pair<MetadataChangeSimulationTest.ModelState, MetadataChangeSimulationTest.Node> registration = registerNewNode(modelState, sut, () -> token); + ModelChecker.Pair<MetadataChangeSimulationTest.ModelState, Node> registration = registerNewNode(modelState, sut, () -> token); modelState = registration.l; if (decomNodeId == i) joiningNode = registration.r; @@ -294,7 +295,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase for (int i = 0; i < 10; i++) { LongToken token = new Murmur3Partitioner.LongToken(rng.nextLong()); - ModelChecker.Pair<MetadataChangeSimulationTest.ModelState, MetadataChangeSimulationTest.Node> registration = registerNewNode(modelState, sut, () -> token, "datacenter" + (((i + 1) % 3) + 1), "rack1"); + ModelChecker.Pair<MetadataChangeSimulationTest.ModelState, Node> registration = 
registerNewNode(modelState, sut, () -> token, (((i + 1) % 3) + 1), 1); modelState = registration.l; modelState = joinWithoutBootstrap(registration.r, modelState, sut).l; } @@ -331,10 +332,10 @@ public class MetadataChangeSimulationTest extends CMSTestBase // TODO: use several keyspaces in the test, preferrably with different replication factors, since this will meddle with // range locks in a non-trivial way and will probably break things // TODO: add keyspace _creation_ to the mix, since this should also preserve placements - public void simulate(int toBootstrap, int rf, int concurrency, String desc) throws Throwable + public void simulate(int toBootstrap, int rf, int concurrency) throws Throwable { - System.out.println(String.format("RUNNING SIMULATION. TO BOOTSTRAP: %s, RF: %s, CONCURRENCY: %s, BOOTSTRAP MODE: %s", - toBootstrap, rf, concurrency, desc)); + System.out.println(String.format("RUNNING SIMULATION. TO BOOTSTRAP: %s, RF: %s, CONCURRENCY: %s", + toBootstrap, rf, concurrency)); long startTime = System.currentTimeMillis(); final List<Long> longs; final Iterator<Long> tokens; @@ -393,7 +394,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase .step((state, sut) -> state.shouldReplace(rf, rng), (state, sut, entropySource) -> { Node toReplace = getRemovalCandidate(state, entropySource); - ModelChecker.Pair<ModelState, Node> registration = registerNewNode(state, sut, () -> new LongToken(toReplace.token.token)); + ModelChecker.Pair<ModelState, Node> registration = registerNewNode(state, sut, toReplace::longToken); state = registration.l; Node replacement = registration.r; Optional<BootstrapAndReplace> plan = prepareReplace(sut, toReplace, replacement); @@ -421,9 +422,9 @@ public class MetadataChangeSimulationTest extends CMSTestBase ? 
oldOperationState.nodes[1] : oldOperationState.nodes[0]; ClusterMetadata metadata = sut.service.metadata(); - InProgressSequence<?> operation = metadata.inProgressSequences.get(node.nodeId); - assert operation != null : "No in-progress sequence found for node " + node.nodeId; - sut.service.commit(new CancelInProgressSequence(node.nodeId)); + InProgressSequence<?> operation = metadata.inProgressSequences.get(node.nodeId()); + assert operation != null : "No in-progress sequence found for node " + node.nodeId(); + sut.service.commit(new CancelInProgressSequence(node.nodeId())); Transformations steps = oldOperationState.stashedSteps; simulatedState = steps.revertPublishedEffects(simulatedState); switch (oldOperationState.type) @@ -508,11 +509,11 @@ public class MetadataChangeSimulationTest extends CMSTestBase { validatePlacementsFinal(sut, state); sut.close(); - System.out.println(String.format("(RF: %d, CONCURRENCY: %d, MODE: %s, RUN TIME: %dms) - " + + System.out.println(String.format("(RF: %d, CONCURRENCY: %d, RUN TIME: %dms) - " + "REGISTERED: %d, CURRENT SIZE: %d, JOINED: %d, LEFT: %d, " + "REPLACED: %d, MOVED: %s, REJECTED OPS: %d, " + "CANCELLED OPS (join/replace/leave/move): %d/%d/%d/%d, INFLIGHT OPS: %d", - sut.replication.fullReplicas, concurrency, desc, System.currentTimeMillis() - startTime, + sut.replication.fullReplicas, concurrency, System.currentTimeMillis() - startTime, state.uniqueNodes, state.currentNodes.size(), state.joined, state.left, state.replaced, state.moved, state.rejected, state.cancelled[0], state.cancelled[1], state.cancelled[2], state.cancelled[3], @@ -534,12 +535,12 @@ public class MetadataChangeSimulationTest extends CMSTestBase { try { - ClusterMetadata metadata = sut.service.commit(new PrepareJoin(node.nodeId, - ImmutableSet.of(node.token), - sut.service.placementProvider(), - true, - false)); - return Optional.of((BootstrapAndJoin) metadata.inProgressSequences.get(node.nodeId)); + ClusterMetadata metadata = sut.service.commit(new PrepareJoin(node.nodeId(), + ImmutableSet.of(node.longToken()), + sut.service.placementProvider(), + true, + false)); + return Optional.of((BootstrapAndJoin) metadata.inProgressSequences.get(node.nodeId())); } catch (Throwable t) { @@ -551,8 +552,8 @@ public class MetadataChangeSimulationTest extends CMSTestBase { try { - ClusterMetadata metadata = sut.service.commit(new PrepareMove(node.nodeId, Collections.singleton(newToken), sut.service.placementProvider(), false)); - return Optional.of((Move) metadata.inProgressSequences.get(node.nodeId)); + ClusterMetadata metadata = sut.service.commit(new PrepareMove(node.nodeId(), Collections.singleton(newToken), sut.service.placementProvider(), false)); + return Optional.of((Move) metadata.inProgressSequences.get(node.nodeId())); } catch (Throwable t) // TODO: do we really want to catch _every_ exception? 
{ @@ -572,19 +573,19 @@ public class MetadataChangeSimulationTest extends CMSTestBase int rf = sut.replication.fullReplicas; if (simulatedPlacements == null) { - List<PlacementSimulator.Node> orig = Collections.singletonList(new PlacementSimulator.Node(node.token.token, node.name)); + List<Node> orig = Collections.singletonList(node); Transformations stashedSteps = new Transformations(); simulatedPlacements = new SimulatedPlacements(rf, orig, - PlacementSimulator.replicate(orig, rf), - PlacementSimulator.replicate(orig, rf), + replicate(orig, rf), + replicate(orig, rf), Collections.emptyList()); bootstrapOperation = OperationState.newBootstrap(node, iter, stashedSteps); } else { - ModelChecker.Pair<SimulatedPlacements, Transformations> nextSimulated = bootstrap_diffBased(simulatedPlacements, node.name, node.token.token); + ModelChecker.Pair<SimulatedPlacements, Transformations> nextSimulated = bootstrap_diffBased(simulatedPlacements, node.idx(), node.token()); simulatedPlacements = nextSimulated.l; Transformations transformations = nextSimulated.r; // immediately execute the first step of bootstrap transformations, the splitting of existing ranges. This @@ -610,14 +611,14 @@ public class MetadataChangeSimulationTest extends CMSTestBase SimulatedPlacements simulatedPlacements = state.simulatedPlacements; assert simulatedPlacements != null : "Cannot move in an empty cluster"; - ModelChecker.Pair<SimulatedPlacements, Transformations> nextSimulated = move_diffBased(simulatedPlacements, movingNode.name, newToken.token); + ModelChecker.Pair<SimulatedPlacements, Transformations> nextSimulated = move_diffBased(simulatedPlacements, movingNode.idx(), newToken.token); Transformations transformations = nextSimulated.r; simulatedPlacements = nextSimulated.l; // immediately execute the first step of move transformations, the splitting of existing ranges. This // is so that subsequent planned operations base their transformations on the split ranges. 
simulatedPlacements = transformations.advance(simulatedPlacements); OperationState move = OperationState.newMove(movingNode, - new Node(movingNode.nodeId, movingNode.addr, newToken), + movingNode.withNewToken(newToken.token), iter, transformations); return pair(state.transformer() .addOperation(move) @@ -631,10 +632,10 @@ public class MetadataChangeSimulationTest extends CMSTestBase { try { - ClusterMetadata metadata = sut.service.commit(new PrepareLeave(toRemove.nodeId, - true, - sut.service.placementProvider())); - UnbootstrapAndLeave plan = (UnbootstrapAndLeave) metadata.inProgressSequences.get(toRemove.nodeId); + ClusterMetadata metadata = sut.service.commit(new PrepareLeave(toRemove.nodeId(), + true, + sut.service.placementProvider())); + UnbootstrapAndLeave plan = (UnbootstrapAndLeave) metadata.inProgressSequences.get(toRemove.nodeId()); return Optional.of(plan); } catch (Throwable e) @@ -651,7 +652,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase Iterator<ClusterMetadata> iter = toIter(sut.service, events.startLeave, events.midLeave, events.finishLeave); SimulatedPlacements simulatedPlacements = state.simulatedPlacements; - ModelChecker.Pair<SimulatedPlacements, Transformations> nextSimulated = leave_diffBased(simulatedPlacements, toRemove.token.token); + ModelChecker.Pair<SimulatedPlacements, Transformations> nextSimulated = leave_diffBased(simulatedPlacements, toRemove.token()); simulatedPlacements = nextSimulated.l; // we don't remove from bootstrapped nodes until the finish leave event executes OperationState leaveOperation = OperationState.newDecommission(toRemove, iter, nextSimulated.r); @@ -667,12 +668,12 @@ public class MetadataChangeSimulationTest extends CMSTestBase { try { - ClusterMetadata result = sut.service.commit(new PrepareReplace(toReplace.nodeId, - replacement.nodeId, + ClusterMetadata result = sut.service.commit(new PrepareReplace(toReplace.nodeId(), + replacement.nodeId(), sut.service.placementProvider(), true, false)); - BootstrapAndReplace plan = (BootstrapAndReplace) result.inProgressSequences.get(replacement.nodeId); + BootstrapAndReplace plan = (BootstrapAndReplace) result.inProgressSequences.get(replacement.nodeId()); return Optional.of(plan); } catch (Throwable t) @@ -692,7 +693,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase SimulatedPlacements simulatedPlacements = state.simulatedPlacements; assert simulatedPlacements != null : "Cannot replace in an empty cluster"; OperationState bootstrapOperation; - ModelChecker.Pair<SimulatedPlacements, Transformations> nextSimulated = replace_directly(simulatedPlacements, replaced.token.token, replacement.name); + ModelChecker.Pair<SimulatedPlacements, Transformations> nextSimulated = replace_directly(simulatedPlacements, replaced.token(), replacement.idx()); simulatedPlacements = nextSimulated.l; bootstrapOperation = OperationState.newReplacement(replaced, replacement, iter, nextSimulated.r); return pair(state.transformer() @@ -704,20 +705,20 @@ public class MetadataChangeSimulationTest extends CMSTestBase private ModelChecker.Pair<ModelState, CMSSut> joinWithoutBootstrap(Node node, ModelState state, - CMSSut sut) throws InterruptedException, ExecutionException + CMSSut sut) { - sut.service.commit(new UnsafeJoin(node.nodeId, ImmutableSet.of(node.token), sut.service.placementProvider())); + sut.service.commit(new UnsafeJoin(node.nodeId(), ImmutableSet.of(node.longToken()), sut.service.placementProvider())); - List<PlacementSimulator.Node> nodes = new ArrayList<>(); - 
nodes.add(new PlacementSimulator.Node(node.token.token, node.name)); + List<Node> nodes = new ArrayList<>(); + nodes.add(node); if (state.simulatedPlacements != null) nodes.addAll(state.simulatedPlacements.nodes); - nodes.sort(PlacementSimulator.Node::compareTo); + nodes.sort(Node::compareTo); int rf = sut.replication.fullReplicas; SimulatedPlacements simulatedState = new SimulatedPlacements(rf, nodes, - PlacementSimulator.replicate(nodes, rf), - PlacementSimulator.replicate(nodes, rf), + replicate(nodes, rf), + replicate(nodes, rf), Collections.emptyList()); return pair(state.transformer() .withJoined(node) @@ -727,19 +728,16 @@ public class MetadataChangeSimulationTest extends CMSTestBase } private ModelChecker.Pair<ModelState, Node> registerNewNode(ModelState state, CMSSut sut, Supplier<LongToken> token) - throws Exception { - return registerNewNode(state, sut, token, "dc1", "rack1"); + return registerNewNode(state, sut, token, 1, 1); } - private ModelChecker.Pair<ModelState, Node> registerNewNode(ModelState state, CMSSut sut, Supplier<LongToken> token, String dc, String rack) - throws Exception + private ModelChecker.Pair<ModelState, Node> registerNewNode(ModelState state, CMSSut sut, Supplier<LongToken> token, int dc, int rack) { ModelState newState = state.transformer().incrementUniqueNodes().transform(); - String nodeName = String.format("127.0.%d.%d", newState.uniqueNodes / 256, newState.uniqueNodes % 256); - InetAddressAndPort addr = InetAddressAndPort.getByAddress(InetAddress.getByName(nodeName)); - ClusterMetadata metadata = sut.service.commit(new Register(ClusterMetadataTestHelper.addr(addr), new Location(dc, rack), NodeVersion.CURRENT)); - return pair(newState, new Node(metadata.directory.peerId(addr), addr, token.get())); + Node node = new Node(token.get().token, newState.uniqueNodes, dc, rack); + sut.service.commit(new Register(new NodeAddresses(node.addr()), new Location(node.dc(), node.rack()), NodeVersion.CURRENT)); + return pair(newState, node); } private Node getRemovalCandidate(ModelState state, ModelChecker.EntropySource entropySource) @@ -777,17 +775,17 @@ public class MetadataChangeSimulationTest extends CMSTestBase return sb.toString(); } - public static void match(PlacementForRange actual, Map<PlacementSimulator.Range, List<PlacementSimulator.Node>> predicted) throws Throwable + public static void match(PlacementForRange actual, Map<PlacementSimulator.Range, List<Node>> predicted) throws Throwable { Map<Range<Token>, EndpointsForRange> groups = actual.replicaGroups(); assert predicted.size() == groups.size() : String.format("\nPredicted:\n%s(%d)" + "\nActual:\n%s(%d)", toString(predicted), predicted.size(), toString(actual.replicaGroups()), groups.size()); - for (Map.Entry<PlacementSimulator.Range, List<PlacementSimulator.Node>> entry : predicted.entrySet()) + for (Map.Entry<PlacementSimulator.Range, List<Node>> entry : predicted.entrySet()) { PlacementSimulator.Range range = entry.getKey(); - List<PlacementSimulator.Node> nodes = entry.getValue(); + List<Node> nodes = entry.getValue(); Range<Token> predictedRange = new Range<Token>(new Murmur3Partitioner.LongToken(range.start), new Murmur3Partitioner.LongToken(range.end)); EndpointsForRange endpointsForRange = groups.get(predictedRange); @@ -798,16 +796,16 @@ public class MetadataChangeSimulationTest extends CMSTestBase "\nActual: %s", range, nodes, endpointsForRange.endpoints()), nodes.size(), endpointsForRange.size()); - for (PlacementSimulator.Node node : nodes) + for (Node node : nodes) { 
assertTrue(String.format("Endpoints for range %s should have contained %s, but they have not." + "\nExpected: %s" + "\nActual: %s.", endpointsForRange.range(), - node.id, + node.id(), nodes, endpointsForRange.endpoints()), - endpointsForRange.endpoints().contains(InetAddressAndPort.getByAddress(InetAddress.getByName(node.id)))); + endpointsForRange.endpoints().contains(InetAddressAndPort.getByAddress(InetAddress.getByName(node.id())))); } } } @@ -1029,7 +1027,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase currentNodes = new ArrayList<>(); for (Node n : tmp) { - if (n.nodeId.equals(movingNode.nodeId)) + if (n.idx() == movingNode.idx()) currentNodes.add(movedTo); else currentNodes.add(n); @@ -1191,37 +1189,11 @@ public class MetadataChangeSimulationTest extends CMSTestBase } } - public static class Node - { - public final NodeId nodeId; - public final InetAddressAndPort addr; - public final LongToken token; - public final String name; - - public Node(NodeId nodeId, InetAddressAndPort addr, LongToken token) - { - this.nodeId = nodeId; - this.addr = addr; - this.token = token; - this.name = addr.toString(false).replace("/",""); - } - - @Override - public String toString() - { - return "Node{" + - "nodeId=" + nodeId + - ", addr=" + addr + - ", token=" + token + - '}'; - } - } - public static List<Token> toTokens(List<Node> nodes) { List<Token> tokens = new ArrayList<>(); for (Node node : nodes) - tokens.add(node.token); + tokens.add(node.longToken()); tokens.sort(Token::compareTo); return tokens; @@ -1257,7 +1229,7 @@ public class MetadataChangeSimulationTest extends CMSTestBase Set<Token> allTokens = new HashSet<>(toTokens(modelState.currentNodes)); for (OperationState bootstrappedNode : modelState.operationStates) - allTokens.add(bootstrappedNode.nodes[0].token); + allTokens.add(bootstrappedNode.nodes[0].longToken()); List<Range<Token>> expectedRanges = toRanges(allTokens, partitioner); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java b/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java index 40de4dd8be..c626440c3f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulator.java @@ -24,22 +24,20 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; +import java.net.UnknownHostException; +import java.util.*; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import org.junit.Assert; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.locator.InetAddressAndPort; +;import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeId; + /** * A small class that helps to avoid doing mental arithmetics on ranges. 
*/ @@ -281,7 +279,7 @@ public class PlacementSimulator /** * Diff-based bootstrap (very close implementation-wise to what production code does) */ - public static ModelChecker.Pair<SimulatedPlacements, Transformations> bootstrap_diffBased(SimulatedPlacements baseState, String node, long token) + public static ModelChecker.Pair<SimulatedPlacements, Transformations> bootstrap_diffBased(SimulatedPlacements baseState, int node, long token) { List<Node> splitNodes = split(baseState.nodes, token); Map<Range, List<Node>> maximalStateWithPlacement = replicate(move(splitNodes, token, node), baseState.rf); @@ -374,10 +372,10 @@ public class PlacementSimulator return new ModelChecker.Pair<>(baseState.withStashed(steps), steps); } - public static ModelChecker.Pair<SimulatedPlacements, Transformations> move_diffBased(SimulatedPlacements baseState, String node, long newToken) + public static ModelChecker.Pair<SimulatedPlacements, Transformations> move_diffBased(SimulatedPlacements baseState, int node, long newToken) { Node oldLocation = baseState.nodes.stream() - .filter(n -> n.id.equals(node)) + .filter(n -> n.idx() == node) .findFirst() .get(); @@ -385,7 +383,7 @@ public class PlacementSimulator List<Node> finalNodes = new ArrayList<>(); for (int i = 0; i < origNodes.size(); i++) { - if (origNodes.get(i).id == oldLocation.id) + if (origNodes.get(i).idx == oldLocation.idx) continue; finalNodes.add(origNodes.get(i)); } @@ -467,7 +465,7 @@ public class PlacementSimulator List<Node> newNodes = new ArrayList<>(); for (int i = 0; i < currentNodes.size(); i++) { - if (currentNodes.get(i).id == oldLocation.id) + if (currentNodes.get(i).idx() == oldLocation.idx()) continue; newNodes.add(currentNodes.get(i)); } @@ -488,7 +486,7 @@ public class PlacementSimulator return new ModelChecker.Pair<>(baseState.withStashed(steps), steps); } - public static SimulatedPlacements bootstrapFully(SimulatedPlacements baseState, String node, long token) + public static SimulatedPlacements bootstrapFully(SimulatedPlacements baseState, int node, long token) { ModelChecker.Pair<SimulatedPlacements, Transformations> a = bootstrap_diffBased(baseState, node, token); baseState = a.l; @@ -583,7 +581,7 @@ public class PlacementSimulator return new ModelChecker.Pair<>(baseState.withStashed(steps), steps); } - public static ModelChecker.Pair<SimulatedPlacements, Transformations> replace_directly(SimulatedPlacements baseState, long token, String newNode) + public static ModelChecker.Pair<SimulatedPlacements, Transformations> replace_directly(SimulatedPlacements baseState, long token, int newNode) { // find the node with the specified token Node toReplace = baseState.nodes.stream() @@ -690,8 +688,8 @@ public class PlacementSimulator // bootstrap_diffBased implementation and the real code doesn't do this, only the endpoint matters for // correctness, so we limit this comparison to endpoints only. 
Assert.assertEquals(String.format("For key: %s\n", k), - expected.get(k).stream().map(n -> n.id).sorted().collect(Collectors.toList()), - actual.get(k).stream().map(n -> n.id).sorted().collect(Collectors.toList())); + expected.get(k).stream().map(n -> n.idx).sorted().collect(Collectors.toList()), + actual.get(k).stream().map(n -> n.idx).sorted().collect(Collectors.toList())); }); } @@ -898,11 +896,11 @@ public class PlacementSimulator // We're trying to split rightmost range if (previous == null) { - newNodes.add(new Node(splitAt, nodes.get(0).id)); + newNodes.add(new Node(splitAt, nodes.get(0).idx)); } else { - newNodes.add(new Node(splitAt, previous.id)); + newNodes.add(new Node(splitAt, previous.idx)); } inserted = true; } @@ -913,7 +911,7 @@ public class PlacementSimulator // Leftmost is split if (!inserted) - newNodes.add(new Node(splitAt, previous.id)); + newNodes.add(new Node(splitAt, previous.idx)); newNodes.sort(Node::compareTo); return Collections.unmodifiableList(newNodes); @@ -922,7 +920,7 @@ public class PlacementSimulator /** * Change the ownership of the freshly split token */ - public static List<Node> move(List<Node> nodes, long tokenToMove, String newOwner) + public static List<Node> move(List<Node> nodes, long tokenToMove, int newOwner) { List<Node> newNodes = new ArrayList<>(); for (Node node : nodes) @@ -962,7 +960,7 @@ public class PlacementSimulator Map<Range, List<Node>> replication = new TreeMap<>(); for (Range range : ranges) { - Set<String> names = new HashSet<>(); + Set<Integer> names = new HashSet<>(); List<Node> replicas = new ArrayList<>(); int idx = primaryReplica(nodes, range); if (idx >= 0) @@ -988,23 +986,192 @@ public class PlacementSimulator return Collections.unmodifiableMap(replication); } - public static void addIfUnique(List<Node> nodes, Set<String> names, Node node) + public static Map<Range, List<Node>> replicate(List<Node> nodes, Map<String, Integer> rfs) + { + nodes.sort(Comparator.comparing(n -> n.token)); + Map<String, DatacenterNodes> template = new HashMap<>(); + + Map<String, List<Node>> nodesByDC = nodesByDC(nodes); + Map<String, Set<String>> racksByDC = racksByDC(nodes); + + for (Map.Entry<String, Integer> entry : rfs.entrySet()) + { + String dc = entry.getKey(); + int rf = entry.getValue(); + List<Node> nodesInThisDC = nodesByDC.get(dc); + Set<String> racksInThisDC = racksByDC.get(dc); + int nodeCount = nodesInThisDC == null ? 0 : nodesInThisDC.size(); + int rackCount = racksInThisDC == null ? 
0 : racksInThisDC.size(); + if (rf <= 0 || nodeCount == 0) + continue; + + template.put(dc, new DatacenterNodes(rf, rackCount, nodeCount)); + } + + List<Range> ranges = toRanges(nodes); + Map<Range, Map<String, List<Node>>> replication = new TreeMap<>(); + + for (Range range : ranges) + { + int idx = primaryReplica(nodes, range); + if (idx >= 0) + { + int dcsToFill = template.size(); + + Map<String, DatacenterNodes> nodesInDCs = new HashMap<>(); + for (Map.Entry<String, DatacenterNodes> e : template.entrySet()) + nodesInDCs.put(e.getKey(), e.getValue().copy()); + + while (dcsToFill > 0) + { + Node node = nodes.get(idx); + DatacenterNodes dcNodes = nodesInDCs.get(node.dc()); + if (dcNodes != null && dcNodes.addAndCheckIfDone(node, new Location(node.dc(), node.rack()))) + dcsToFill--; + + if (idx + 1 == nodes.size()) + idx = 0; + else + idx += 1; + } + + replication.put(range, mapValues(nodesInDCs, v -> v.nodes)); + } + else + { + // if the range end is larger than the highest assigned token, then treat it + // as part of the wraparound and replicate it to the same nodes as the first + // range. This is most likely caused by a decommission removing the node with + // the largest token. + replication.put(range, replication.get(ranges.get(0))); + } + } + + return combine(replication); + } + + private static Map<Range, List<Node>> combine(Map<Range, Map<String, List<Node>>> orig) + { + Map<Range, List<Node>> res = new HashMap<>(); + for (Map.Entry<Range, Map<String, List<Node>>> e : orig.entrySet()) + { + for (List<Node> v : e.getValue().values()) + res.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(v); + + } + return res; + } + + private static <K, T1, T2> Map<K, T2> mapValues(Map<K, T1> allDCs, Function<T1, T2> map) + { + Map<K, T2> res = new HashMap<>(); + for (Map.Entry<K, T1> e : allDCs.entrySet()) + { + res.put(e.getKey(), map.apply(e.getValue())); + } + return res; + } + + public static Map<String, List<Node>> nodesByDC(List<Node> nodes) + { + Map<String, List<Node>> nodesByDC = new HashMap<>(); + for (Node node : nodes) + nodesByDC.computeIfAbsent(node.dc(), (k) -> new ArrayList<>()).add(node); + + return nodesByDC; + } + + public static Map<String, Set<String>> racksByDC(List<Node> nodes) + { + Map<String, Set<String>> racksByDC = new HashMap<>(); + for (Node node : nodes) + racksByDC.computeIfAbsent(node.dc(), (k) -> new HashSet<>()).add(node.rack()); + + return racksByDC; + } + + private static final class DatacenterNodes + { + private final List<Node> nodes = new ArrayList<>(); + private final Set<Location> racks = new HashSet<>(); + + /** Number of replicas left to fill from this DC. */ + int rfLeft; + int acceptableRackRepeats; + + public DatacenterNodes copy() + { + return new DatacenterNodes(rfLeft, acceptableRackRepeats); + } + + DatacenterNodes(int rf, + int rackCount, + int nodeCount) + { + this.rfLeft = Math.min(rf, nodeCount); + acceptableRackRepeats = rf - rackCount; + } + + // for copying + DatacenterNodes(int rfLeft, int acceptableRackRepeats) + { + this.rfLeft = rfLeft; + this.acceptableRackRepeats = acceptableRackRepeats; + } + + boolean addAndCheckIfDone(Node node, Location location) + { + if (done()) + return false; + + if (nodes.contains(node)) + // Cannot repeat a node. + return false; + + if (racks.add(location)) + { + // New rack. + --rfLeft; + nodes.add(node); + return done(); + } + if (acceptableRackRepeats <= 0) + // There must be rfLeft distinct racks left, do not add any more rack repeats. 
+ return false; + + nodes.add(node); + + // Added a node that is from an already met rack to match RF when there aren't enough racks. + --acceptableRackRepeats; + --rfLeft; + return done(); + } + + boolean done() + { + assert rfLeft >= 0; + return rfLeft == 0; + } + } + + + public static void addIfUnique(List<Node> nodes, Set<Integer> names, Node node) { - if (names.contains(node.id)) + if (names.contains(node.idx())) return; nodes.add(node); - names.add(node.id); + names.add(node.idx()); } public static List<Node> uniq(List<Node> nodes) { List<Node> newNodes = new ArrayList<>(); - Set<String> ids = new HashSet<>(); + Set<Integer> ids = new HashSet<>(); for (Node node : nodes) { - if (!ids.contains(node.id)) + if (!ids.contains(node.idx)) { - ids.add(node.id); + ids.add(node.idx); newNodes.add(node); } } @@ -1138,15 +1305,120 @@ public class PlacementSimulator } } + public interface Lookup + { + String id(int idx); + String dc(int idx); + String rack(int idx); + NodeId nodeId(int idx); + InetAddressAndPort addr(int idx); + } + + public static class DefaultLookup implements Lookup + { + public String id(int idx) + { + return String.format("127.0.%d.%d", idx / 256, idx % 256); + } + + public NodeId nodeId(int idx) + { + return ClusterMetadata.current().directory.peerId(addr(idx)); + } + + public InetAddressAndPort addr(int idx) + { + try + { + return InetAddressAndPort.getByName(id(idx)); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } + + public String dc(int dcIdx) + { + return String.format("datacenter%d", dcIdx); + } + + public String rack(int rackIdx) + { + return String.format("rack%d", rackIdx); + } + } + + public static Lookup DEFAULT_LOOKUP = new DefaultLookup(); public static class Node implements Comparable<Node> { - public final long token; - public final String id; + private final long token; + private final int idx; + private final int dc; + private final int rack; + private final Lookup lookup; - public Node(long token, String id) + public Node(long token, int idx) + { + this(token, idx, 1, 1, DEFAULT_LOOKUP); + } + + public Node(long token, int idx, int dc, int rack) + { + this(token, idx, dc, rack, DEFAULT_LOOKUP); + } + public Node(long token, int idx, int dc, int rack, Lookup lookup) { this.token = token; - this.id = id; + this.idx = idx; + this.dc = dc; + this.rack = rack; + this.lookup = lookup; + } + + public InetAddressAndPort addr() + { + return lookup.addr(idx); + } + + public NodeId nodeId() + { + return lookup.nodeId(idx); + } + + public String id() + { + return lookup.id(idx); + } + + public int idx() + { + return idx; + } + + public String dc() + { + return lookup.dc(dc); + } + + public String rack() + { + return lookup.rack(rack); + } + + public long token() + { + return token; + } + + public Murmur3Partitioner.LongToken longToken() + { + return new Murmur3Partitioner.LongToken(token); + } + + public Node withNewToken(long newToken) + { + return new Node(newToken, idx, dc, rack, lookup); } public boolean equals(Object o) @@ -1154,12 +1426,12 @@ public class PlacementSimulator if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Node node = (Node) o; - return Objects.equals(id, node.id); + return Objects.equals(idx, node.idx); } public int hashCode() { - return Objects.hash(id); + return Objects.hash(idx); } public int compareTo(Node o) @@ -1169,7 +1441,7 @@ public class PlacementSimulator public String toString() { - return "" + id + "@" + token; + return "" + idx + "@" + token; } } diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulatorTest.java index 6c3823ff50..07a0a35019 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulatorTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/PlacementSimulatorTest.java @@ -60,7 +60,7 @@ public class PlacementSimulatorTest orig.sort(Node::compareTo); SimulatedPlacements placements = new SimulatedPlacements(rf, orig, replicate(orig, rf), replicate(orig, rf), Collections.emptyList()); - ModelChecker.Pair<SimulatedPlacements, Transformations> steps = move_diffBased(placements, "127.0.0.1", newToken); + ModelChecker.Pair<SimulatedPlacements, Transformations> steps = move_diffBased(placements, 1, newToken); List<Node> afterSplit = split(orig, newToken); List<Node> finalState = moveFinalState(orig, movingNode, newToken); @@ -71,13 +71,13 @@ public class PlacementSimulatorTest assertPlacements(placements, replicate(afterSplit, rf), superset(replicate(afterSplit, rf), - replicate(split(finalState, movingNode.token), rf))); + replicate(split(finalState, movingNode.token()), rf))); placements = steps.r.advance(placements); assertPlacements(placements, - replicate(split(finalState, movingNode.token), rf), + replicate(split(finalState, movingNode.token()), rf), superset(replicate(afterSplit, rf), - replicate(split(finalState, movingNode.token), rf))); + replicate(split(finalState, movingNode.token()), rf))); placements = steps.r.advance(placements); assertPlacements(placements, @@ -108,7 +108,7 @@ public class PlacementSimulatorTest Node newNode = n(5, newToken); SimulatedPlacements placements = new SimulatedPlacements(rf, orig, replicate(orig, rf), replicate(orig, rf), Collections.emptyList()); - ModelChecker.Pair<SimulatedPlacements, Transformations> steps = bootstrap_diffBased(placements, "127.0.0.5", newToken); + ModelChecker.Pair<SimulatedPlacements, Transformations> steps = bootstrap_diffBased(placements, 5, newToken); List<Node> afterSplit = split(orig, newToken); List<Node> finalState = bootstrapFinalState(orig, newNode, newToken); @@ -158,22 +158,22 @@ public class PlacementSimulatorTest orig.sort(Node::compareTo); SimulatedPlacements placements = new SimulatedPlacements(rf, orig, replicate(orig, rf), replicate(orig, rf), Collections.emptyList()); - ModelChecker.Pair<SimulatedPlacements, Transformations> steps = leave_diffBased(placements, leavingNode.token); + ModelChecker.Pair<SimulatedPlacements, Transformations> steps = leave_diffBased(placements, leavingNode.token()); - List<Node> finalState = leaveFinalState(orig, leavingNode.token); + List<Node> finalState = leaveFinalState(orig, leavingNode.token()); //TODO: for some reason diff-based leave is a 3 step operation placements = steps.r.advance(placements); assertPlacements(placements, replicate(orig, rf), superset(replicate(orig, rf), - replicate(split(finalState, leavingNode.token), rf))); + replicate(split(finalState, leavingNode.token()), rf))); placements = steps.r.advance(placements); assertPlacements(placements, - replicate(split(finalState, leavingNode.token), rf), + replicate(split(finalState, leavingNode.token()), rf), superset(replicate(orig, rf), - replicate(split(finalState, leavingNode.token), rf))); + replicate(split(finalState, leavingNode.token()), rf))); placements = steps.r.advance(placements); assertPlacements(placements, @@ -183,29 +183,29 @@ public class PlacementSimulatorTest public static 
List<Node> moveFinalState(List<Node> nodes, Node target, long newToken) { - nodes = filter(nodes, n -> !n.id.equals(target.id)); // filter out current owner + nodes = filter(nodes, n -> n.idx() != target.idx()); // filter out current owner nodes = split(nodes, newToken); // materialize new token - nodes = move(nodes, newToken, target.id); // move new token to the node + nodes = move(nodes, newToken, target.idx()); // move new token to the node return nodes; } public static List<Node> bootstrapFinalState(List<Node> nodes, Node newNode, long newToken) { - nodes = split(nodes, newToken); // materialize new token - nodes = move(nodes, newToken, newNode.id); // move new token to the node + nodes = split(nodes, newToken); // materialize new token + nodes = move(nodes, newToken, newNode.idx()); // move new token to the node return nodes; } public static List<Node> leaveFinalState(List<Node> nodes, long leavingToken) { - nodes = filter(nodes, n -> n.token != leavingToken); + nodes = filter(nodes, n -> n.token() != leavingToken); return nodes; } - public static PlacementSimulator.Node n(int id, long token) + public static PlacementSimulator.Node n(int idx, long token) { - return new PlacementSimulator.Node(token, "127.0.0." + id); + return new PlacementSimulator.Node(token, idx); } @Test @@ -221,7 +221,7 @@ public class PlacementSimulatorTest { List<Long> source = readableTokens(100); Iterator<Long> tokens = source.iterator(); - List<Node> orig = Collections.singletonList(new Node(tokens.next(), "127.0.0.1")); + List<Node> orig = Collections.singletonList(new Node(tokens.next(), 1)); ModelChecker<SimulatedPlacements, SUTState> modelChecker = new ModelChecker<>(); AtomicInteger addressCounter = new AtomicInteger(1); @@ -231,7 +231,7 @@ public class PlacementSimulatorTest new SUTState()) .step((state, sut) -> state.nodes.size() < rf, (state, sut, rng) -> new ModelChecker.Pair<>(bootstrapFully(state, - "127.0.0." + addressCounter.incrementAndGet(), + addressCounter.incrementAndGet(), tokens.next()), sut)) .step((state, sut) -> state.nodes.size() >= rf && state.stashedStates.size() < 1, @@ -241,14 +241,14 @@ public class PlacementSimulatorTest // randomly schedule either decommission or replacement of an existing node Node toRemove = state.nodes.get(rng.nextInt(0, state.nodes.size() - 1)); return rng.nextBoolean() - ? new ModelChecker.Pair<>(replace_directly(state, toRemove.token, "127.0.0." + addressCounter.incrementAndGet()).l, sut) - : new ModelChecker.Pair<>(leave_diffBased(state, toRemove.token).l, sut); + ? new ModelChecker.Pair<>(replace_directly(state, toRemove.token(), addressCounter.incrementAndGet()).l, sut) + : new ModelChecker.Pair<>(leave_diffBased(state, toRemove.token()).l, sut); } else { // schedule bootstrapping an additional node return new ModelChecker.Pair<>(bootstrap_diffBased(state, - "127.0.0." + addressCounter.incrementAndGet(), + addressCounter.incrementAndGet(), tokens.next()).l, sut); } @@ -283,7 +283,7 @@ public class PlacementSimulatorTest Iterator<Long> tokens = source.iterator(); List<Node> nodes = nodes(10, tokens); long nextToken = tokens.next(); - String newNode = "127.0.0." 
+ nodes.size() + 1; + int newNode = nodes.size() + 1; SimulatedPlacements sim = new SimulatedPlacements(rf, nodes, replicate(nodes, rf), replicate(nodes, rf), Collections.emptyList()); revertPartiallyCompleteOp(sim, () -> bootstrap_diffBased(sim, newNode, nextToken), 3); } @@ -299,7 +299,7 @@ public class PlacementSimulatorTest List<Node> nodes = nodes(10, tokens); Node toRemove = nodes.get(5); SimulatedPlacements sim = new SimulatedPlacements(rf, nodes, replicate(nodes, rf), replicate(nodes, rf), Collections.emptyList()); - revertPartiallyCompleteOp(sim, () -> leave_diffBased(sim, toRemove.token), 2); + revertPartiallyCompleteOp(sim, () -> leave_diffBased(sim, toRemove.token()), 2); } } @@ -313,15 +313,16 @@ public class PlacementSimulatorTest List<Node> nodes = nodes(10, tokens); Node toReplace = nodes.get(5); SimulatedPlacements sim = new SimulatedPlacements(rf, nodes, replicate(nodes, rf), replicate(nodes, rf), Collections.emptyList()); - revertPartiallyCompleteOp(sim, () -> replace_directly(sim, toReplace.token, "127.0.0.99"), 2); + revertPartiallyCompleteOp(sim, () -> replace_directly(sim, toReplace.token(), 99), 2); } } + private List<Node> nodes(int n, Iterator<Long> tokens) { List<Node> nodes = new ArrayList<>(); for (int i = 0; i < n; i++) - nodes.add(new Node(tokens.next(), "127.0.0." + (i+1))); + nodes.add(new Node(tokens.next(), i+1)); nodes.sort(Node::compareTo); return nodes; } @@ -347,7 +348,7 @@ public class PlacementSimulatorTest { Map<Range, List<Node>> startingReadPlacements = sim.readPlacements; Map<Range, List<Node>> startingWritePlacements = sim.writePlacements; - ModelChecker.Pair<SimulatedPlacements, Transformations> op = opProvider.get();; + ModelChecker.Pair<SimulatedPlacements, Transformations> op = opProvider.get(); sim = op.l; Transformations steps = op.r; // execute the required steps diff --git a/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementIntegrationTest.java b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementIntegrationTest.java new file mode 100644 index 0000000000..7036261e55 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementIntegrationTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ExecutionException; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.distributed.test.log.CMSTestBase; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.distributed.test.log.MetadataChangeSimulationTest; +import org.apache.cassandra.distributed.test.log.PlacementSimulator; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.AtomicLongBackedProcessor; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; + +public class UniformRangePlacementIntegrationTest +{ + @BeforeClass + public static void beforeClass() + { + SchemaLoader.prepareServerNoRegister(); + } + + @Before + public void before() throws ExecutionException, InterruptedException + { + ClusterMetadataService.unsetInstance(); + new CMSTestBase.CMSSut(AtomicLongBackedProcessor::new, false, 3); + } + + @Before + public void after() throws ExecutionException, InterruptedException + { + ClusterMetadataService.unsetInstance(); + } + + @Test + public void testMultiDC() throws Throwable + { + UniformRangePlacement rangePlacement = new UniformRangePlacement(); + Random rng = new Random(1); + int idx = 1; + List<PlacementSimulator.Node> nodes = new ArrayList<>(); + for (int i = 0; i < 5; i++) + { + for (int j = 1; j <= 3; j++) + { + long token = rng.nextLong(); + int dc = j; + int rack = (rng.nextInt(3) + 1); + PlacementSimulator.Node node = new PlacementSimulator.Node(token, idx, dc, rack); + ClusterMetadataTestHelper.register(idx, node.dc(), node.rack()); + ClusterMetadataTestHelper.join(idx, token); + nodes.add(node); + idx++; + } + } + + ClusterMetadataService.instance().replayAndWait(); + DataPlacements placements = rangePlacement.calculatePlacements(ClusterMetadata.current(), + Keyspaces.of(ClusterMetadata.current().schema.getKeyspaces().get("test_nts").get())); + Map<String, Integer> rf = new HashMap<>(); + for (int i = 1; i <= 3; i++) + { + String dc = "datacenter" + i; + rf.put(dc, 3); + } + + Map<PlacementSimulator.Range, List<PlacementSimulator.Node>> predicted = PlacementSimulator.replicate(nodes, rf); + + ReplicationParams replicationParams = ClusterMetadata.current().schema.getKeyspaces().get("test_nts").get().params.replication; + MetadataChangeSimulationTest.match(placements.get(replicationParams).reads, + predicted); + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
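
Below is a minimal standalone sketch (not part of the patch itself) of how the multi-DC placement model added to PlacementSimulator above can be exercised. It assumes only what the patch introduces: the Node(token, idx, dc, rack) constructor, the DefaultLookup naming of indices as 127.0.x.y, datacenterN and rackN, and the new replicate(List<Node>, Map<String, Integer>) overload; the class name and main method here are illustrative only.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.distributed.test.log.PlacementSimulator;
import org.apache.cassandra.distributed.test.log.PlacementSimulator.Node;

public class MultiDcPlacementSketch
{
    public static void main(String[] args)
    {
        // Three DCs with three racks each, one node per rack, evenly spaced tokens.
        List<Node> nodes = new ArrayList<>();
        int idx = 1;
        for (int dc = 1; dc <= 3; dc++)
            for (int rack = 1; rack <= 3; rack++)
                nodes.add(new Node(idx * 100L, idx++, dc, rack));

        Node first = nodes.get(0);
        // DefaultLookup derives names and addresses from the integer indices.
        System.out.println(first.id());   // 127.0.0.1
        System.out.println(first.dc());   // datacenter1
        System.out.println(first.rack()); // rack1

        // RF 3 per datacenter, mirroring the test_nts keyspace in CMSTestBase.
        Map<String, Integer> rfs = new HashMap<>();
        rfs.put("datacenter1", 3);
        rfs.put("datacenter2", 3);
        rfs.put("datacenter3", 3);

        // The multi-DC replicate overload walks the ring from each range's primary
        // replica and fills each DC's replica set preferring distinct racks,
        // mirroring NetworkTopologyStrategy.
        Map<PlacementSimulator.Range, List<Node>> placement = PlacementSimulator.replicate(nodes, rfs);
        placement.forEach((range, replicas) -> System.out.println(range + " -> " + replicas));
    }
}

A placement produced this way is what UniformRangePlacementIntegrationTest above compares against the output of UniformRangePlacement.calculatePlacements for the test_nts keyspace.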
