This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7a888149df Support topology-safe changes to Datacenter & Rack for live nodes
7a888149df is described below

commit 7a888149dff4afaea8753571097bd8bca6a4fbfd
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Jan 12 13:00:00 2024 +0000

    Support topology-safe changes to Datacenter & Rack for live nodes
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
    CASSANDRA-20528
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/SystemKeyspace.java    |   7 +
 .../apache/cassandra/service/StorageService.java   |  18 ++
 .../cassandra/service/StorageServiceMBean.java     |   2 +
 .../org/apache/cassandra/tcm/Transformation.java   |   3 +-
 .../tcm/listeners/LegacyStateListener.java         |   7 +-
 .../tcm/listeners/PlacementsChangeListener.java    |   2 +-
 .../apache/cassandra/tcm/membership/Directory.java |  25 +-
 .../apache/cassandra/tcm/membership/Location.java  |  12 +
 .../cassandra/tcm/ownership/DataPlacement.java     |   5 +
 .../cassandra/tcm/ownership/DataPlacements.java    |  18 ++
 .../cassandra/tcm/ownership/ReplicaGroups.java     |  15 ++
 .../apache/cassandra/tcm/ownership/TokenMap.java   |   4 +-
 .../tcm/ownership/VersionedEndpoints.java          |   7 +-
 .../tcm/sequences/CancelCMSReconfiguration.java    |   2 +-
 .../cassandra/tcm/transformations/AlterSchema.java |   2 +-
 .../tcm/transformations/AlterTopology.java         | 193 +++++++++++++++
 src/java/org/apache/cassandra/tools/NodeTool.java  |   1 +
 .../cassandra/tools/nodetool/AlterTopology.java    |  49 ++++
 .../distributed/test/log/AlterTopologyTest.java    | 267 +++++++++++++++++++++
 .../test/log/MetadataChangeSimulationTest.java     |   4 +-
 .../cassandra/distributed/test/log/ModelState.java |  14 ++
 .../distributed/test/log/ReconfigureCMSTest.java   |   4 +-
 .../distributed/test/log/SimulatedOperation.java   |  21 ++
 .../cassandra/harry/model/TokenPlacementModel.java |  17 ++
 .../SchemaChangeDuringRangeMovementTest.java       |  48 +---
 .../service/AlterTopologyArgParsingTest.java       | 149 ++++++++++++
 .../cassandra/tcm/membership/DirectoryTest.java    | 126 ++++++++++
 .../InProgressSequenceCancellationTest.java        |  36 +--
 .../cassandra/tcm/sequences/SequencesUtils.java    |  72 ++++++
 30 files changed, 1036 insertions(+), 95 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2fb55b032e..26eb4a062c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Support topology-safe changes to Datacenter & Rack for live nodes (CASSANDRA-20528)
  * Add SSTableIntervalTree latency metric (CASSANDRA-20502)
  * Ignore repetitions of semicolon in CQLSH (CASSANDRA-19956)
  * Avoid NPE during cms initialization abort (CASSANDRA-20527)
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java 
b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 81186c512a..36fc953360 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -975,6 +975,13 @@ public final class SystemKeyspace
         executeInternal(format(req, LOCAL, LOCAL), rack);
     }
 
+    /**
+     * Writes the supplied datacenter and rack into the data_center and rack columns of the
+     * row keyed by LOCAL in the system LOCAL table, then blocks until that table is flushed.
+     *
+     * @param location the datacenter/rack pair to persist for the local node
+     */
+    public static synchronized void updateLocation(Location location)
+    {
+        String insert = format("INSERT INTO system.%s (key, data_center, rack) VALUES ('%s', ?, ?)", LOCAL, LOCAL);
+        executeInternal(insert, location.datacenter, location.rack);
+        forceBlockingFlush(LOCAL);
+    }
+
     public static Set<String> tokensAsSet(Collection<Token> tokens)
     {
         if (tokens.isEmpty())
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index e8a3b40a86..bc914a9e55 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -186,6 +186,7 @@ import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.compatibility.GossipHelper;
 import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
 import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
@@ -199,6 +200,7 @@ import 
org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.tcm.sequences.SingleNodeSequences;
 import org.apache.cassandra.tcm.transformations.Assassinate;
 import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tcm.transformations.Unregister;
@@ -5553,4 +5555,20 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     {
         return DatabaseDescriptor.getPaxosRepairRaceWait();
     }
+
+    /**
+     * Parses the requested location changes against the current directory and, if there
+     * are any, commits them to cluster metadata as a single AlterTopology transformation.
+     *
+     * @param changes specification string handed to AlterTopology.parseArgs
+     * @throws IllegalArgumentException if the specification cannot be parsed or the commit is rejected
+     */
+    public void alterTopology(String changes)
+    {
+        Directory directory = ClusterMetadata.current().directory;
+        Map<NodeId, Location> updates = AlterTopology.parseArgs(changes, directory);
+        logger.info("Received request to modify rack assignments. Proposed changes: {}", updates);
+        if (!updates.isEmpty())
+        {
+            AlterTopology transform = new AlterTopology(updates, ClusterMetadataService.instance().placementProvider());
+            ClusterMetadataService.instance()
+                                  .commit(transform,
+                                          committed -> {
+                                              logger.info("Rack changes committed successfully");
+                                              return committed;
+                                          },
+                                          (code, reason) -> {
+                                              throw new IllegalArgumentException("Unable to commit rack changes: " + reason);
+                                          });
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index b738ecd486..74017b2579 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1360,4 +1360,6 @@ public interface StorageServiceMBean extends 
NotificationEmitter
     void setPaxosRepairRaceWait(boolean paxosRepairCoordinatorWait);
 
     boolean getPaxosRepairRaceWait();
+    /**
+     * Requests a change of datacenter and/or rack for one or more nodes.
+     *
+     * @param updates comma delimited list of "nodeId=dc:rack" or "endpoint=dc:rack"
+     */
+    void alterTopology(String updates);
 }
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java 
b/src/java/org/apache/cassandra/tcm/Transformation.java
index 8cfda01e26..864d9a5d94 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -219,7 +219,8 @@ public interface Transformation
         PREPARE_SIMPLE_CMS_RECONFIGURATION(31, () -> 
PrepareCMSReconfiguration.Simple.serializer),
         PREPARE_COMPLEX_CMS_RECONFIGURATION(32, () -> 
PrepareCMSReconfiguration.Complex.serializer),
         ADVANCE_CMS_RECONFIGURATION(33, () -> 
AdvanceCMSReconfiguration.serializer),
-        CANCEL_CMS_RECONFIGURATION(34, () -> 
CancelCMSReconfiguration.serializer)
+        CANCEL_CMS_RECONFIGURATION(34, () -> 
CancelCMSReconfiguration.serializer),
+        ALTER_TOPOLOGY(35, () -> AlterTopology.serializer),
         ;
 
         private final Supplier<AsymmetricMetadataSerializer<Transformation, ? 
extends Transformation>> serializer;
diff --git 
a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java 
b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
index f0fced7ae6..798583a533 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
@@ -115,6 +115,9 @@ public class LegacyStateListener implements 
ChangeListener.Async
                 // state for the local node.
                 
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
                 Gossiper.instance.addLocalApplicationState(SCHEMA, 
StorageService.instance.valueFactory.schema(next.schema.getVersion()));
+                // if the local node's location has changed, update 
system.local.
+                if 
(!next.directory.location(change).equals(prev.directory.location(change)))
+                    
SystemKeyspace.updateLocation(next.directory.location(change));
             }
 
             if (next.directory.peerState(change) == REGISTERED)
@@ -181,6 +184,8 @@ public class LegacyStateListener implements 
ChangeListener.Async
     {
         return prev.peerState(nodeId) != next.peerState(nodeId) ||
                !Objects.equals(prev.getNodeAddresses(nodeId), 
next.getNodeAddresses(nodeId)) ||
-               !Objects.equals(prev.version(nodeId), next.version(nodeId));
+               !Objects.equals(prev.version(nodeId), next.version(nodeId)) ||
+               !Objects.equals(prev.location(nodeId), next.location(nodeId));
+
     }
 }
diff --git 
a/src/java/org/apache/cassandra/tcm/listeners/PlacementsChangeListener.java 
b/src/java/org/apache/cassandra/tcm/listeners/PlacementsChangeListener.java
index 80da4ec844..605b526378 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/PlacementsChangeListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/PlacementsChangeListener.java
@@ -34,7 +34,7 @@ public class PlacementsChangeListener implements 
ChangeListener
     private boolean shouldInvalidate(ClusterMetadata prev, ClusterMetadata 
next)
     {
         if 
(!prev.placements.lastModified().equals(next.placements.lastModified()) &&
-            !prev.placements.equals(next.placements)) // <- todo should we 
update lastModified if the result is the same?
+            !prev.placements.equivalentTo(next.placements)) // <- todo should 
we update lastModified if the result is the same?
             return true;
 
         if (prev.schema.getKeyspaces().size() != 
next.schema.getKeyspaces().size())
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java 
b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index 51ab84c520..436ded0ab3 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -237,7 +237,6 @@ public class Directory implements MetadataValue<Directory>
     {
         InetAddressAndPort endpoint = peers.get(id);
         Location location = locations.get(id);
-
         BTreeMultimap<String, InetAddressAndPort> rackEP = 
(BTreeMultimap<String, InetAddressAndPort>) racksByDC.get(location.datacenter);
         if (rackEP == null)
             rackEP = BTreeMultimap.empty();
@@ -268,6 +267,26 @@ public class Directory implements MetadataValue<Directory>
                              newRacksByDC);
     }
 
+    /**
+     * Returns a directory in which the given node carries the supplied location. The result is
+     * built by applying withoutRackAndDC, withLocation and withRackAndDC, in that order.
+     *
+     * @param id       node whose location is being replaced; must be a key of peers
+     * @param location the new datacenter/rack for the node
+     * @throws IllegalArgumentException if the node is not present in peers
+     */
+    public Directory withUpdatedRackAndDc(NodeId id, Location location)
+    {
+        if (peers.containsKey(id))
+        {
+            Directory detached = withoutRackAndDC(id);
+            Directory relocated = detached.withLocation(id, location);
+            return relocated.withRackAndDC(id);
+        }
+
+        throw new IllegalArgumentException(String.format("Node %s has no registered location to update", id));
+    }
+
+    /**
+     * Returns a directory whose locations map holds the supplied location for the node, leaving
+     * every other field as it is. If the node already has that location, this instance is returned.
+     *
+     * @throws IllegalArgumentException if the node has no entry in locations
+     */
+    private Directory withLocation(NodeId id, Location location)
+    {
+        if (!locations.containsKey(id))
+            throw new IllegalArgumentException(String.format("Node %s has no registered location to update", id));
+
+        Location current = locations.get(id);
+        return current.equals(location)
+               ? this
+               : new Directory(nextId, lastModified, peers, locations.withForce(id, location), states, versions, hostIds,
+                               addresses, endpointsByDC, racksByDC);
+    }
+
     public Directory without(NodeId id)
     {
         InetAddressAndPort endpoint = peers.get(id);
@@ -665,7 +684,7 @@ public class Directory implements MetadataValue<Directory>
         Directory directory = (Directory) o;
 
         return Objects.equals(lastModified, directory.lastModified) &&
-               isEquivalent(directory);
+               equivalentTo(directory);
     }
 
     private static Pair<NodeVersion, NodeVersion> 
minMaxVersions(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, 
NodeVersion> versions)
@@ -700,7 +719,7 @@ public class Directory implements MetadataValue<Directory>
      * does not check equality of lastModified
      */
     @VisibleForTesting
-    public boolean isEquivalent(Directory directory)
+    public boolean equivalentTo(Directory directory)
     {
         return nextId == directory.nextId &&
                Objects.equals(peers, directory.peers) &&
diff --git a/src/java/org/apache/cassandra/tcm/membership/Location.java 
b/src/java/org/apache/cassandra/tcm/membership/Location.java
index faf8230d94..08ad29dde9 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Location.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Location.java
@@ -67,6 +67,18 @@ public class Location
         return datacenter + '/' + rack;
     }
 
+    /**
+     * Parses a location from its {@code datacenter:rack} string form. Each part is trimmed of
+     * surrounding whitespace; any components after the second are ignored.
+     *
+     * @param value string of the form {@code datacenter:rack}
+     * @return the parsed location, or null if value is null or empty
+     * @throws IllegalArgumentException if value does not contain both parts, or if the
+     *                                  datacenter or rack is empty once trimmed
+     */
+    public static Location fromString(String value)
+    {
+        if (value == null || value.isEmpty())
+            return null;
+
+        String[] parts = value.split(":");
+        if (parts.length < 2)
+            throw new IllegalArgumentException("Invalid datacenter:rack -  " + value);
+
+        String datacenter = parts[0].trim();
+        String rack = parts[1].trim();
+        // split() keeps a leading empty component and trim() can empty a whitespace-only one, so
+        // inputs such as ":r1" or "dc1: " get this far; reject them rather than build a Location
+        // with a blank datacenter or rack name
+        if (datacenter.isEmpty() || rack.isEmpty())
+            throw new IllegalArgumentException("Invalid datacenter:rack -  " + value);
+
+        return new Location(datacenter, rack);
+    }
+
     public static class Serializer implements MetadataSerializer<Location>
     {
         public void serialize(Location t, DataOutputPlus out, Version version) 
throws IOException
diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java 
b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java
index f42d8b5479..12920d6862 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java
@@ -184,6 +184,11 @@ public class DataPlacement
         return Objects.hash(reads, writes);
     }
 
+    /**
+     * Compares this placement with another using equivalentTo on the read groups and then on
+     * the write groups.
+     *
+     * @return true only if both the reads and the writes are equivalent
+     */
+    public boolean equivalentTo(DataPlacement other)
+    {
+        if (!reads.equivalentTo(other.reads))
+            return false;
+
+        return writes.equivalentTo(other.writes);
+    }
+
     public static class Serializer implements MetadataSerializer<DataPlacement>
     {
         private final IPartitioner partitioner;
diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java 
b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
index 988d2b1bcb..b89ecde7d3 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
@@ -141,6 +141,24 @@ public class DataPlacements extends 
ReplicationMap<DataPlacement> implements Met
                '}';
     }
 
+    /**
+     * Two DataPlacements are equal when their underlying maps are equal.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (o == this)
+            return true;
+
+        return o instanceof DataPlacements && this.map.equals(((DataPlacements) o).map);
+    }
+
+    /**
+     * Looser comparison than equals: both instances must have the same key set, and the
+     * placement stored under each key must be equivalentTo the placement the other instance
+     * returns for that key.
+     */
+    public boolean equivalentTo(DataPlacements other)
+    {
+        boolean sameKeys = map.keySet().equals(other.map.keySet());
+        if (!sameKeys)
+            return false;
+
+        return map.entrySet()
+                  .stream()
+                  .noneMatch(entry -> !entry.getValue().equivalentTo(other.get(entry.getKey())));
+    }
+
     public static DataPlacements sortReplicaGroups(DataPlacements placements, 
Comparator<Replica> comparator)
     {
         Builder builder = DataPlacements.builder(placements.size());
diff --git a/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java 
b/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java
index adc26ff820..33d160fa10 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java
@@ -534,4 +534,19 @@ public class ReplicaGroups
     {
         return Objects.hash(ranges, endpoints);
     }
+
+
+    /**
+     * Compares replica groups range by range. The ranges must be equal, and for each range the
+     * two endpoint collections must have the same size with every replica of this instance
+     * contained in the other's collection for that range.
+     */
+    public boolean equivalentTo(ReplicaGroups other)
+    {
+        if (!ranges.equals(other.ranges))
+            return false;
+
+        int idx = 0;
+        while (idx < ranges.size())
+        {
+            EndpointsForRange mine = endpoints.get(idx).get();
+            EndpointsForRange theirs = other.forRange(ranges.get(idx)).get();
+            boolean sameSize = mine.size() == theirs.size();
+            if (!sameSize || !mine.stream().allMatch(theirs::contains))
+                return false;
+            idx++;
+        }
+        return true;
+    }
 }
diff --git a/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java 
b/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java
index c32f6c351c..c17d91ae48 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java
@@ -255,7 +255,7 @@ public class TokenMap implements MetadataValue<TokenMap>
         if (!(o instanceof TokenMap)) return false;
         TokenMap tokenMap = (TokenMap) o;
         return Objects.equals(lastModified, tokenMap.lastModified) &&
-               isEquivalent(tokenMap);
+               equivalentTo(tokenMap);
     }
 
     @Override
@@ -269,7 +269,7 @@ public class TokenMap implements MetadataValue<TokenMap>
      *
      * does not check equality of lastModified
      */
-    public boolean isEquivalent(TokenMap tokenMap)
+    public boolean equivalentTo(TokenMap tokenMap)
     {
         return Objects.equals(map, tokenMap.map) &&
                Objects.equals(partitioner, tokenMap.partitioner);
diff --git 
a/src/java/org/apache/cassandra/tcm/ownership/VersionedEndpoints.java 
b/src/java/org/apache/cassandra/tcm/ownership/VersionedEndpoints.java
index 90148f2c83..2f429138d2 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/VersionedEndpoints.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/VersionedEndpoints.java
@@ -116,7 +116,9 @@ public interface VersionedEndpoints<E extends Endpoints<E>> 
extends MetadataValu
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             ForRange forRange = (ForRange) o;
-            return 
Objects.equals(endpointsForRange.sorted(Replica::compareTo), 
forRange.endpointsForRange.sorted(Replica::compareTo));
+            return lastModified.equals(forRange.lastModified) &&
+                   Objects.equals(endpointsForRange.sorted(Replica::compareTo),
+                                  
forRange.endpointsForRange.sorted(Replica::compareTo));
         }
 
         public boolean isEmpty()
@@ -184,7 +186,8 @@ public interface VersionedEndpoints<E extends Endpoints<E>> 
extends MetadataValu
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             ForToken forToken = (ForToken) o;
-            return Objects.equals(endpointsForToken, 
forToken.endpointsForToken);
+            return lastModified.equals(forToken.lastModified) &&
+                   Objects.equals(endpointsForToken, 
forToken.endpointsForToken);
         }
 
         public boolean isEmpty()
diff --git 
a/src/java/org/apache/cassandra/tcm/sequences/CancelCMSReconfiguration.java 
b/src/java/org/apache/cassandra/tcm/sequences/CancelCMSReconfiguration.java
index 3d6499f61b..665c6ec797 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/CancelCMSReconfiguration.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/CancelCMSReconfiguration.java
@@ -77,7 +77,7 @@ public class CancelCMSReconfiguration implements 
Transformation
                                  .withoutWriteReplica(prev.nextEpoch(), 
pendingReplica)
                                  .build();
         }
-        if (!placement.reads.equals(placement.writes))
+        if (!placement.reads.equivalentTo(placement.writes))
             return new Rejected(ExceptionCode.INVALID, 
String.format("Placements will be inconsistent if this transformation is 
applied:\nReads %s\nWrites: %s",
                                                                      
placement.reads,
                                                                      
placement.writes));
diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java 
b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
index cec1d42ca8..faa86e357b 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
@@ -224,7 +224,7 @@ public class AlterSchema implements Transformation
             calculatedPlacements.forEach((params, newPlacement) -> {
                 DataPlacement previousPlacement = prev.placements.get(params);
                 // Preserve placement versioning that has resulted from 
natural application where possible
-                if (previousPlacement.equals(newPlacement))
+                if (previousPlacement.equivalentTo(newPlacement))
                     newPlacementsBuilder.with(params, previousPlacement);
                 else
                     newPlacementsBuilder.with(params, newPlacement);
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/AlterTopology.java 
b/src/java/org/apache/cassandra/tcm/transformations/AlterTopology.java
new file mode 100644
index 0000000000..e247f66a88
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/transformations/AlterTopology.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.ownership.PlacementProvider;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
+
+/**
+ * Transformation which changes the datacenter and/or rack recorded in the {@link Directory}
+ * for one or more nodes. Execution is rejected while any ranges are locked, when an update
+ * names a node with no location in the directory, or when the placements recalculated against
+ * the updated directory are not equivalent to the current placements.
+ */
+public class AlterTopology implements Transformation
+{
+    private static final Logger logger = LoggerFactory.getLogger(AlterTopology.class);
+    public static final Serializer serializer = new Serializer();
+
+    private final Map<NodeId, Location> updates;
+    private final PlacementProvider placementProvider;
+
+    public AlterTopology(Map<NodeId, Location> updates, PlacementProvider placementProvider)
+    {
+        this.updates = updates;
+        this.placementProvider = placementProvider;
+    }
+
+    /**
+     * Parses a comma delimited list of {@code node=dc:rack} specifications.
+     *
+     * @param args      comma delimited specifications; the node part may be a node id or an endpoint
+     * @param directory used to resolve an endpoint to its node id
+     * @return map of node id to the requested location
+     * @throws IllegalArgumentException if a specification is not of the form {@code node=dc:rack},
+     *                                  if either side is blank, if the node identifier cannot be
+     *                                  resolved, or if the same node is specified more than once
+     */
+    public static Map<NodeId, Location> parseArgs(String args, Directory directory)
+    {
+        Map<NodeId, Location> asMap = new HashMap<>();
+        for (String change : args.split(","))
+        {
+            String[] parts = change.trim().split("=");
+            if (parts.length != 2)
+                throw new IllegalArgumentException("Invalid specification: " + change);
+
+            // Trim before the emptiness check: a whitespace-only location (e.g. "n1= =") would
+            // otherwise reach Location.fromString as "" and be stored as a null Location
+            String node = parts[0].trim();
+            String target = parts[1].trim();
+            if (node.isEmpty() || target.isEmpty())
+                throw new IllegalArgumentException("Invalid specification: " + change);
+
+            // resolve the identifier once and reuse it for both the duplicate check and the insert
+            NodeId id = getNodeIdFromString(node, directory);
+            if (asMap.containsKey(id))
+                throw new IllegalArgumentException("Multiple updates for node " + id + " (" + node + " )");
+            asMap.put(id, Location.fromString(target));
+        }
+        return asMap;
+    }
+
+    /**
+     * Resolves a user supplied identifier to a node id, trying it first as a node id and then
+     * as an endpoint looked up in the directory.
+     *
+     * @throws IllegalArgumentException if the identifier is neither a node id nor a resolvable host
+     */
+    private static NodeId getNodeIdFromString(String s, Directory directory)
+    {
+        // first try to parse the id as a node id, either in UUID or int form
+        try
+        {
+            return NodeId.fromString(s);
+        }
+        catch (Exception ignored)
+        {
+            // not a node id; fall back to trying the supplied id as an endpoint
+        }
+
+        try
+        {
+            InetAddressAndPort endpoint = InetAddressAndPort.getByName(s);
+            // NOTE(review): presumably peerId returns null for an endpoint the directory does not
+            // know, which would put a null key into the caller's map - confirm and reject here if so
+            return directory.peerId(endpoint);
+        }
+        catch (UnknownHostException u)
+        {
+            throw new IllegalArgumentException("Invalid node identifier supplied: " + s, u);
+        }
+    }
+
+    @Override
+    public Kind kind()
+    {
+        return Kind.ALTER_TOPOLOGY;
+    }
+
+    /**
+     * Applies the location updates to the directory of {@code prev}.
+     *
+     * @return a success result carrying the updated directory and no affected ranges, or a
+     *         rejection with code INVALID if ranges are locked, a node has no location in the
+     *         directory, or the recalculated placements are not equivalent to the current ones
+     */
+    @Override
+    public Result execute(ClusterMetadata prev)
+    {
+        // Check no inflight range movements
+        if (!prev.lockedRanges.locked.isEmpty())
+            return new Rejected(INVALID, "The requested topology changes cannot be executed while there are ongoing range movements.");
+
+        Directory dir = prev.directory;
+        // Check all node ids are present
+        Set<NodeId> missing = updates.keySet()
+                                     .stream()
+                                     .filter(id -> null == dir.location(id))
+                                     .collect(Collectors.toSet());
+        if (!missing.isEmpty())
+            return new Rejected(INVALID, String.format("Some updates specify an unregistered node: %s", missing));
+
+        // Validate there will be no change to placements
+        Directory updated = dir;
+        for (Map.Entry<NodeId, Location> update : updates.entrySet())
+            updated = updated.withUpdatedRackAndDc(update.getKey(), update.getValue());
+        ClusterMetadata proposed = prev.transformer().with(updated).build().metadata;
+        DataPlacements proposedPlacements = placementProvider.calculatePlacements(prev.placements.lastModified(),
+                                                                                  proposed.tokenMap.toRanges(),
+                                                                                  proposed,
+                                                                                  proposed.schema.getKeyspaces());
+        if (!proposedPlacements.equivalentTo(prev.placements))
+        {
+            logger.info("Rejecting topology modifications which would materially change data placements: {}", updates);
+            return new Rejected(INVALID, "Proposed updates modify data placements, violating consistency guarantees");
+        }
+
+        ClusterMetadata.Transformer next = prev.transformer().with(updated);
+        return Transformation.success(next, LockedRanges.AffectedRanges.EMPTY);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "AlterTopology{" +
+               "updates=" + updates +
+               '}';
+    }
+
+    /**
+     * Serializes the updates as an int count followed by that many (NodeId, Location) pairs.
+     * The placement provider is not serialized; on deserialization it is taken from
+     * ClusterMetadataService.instance().
+     */
+    static class Serializer implements AsymmetricMetadataSerializer<Transformation, AlterTopology>
+    {
+        public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException
+        {
+            assert t instanceof AlterTopology;
+            AlterTopology alterTopology = (AlterTopology) t;
+            int size = alterTopology.updates.size();
+            out.writeInt(size);
+            for (Map.Entry<NodeId, Location> entry : alterTopology.updates.entrySet())
+            {
+                NodeId.serializer.serialize(entry.getKey(), out, version);
+                Location.serializer.serialize(entry.getValue(), out, version);
+            }
+        }
+
+        public AlterTopology deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            int size = in.readInt();
+            Map<NodeId, Location> updates = new HashMap<>(size);
+            for (int i = 0; i < size; i++)
+                updates.put(NodeId.serializer.deserialize(in, version), Location.serializer.deserialize(in, version));
+            return new AlterTopology(updates, ClusterMetadataService.instance().placementProvider());
+        }
+
+        public long serializedSize(Transformation t, Version version)
+        {
+            assert t instanceof AlterTopology;
+            AlterTopology alterTopology = (AlterTopology) t;
+            long size = TypeSizes.sizeof(alterTopology.updates.size());
+            for (Map.Entry<NodeId, Location> entry : alterTopology.updates.entrySet())
+            {
+                size += NodeId.serializer.serializedSize(entry.getKey(), version);
+                size += Location.serializer.serializedSize(entry.getValue(), version);
+            }
+            return size;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index d7bcc25b71..1cc12f3882 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -95,6 +95,7 @@ public class NodeTool
     {
         List<Class<? extends NodeToolCmdRunnable>> commands = newArrayList(
                 AbortBootstrap.class,
+                AlterTopology.class,
                 Assassinate.class,
                 CassHelp.class,
                 CIDRFilteringStats.class,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/AlterTopology.java 
b/src/java/org/apache/cassandra/tools/nodetool/AlterTopology.java
new file mode 100644
index 0000000000..e8078d3d57
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/AlterTopology.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Nodetool command requesting a change of datacenter and/or rack for one or more nodes.
+ * Each argument has the form {@code node=dc:rack}; the arguments are joined with commas and
+ * handed to {@code alterTopology} on the object returned by {@code probe.getStorageService()}.
+ */
+@Command(name = "altertopology", description = "Modify the datacenter and/or rack of one or more nodes")
+public class AlterTopology extends NodeToolCmd
+{
+    @Arguments(usage = "<node=dc:rack> [<node=dc:rack>...]", description = "One or more node identifiers, which may be either a node id, host id or broadcast address, each with a target dc:rack")
+    private List<String> args = new ArrayList<>();
+
+    /**
+     * Submits the requested topology changes.
+     *
+     * @param probe connection used to reach the storage service
+     * @throws IllegalArgumentException if no changes were specified, or if the request fails; in
+     *         the latter case the message is that of the underlying exception, which is attached
+     *         as the cause
+     */
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        checkArgument(!args.isEmpty(), "Invalid arguments; no changes specified");
+        try
+        {
+            probe.getStorageService().alterTopology(String.join(",", args));
+        }
+        catch (Exception e)
+        {
+            // same type and message as before, but keep the cause so its stack trace is not lost
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/AlterTopologyTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/AlterTopologyTest.java
new file mode 100644
index 0000000000..85c7040a58
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/AlterTopologyTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.harry.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.Generators;
+import org.apache.cassandra.harry.gen.SchemaGenerators;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.PlacementProvider;
+import org.apache.cassandra.tcm.sequences.SequencesUtils.ClearLockedRanges;
+import org.apache.cassandra.tcm.sequences.SequencesUtils.LockRanges;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
+import org.apache.cassandra.tcm.transformations.CustomTransformation;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+
+import static java.time.Duration.ofSeconds;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce;
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+import static org.junit.Assert.assertEquals;
+
+public class AlterTopologyTest extends FuzzTestBase
+{
+    /**
+     * End-to-end check of AlterTopology on a four node cluster (dc1, rack1-rack4) with gossip enabled.
+     * The steps, each registered through history.custom, verify in order that: dc and rack changes
+     * which would modify data placements are rejected; any change is rejected while ranges are
+     * locked; rack changes and DC renames which leave placements untouched are committed; and the
+     * final locations are reflected in the system tables and in gossip state.
+     * Rows are written and read back both before and after the topology changes.
+     */
+    @Test
+    public void testTopologyChanges() throws Exception
+    {
+        Generator<SchemaSpec> schemaGen = 
SchemaGenerators.schemaSpecGen(KEYSPACE, "change_topology_test", 1000);
+        try (Cluster cluster = 
builder().withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+                                        .withRack("dc1", "rack1", 1)
+                                        .withRack("dc1", "rack2", 1)
+                                        .withRack("dc1", "rack3", 1)
+                                        .withRack("dc1", "rack4", 1)
+                                        .withConfig(config -> 
config.with(GOSSIP))
+                                        .withNodes(4)
+                                        .start())
+        {
+            IInvokableInstance cmsInstance = cluster.get(1);
+
+            withRandom(rng -> {
+                SchemaSpec schema = schemaGen.generate(rng);
+                Generators.TrackingGenerator<Integer> pkGen = 
Generators.tracking(Generators.int32(0, 
Math.min(schema.valueGenerators.pkPopulation(), 1000)));
+                Generator<Integer> ckGen = Generators.int32(0, 
Math.min(schema.valueGenerators.ckPopulation(), 1000));
+
+                HistoryBuilder history = new 
ReplayingHistoryBuilder(schema.valueGenerators,
+                                                                     (hb) -> 
InJvmDTestVisitExecutor.builder()
+                                                                               
                     .nodeSelector(i -> 1)
+                                                                               
                     .build(schema, hb, cluster));
+                history.custom(() -> {
+                    cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE +
+                                         " WITH replication = {'class': 
'NetworkTopologyStrategy', 'dc1' : 3 };");
+                    cluster.schemaChange(schema.compile());
+                    waitForCMSToQuiesce(cluster, cmsInstance);
+                }, "Setup");
+
+
+                // inserts 2000 rows with generated keys, then selects every partition key
+                // returned by pkGen.generated(); run once before and once after the changes
+                Runnable writeAndValidate = () -> {
+                    for (int i = 0; i < 2000; i++)
+                        history.insert(pkGen.generate(rng), 
ckGen.generate(rng));
+
+                    for (int pk : pkGen.generated())
+                        history.selectPartition(pk);
+                };
+                writeAndValidate.run();
+
+                // register the LockRanges/ClearLockedRanges custom transformations on every instance
+                cluster.forEach(i -> i.runOnInstance(() -> {
+                    CustomTransformation.registerExtension(LockRanges.NAME, 
LockRanges.serializer);
+                    
CustomTransformation.registerExtension(ClearLockedRanges.NAME, 
ClearLockedRanges.serializer);
+                }));
+
+                // a dc change which affects placements is not allowed, so 
expect a rejection
+                history.custom(() -> {
+                    cmsInstance.runOnInstance(() -> {
+                        PlacementProvider pp = 
ClusterMetadataService.instance().placementProvider();
+                        NodeId id = ClusterMetadata.current().myNodeId();
+                        Map<NodeId, Location> updates = new HashMap<>();
+                        updates.put(id, new Location("dcX", "rack1"));
+                        assertAlterTopologyRejection(pp, updates, "Proposed 
updates modify data placements");
+                    });
+                }, "DC change affecting placements");
+
+                // a rack change which affects placements is also not allowed
+                history.custom(() -> {
+                    cmsInstance.runOnInstance(() -> {
+                        PlacementProvider pp = 
ClusterMetadataService.instance().placementProvider();
+                        NodeId id = ClusterMetadata.current().myNodeId();
+                        Map<NodeId, Location> updates = new HashMap<>();
+                        updates.put(id, new Location("dc1", "rack2"));
+                        assertAlterTopologyRejection(pp, updates, "Proposed 
updates modify data placements");
+                    });
+                },"Rack change affecting placements ");
+
+                // submit an update which would not modify placements so would 
normally be accepted
+                history.custom(() -> {
+                   cmsInstance.runOnInstance(() -> {
+                       PlacementProvider pp = 
ClusterMetadataService.instance().placementProvider();
+                       NodeId id = ClusterMetadata.current().myNodeId();
+                       Map<NodeId, Location> updates = new HashMap<>();
+                       updates.put(id, new Location("dc1", "rack99"));
+                       // if there are locked ranges, implying in-progress 
range movements, any update is rejected
+                       ClusterMetadataService.instance().commit(new 
CustomTransformation(LockRanges.NAME, new LockRanges()));
+                       assertAlterTopologyRejection(pp, updates, "The 
requested topology changes cannot be executed while there are ongoing range 
movements");
+
+                       // but if no movements are in flight, the update is 
allowed
+                       ClusterMetadataService.instance().commit(new 
CustomTransformation(ClearLockedRanges.NAME, new ClearLockedRanges()));
+                       ClusterMetadataService.instance().commit(new 
AlterTopology(updates, pp));
+                       if 
(!ClusterMetadata.current().directory.location(id).rack.equals("rack99"))
+                           throw new AssertionError("Expected rack to have 
changed");
+                   });
+               }, "Rack change not affecting placements");
+
+               // changing multiple/all racks atomically
+               history.custom(() -> {
+                  cmsInstance.runOnInstance(() -> {
+                       PlacementProvider pp = 
ClusterMetadataService.instance().placementProvider();
+                       Map<NodeId, Location> updates = new HashMap<>();
+                       Directory dir = ClusterMetadata.current().directory;
+                       for (NodeId nodeId : dir.peerIds())
+                           updates.put(nodeId, new Location("dc1", "rack" + 
(nodeId.id() + 100)));
+
+                       ClusterMetadataService.instance().commit(new 
AlterTopology(updates, pp));
+                       dir = ClusterMetadata.current().directory;
+                       for (NodeId nodeId : dir.peerIds())
+                           if 
(!ClusterMetadata.current().directory.location(nodeId).rack.equals("rack" + 
(nodeId.id() + 100)))
+                               throw new AssertionError("Expected rack to have 
changed");
+                  });
+               }, "Modify all racks not affecting placements");
+
+               // renaming a datacenter is supported, as long as it is not 
referenced in any replication params as that
+               // would impact placements
+               history.custom(() -> {
+                   cmsInstance.runOnInstance(() -> {
+                       PlacementProvider pp = 
ClusterMetadataService.instance().placementProvider();
+                      Map<NodeId, Location> updates = new HashMap<>();
+                       Directory dir = ClusterMetadata.current().directory;
+                       for (NodeId nodeId : dir.peerIds())
+                           updates.put(nodeId, new Location("renamed_dc", 
dir.location(nodeId).rack));
+                       assertAlterTopologyRejection(pp, updates, "Proposed 
updates modify data placements");
+                   });
+               }, "Renaming DC referenced in replication params");
+
+               // after modifying replication for the test keyspace, this 
should be allowed
+               history.custom(() -> {
+                   cmsInstance.runOnInstance(() -> {
+                       PlacementProvider pp = 
ClusterMetadataService.instance().placementProvider();
+                       Map<NodeId, Location> updates = new HashMap<>();
+                       QueryProcessor.executeInternal("ALTER KEYSPACE " + 
KEYSPACE +
+                                                      " WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor' : 3 };");
+                       Directory dir = ClusterMetadata.current().directory;
+                       for (NodeId nodeId : dir.peerIds())
+                           updates.put(nodeId, new Location("renamed_dc", 
dir.location(nodeId).rack));
+
+                       ClusterMetadataService.instance().commit(new 
AlterTopology(updates, pp));
+
+                       for (NodeId nodeId : dir.peerIds())
+                           if 
(!ClusterMetadata.current().directory.location(nodeId).datacenter.equals("renamed_dc"))
+                               throw new AssertionError("Expected dc to have 
changed");
+
+                       // modify both datacenter and racks
+                       dir = ClusterMetadata.current().directory;
+                       for (NodeId nodeId : dir.peerIds())
+                           updates.put(nodeId, new 
Location("renamed_dc_again", "rack" + (nodeId.id() + 200)));
+
+                       ClusterMetadataService.instance().commit(new 
AlterTopology(updates, pp));
+                       dir = ClusterMetadata.current().directory;
+                       for (NodeId nodeId : dir.peerIds())
+                           if 
(!ClusterMetadata.current().directory.location(nodeId).equals(new 
Location("renamed_dc_again", "rack" + (nodeId.id() + 200))))
+                               throw new AssertionError("Expected dc to have 
changed");
+                   });
+                   waitForCMSToQuiesce(cluster, cmsInstance);
+               },"Renaming DC not referenced in replication params");
+
+               // updates to system tables run asynchronously so spin until 
they're done
+               history.custom(() -> {
+                  cluster.forEach(i -> await(60).until(() -> 
i.callOnInstance(() -> {
+                      ClusterMetadata metadata = ClusterMetadata.current();
+                      NodeId myId = metadata.myNodeId();
+                      Directory dir = metadata.directory;
+                      for (NodeId nodeId : dir.peerIds())
+                      {
+                          String query = nodeId.equals(myId)
+                                         ? "select data_center, rack from 
system.local"
+                                         : String.format("select data_center, 
rack from system.peers_v2 where peer = '%s'",
+                                                         
dir.endpoint(nodeId).getHostAddress(false));
+                          UntypedResultSet res = 
QueryProcessor.executeInternal(query);
+                          if 
(!res.one().getString("data_center").equals("renamed_dc_again"))
+                              return false;
+                          if (!res.one().getString("rack").equals("rack" + 
(nodeId.id() + 200)))
+                              return false;
+                      }
+                      return true;
+                  })));
+               }, "Verify local system table updates");
+
+               // check gossip is also updated
+               history.custom(() -> {
+                  Map<String, Map<String, String>> gossipInfo = 
ClusterUtils.gossipInfo(cmsInstance);
+                  gossipInfo.forEach((ep, states) -> {
+                      String nodeId = states.get("HOST_ID").split(":")[1];
+                      String dc = states.get("DC").split(":")[1];
+                      assertEquals("renamed_dc_again", dc);
+                      String rack = states.get("RACK").split(":")[1];
+                      String expected = "rack" + 
(NodeId.fromString(nodeId).id() + 200);
+                      assertEquals(expected, rack);
+                  });
+                }, "Verify gossip state");
+
+               writeAndValidate.run();
+            });
+        }
+    }
+
+    /**
+     * Commits an AlterTopology built from the supplied updates and fails with an AssertionError
+     * unless the commit is rejected with ExceptionCode.INVALID and a reason starting with
+     * {@code error}.
+     */
+    private static void assertAlterTopologyRejection(PlacementProvider pp, Map<NodeId, Location> updates, String error)
+    {
+        ClusterMetadataService service = ClusterMetadataService.instance();
+        service.commit(new AlterTopology(updates, pp),
+                       m -> {
+                           // a successful commit means the expected rejection did not happen
+                           throw new AssertionError("Expected rejection");
+                       },
+                       (c, r) -> {
+                           if (c != ExceptionCode.INVALID || !r.startsWith(error))
+                               throw new AssertionError("Unexpected failure response: " + r);
+                           return ClusterMetadata.current();
+                       });
+    }
+
+    /** Returns an Awaitility condition waiting at most {@code seconds} seconds, with a one second poll delay. */
+    private static ConditionFactory await(int seconds)
+    {
+        ConditionFactory bounded = Awaitility.await().atMost(ofSeconds(seconds));
+        return bounded.pollDelay(ofSeconds(1));
+    }
+
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
index 414f74bbef..44fdc7d701 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
@@ -935,10 +935,10 @@ public class MetadataChangeSimulationTest extends 
CMSTestBase
             while (!state.inFlightOperations.isEmpty())
             {
                 state = 
state.inFlightOperations.get(random.nextInt(state.inFlightOperations.size())).advance(state);
-                Assert.assertEquals(allSettled, 
sut.service.metadata().writePlacementAllSettled(ksm));
+                
Assert.assertTrue(allSettled.equivalentTo(sut.service.metadata().writePlacementAllSettled(ksm)));
                 validatePlacements(sut, state);
             }
-            Assert.assertEquals(allSettled, 
sut.service.metadata().placements.get(ksm.params.replication));
+            
Assert.assertTrue(allSettled.equivalentTo(sut.service.metadata().placements.get(ksm.params.replication)));
         }
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ModelState.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/ModelState.java
index 0eb73b8694..9dea76040d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/ModelState.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/ModelState.java
@@ -343,6 +343,20 @@ public class ModelState
             return this;
         }
 
+        /**
+         * Replaces every current node which has an entry in {@code updates} with the result of
+         * {@code withNewRack} for the mapped rack; nodes without an entry are kept unchanged and
+         * the order of the nodes is preserved. All keys of {@code updates} must be current nodes.
+         */
+        public Transformer withUpdatedRacks(Map<Node, String> updates)
+        {
+            assert currentNodes.containsAll(updates.keySet());
+            List<Node> relocated = new ArrayList<>();
+            for (Node existing : currentNodes)
+            {
+                Node replacement = updates.containsKey(existing)
+                                   ? existing.withNewRack(updates.get(existing))
+                                   : existing;
+                relocated.add(replacement);
+            }
+            currentNodes = relocated;
+            return this;
+        }
+
         public Transformer 
updateSimulation(PlacementSimulator.SimulatedPlacements simulatedPlacements)
         {
             this.simulatedPlacements = simulatedPlacements;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
index 2869fe913a..0f95735e5a 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
@@ -134,7 +134,7 @@ public class ReconfigureCMSTest extends FuzzTestBase
                 assertEquals(2, metadata.fullCMSMembers().size());
                 ReplicationParams params = ReplicationParams.meta(metadata);
                 DataPlacement placements = metadata.placements.get(params);
-                assertEquals(placements.reads, placements.writes);
+                assertTrue(placements.reads.equivalentTo(placements.writes));
                 assertEquals(metadata.fullCMSMembers().size(), 
Integer.parseInt(params.asMap().get("dc0")));
             });
 
@@ -159,7 +159,7 @@ public class ReconfigureCMSTest extends FuzzTestBase
                 
Assert.assertTrue(metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort()));
                 assertEquals(3, metadata.fullCMSMembers().size());
                 DataPlacement placements = 
metadata.placements.get(ReplicationParams.meta(metadata));
-                assertEquals(placements.reads, placements.writes);
+                
Assert.assertTrue(placements.reads.equivalentTo(placements.writes));
             });
         }
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java
index 7763831933..c6fa2da54b 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.test.log;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -40,10 +41,13 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.MultiStepOperation;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
 import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
 import org.apache.cassandra.tcm.sequences.LeaveStreams;
 import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
 import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
 import org.apache.cassandra.tcm.transformations.PrepareLeave;
@@ -101,6 +105,23 @@ public abstract class SimulatedOperation
 
     }
 
+    /**
+     * Applies the rack changes in {@code updates} to both the model and the system under test:
+     * the model transformer is prepared with the updated racks and the existing simulated
+     * placements, an AlterTopology keeping each node's dc and using the new rack is committed
+     * to the sut's service, and the transformed model state is returned.
+     */
+    public static ModelState changeRacks(CMSSut sut, ModelState state, Map<Node, String> updates)
+    {
+        ModelState.Transformer modelChange = state.transformer()
+                                                  .withUpdatedRacks(updates)
+                                                  .updateSimulation(state.simulatedPlacements);
+
+        Map<NodeId, Location> locations = new HashMap<>();
+        updates.forEach((node, rack) -> locations.put(node.nodeId(), new Location(node.dc(), rack)));
+
+        sut.service.commit(new AlterTopology(locations, sut.service.placementProvider()));
+        return modelChange.transform();
+    }
+
     public static ModelState leave(CMSSut sut, ModelState state, Node node)
     {
         ModelState.Transformer transformer = state.transformer();
diff --git 
a/test/harry/main/org/apache/cassandra/harry/model/TokenPlacementModel.java 
b/test/harry/main/org/apache/cassandra/harry/model/TokenPlacementModel.java
index 2c24ec257c..c9dcbcb137 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/TokenPlacementModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/TokenPlacementModel.java
@@ -725,6 +725,7 @@ public class TokenPlacementModel
         long token(int tokenIdx);
         Lookup forceToken(int tokenIdx, long token);
         void reset();
+        int rackIdx(String rack);
 
         default NodeId nodeId(int nodeIdx)
         {
@@ -785,6 +786,11 @@ public class TokenPlacementModel
                 return null;
             }
 
+            // mapping a rack name to its index is not supported by this implementation; always throws
+            public int rackIdx(String rack)
+            {
+                throw new UnsupportedOperationException();
+            }
+
             public void reset()
             {
                 throw new UnsupportedOperationException();
@@ -843,6 +849,11 @@ public class TokenPlacementModel
         {
             return String.format("rack%d", rackIdx);
         }
+
+        // mapping a rack name to its index is not supported by this implementation; always throws
+        public int rackIdx(String rack)
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
     public static class HumanReadableTokensLookup extends DefaultLookup {
@@ -1022,6 +1033,12 @@ public class TokenPlacementModel
         {
             return new Node(tokenIdx, nodeIdx, dcIdx, rackIdx, 
lookup.forceToken(tokenIdx, override));
         }
+
+        /**
+         * Returns a node with the same token index, node index, dc index and lookup as this one,
+         * whose rack index is whatever the lookup resolves {@code newRack} to.
+         */
+        public Node withNewRack(String newRack)
+        {
+            int newRackIdx = lookup.rackIdx(newRack);
+            return new Node(tokenIdx, nodeIdx, dcIdx, newRackIdx, lookup);
+        }
+
         public Murmur3Partitioner.LongToken longToken()
         {
             return new Murmur3Partitioner.LongToken(token());
diff --git 
a/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
 
b/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
index e6cdb1be82..d54b2e5e31 100644
--- 
a/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
+++ 
b/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
@@ -20,30 +20,21 @@ package org.apache.cassandra.schema;
 
 import org.junit.Test;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
-import org.apache.cassandra.tcm.Transformation;
-import org.apache.cassandra.tcm.sequences.LockedRanges;
 import org.apache.cassandra.tcm.transformations.AlterSchema;
 import org.apache.cassandra.triggers.TriggersTest;
 
+import static 
org.apache.cassandra.tcm.sequences.SequencesUtils.ClearLockedRanges;
+import static org.apache.cassandra.tcm.sequences.SequencesUtils.LockRanges;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class SchemaChangeDuringRangeMovementTest extends CQLTester
 {
-    // at the moment, the detail of the specific LockedRanges doesn't matter, 
transformations
-    // which are rejected in the presence of locking are rejected whatever is 
actually locked
-    private static final LockedRanges.AffectedRanges toLock =
-        LockedRanges.AffectedRanges.singleton(ReplicationParams.simple(3),
-                                              new 
Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
-                                                          
DatabaseDescriptor.getPartitioner().getRandomToken()));
-
     @Test
     public void testAlwaysPermittedChanges() throws Throwable
     {
@@ -216,39 +207,4 @@ public class SchemaChangeDuringRangeMovementTest extends 
CQLTester
         metadata = ClusterMetadataService.instance().commit(new 
ClearLockedRanges());
         assertTrue(metadata.lockedRanges.locked.isEmpty());
     }
-
-
-    // Custom transforms to lock/unlock an arbitrary set of ranges to
-    // avoid having to actually initiate some range movement
-    private static class LockRanges implements Transformation
-    {
-        @Override
-        public Kind kind()
-        {
-            return Kind.CUSTOM;
-        }
-
-        @Override
-        public Result execute(ClusterMetadata metadata)
-        {
-            LockedRanges newLocked = 
metadata.lockedRanges.lock(LockedRanges.keyFor(metadata.epoch), toLock);
-            return 
Transformation.success(metadata.transformer().with(newLocked), toLock);
-        }
-    }
-
-    private static class ClearLockedRanges implements Transformation
-    {
-        @Override
-        public Kind kind()
-        {
-            return Kind.CUSTOM;
-        }
-
-        @Override
-        public Result execute(ClusterMetadata metadata)
-        {
-            LockedRanges newLocked = LockedRanges.EMPTY;
-            return 
Transformation.success(metadata.transformer().with(newLocked), 
LockedRanges.AffectedRanges.EMPTY);
-        }
-    }
 }
diff --git 
a/test/unit/org/apache/cassandra/service/AlterTopologyArgParsingTest.java 
b/test/unit/org/apache/cassandra/service/AlterTopologyArgParsingTest.java
new file mode 100644
index 0000000000..59a7bb58d9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/AlterTopologyArgParsingTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.MembershipUtils;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class AlterTopologyArgParsingTest
+{
+    Location loc = new Location("test_dc", "test_rack");
+    NodeId id = new NodeId(1);
+    Directory dir;
+
+    @Before
+    public void setup()
+    {
+        dir = new Directory();
+    }
+
+    @Test
+    public void testSingleChangeByInt()
+    {
+        String arg = "1=test_dc:test_rack";
+        Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+        assertEquals(1, parsed.size());
+        assertEquals(parsed.get(id), loc);
+    }
+
+    @Test
+    public void testSingleChangeByUUID()
+    {
+        String arg = String.format("%s=test_dc:test_rack", 
id.toUUID().toString());
+        Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+        assertEquals(1, parsed.size());
+        assertEquals(parsed.get(id), loc);
+    }
+
+    @Test
+    public void testSingleChangeByEndpoint()
+    {
+        InetAddressAndPort ep = MembershipUtils.endpoint(1);
+        dir = dir.with(new NodeAddresses(ep), loc); // this will associate 
NodeId(1) with ep
+        String arg = String.format("%s=test_dc:test_rack", 
ep.getHostAddressAndPort());
+        Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+        assertEquals(1, parsed.size());
+        assertEquals(parsed.get(id), loc);
+    }
+
+    @Test
+    public void testSingleChangeByEndpointAddress()
+    {
+        InetAddressAndPort ep = MembershipUtils.endpoint(1);
+        dir = dir.with(new NodeAddresses(ep), loc); // this will associate 
NodeId(1) with ep
+        String arg = String.format("%s=test_dc:test_rack", 
ep.getHostAddress(false));
+        Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+        assertEquals(1, parsed.size());
+        assertEquals(parsed.get(id), loc);
+    }
+
+    @Test
+    public void testInvalidArg()
+    {
+        String[] args = new String[]{ "invalid", "1=", "=dc:rack", "1=dc", 
"1=dc:" };
+        for (String invalid : args)
+        {
+            try
+            {
+                AlterTopology.parseArgs(invalid, dir);
+                fail("Expected exception");
+            }
+            catch (IllegalArgumentException e)
+            {
+            }
+        }
+    }
+
+    @Test
+    public void testMultipleChanges()
+    {
+        NodeId otherId = new NodeId(2);
+        InetAddressAndPort ep = MembershipUtils.endpoint(1);
+        dir = dir.with(new NodeAddresses(ep), loc); // this will associate 
NodeId(1) with ep
+        String arg = String.format("%s=dc1:rack1,%s=dc2:rack2,3=dc3:rack3,",
+                                   ep.getHostAddress(true),
+                                   otherId.toUUID().toString());
+        Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+        assertEquals(3, parsed.size());
+        assertEquals(parsed.get(id).datacenter, "dc1");
+        assertEquals(parsed.get(id).rack, "rack1");
+        assertEquals(parsed.get(otherId).datacenter, "dc2");
+        assertEquals(parsed.get(otherId).rack, "rack2");
+        assertEquals(parsed.get(new NodeId(3)).datacenter, "dc3");
+        assertEquals(parsed.get(new NodeId(3)).rack, "rack3");
+    }
+
+    @Test
+    public void testMultipleChangesForSameNode()
+    {
+        InetAddressAndPort ep = MembershipUtils.endpoint(1);
+        dir = dir.with(new NodeAddresses(ep), loc); // this will associate 
NodeId(1) with ep
+        String epString = ep.getHostAddress(true);
+        String idString = id.toUUID().toString();
+        assertIllegalArgument(String.format("%1$s=dc1:rack1,%1$s=dc2:rack2", 
id.id()));
+        assertIllegalArgument(String.format("%s=dc1:rack1,%s=dc2:rack2", 
id.id(), idString));
+        assertIllegalArgument(String.format("%s=dc1:rack1,%s=dc2:rack2", 
id.id(), epString));
+        assertIllegalArgument(String.format("%1$s=dc1:rack1,%1$s=dc2:rack2", 
epString));
+        assertIllegalArgument(String.format("%1$s=dc1:rack1,%1$s=dc2:rack2", 
idString));
+        
assertIllegalArgument(String.format("%s=dc1:rack1,%s=dc2:rack2,%s=dc3:rack3", 
id.id(), idString, epString));
+    }
+
+    private void assertIllegalArgument(String arg)
+    {
+       try
+       {
+           AlterTopology.parseArgs(arg, dir);
+           fail("Expected exception");
+       }
+       catch (IllegalArgumentException e) {}
+    }
+}
diff --git a/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java 
b/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
new file mode 100644
index 0000000000..ea66961ee5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import org.junit.Test;
+
+import static org.apache.cassandra.tcm.membership.MembershipUtils.endpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DirectoryTest
+{
+
+    @Test
+    public void updateLocationTest()
+    {
+        Location DC1_R1 = new Location("datacenter1", "rack1");
+        Directory dir = new Directory();
+        assertTrue(dir.isEmpty());
+        assertTrue(dir.knownDatacenters().isEmpty());
+
+        NodeId missing = new NodeId(1000);
+        assertInvalidLocationUpdate(dir, missing, DC1_R1, "Node " + missing + 
" has no registered location to update");
+
+        // add a new node and retrieve its Location
+        NodeAddresses addresses = new NodeAddresses(endpoint(1));
+        dir = dir.with(addresses, DC1_R1);
+        NodeId node = dir.peerId(addresses.broadcastAddress);
+        assertEquals(DC1_R1, dir.location(node));
+        assertTrue(dir.knownDatacenters().contains("datacenter1"));
+
+        // endpoints by DC & rack are not updated immediately; this is an 
explicit step when a node joins
+        assertTrue(dir.allDatacenterEndpoints().isEmpty());
+        assertTrue(dir.allDatacenterRacks().isEmpty());
+
+        // when a node joins, its DC and rack become active
+        dir = dir.withRackAndDC(node);
+        
assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter1").contains(addresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterRacks().get("datacenter1").get("rack1").contains(addresses.broadcastAddress));
+
+        // update rack
+        Location DC1_R2 = new Location("datacenter1", "rack2");
+        dir = dir.withUpdatedRackAndDc(node, DC1_R2);
+        assertEquals(DC1_R2, dir.location(node));
+        
assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter1").contains(addresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterRacks().get("datacenter1").get("rack2").contains(addresses.broadcastAddress));
+        // previous rack is no longer present as it was made empty
+        
assertFalse(dir.allDatacenterRacks().get("datacenter1").containsKey("rack1"));
+
+        // update DC
+        Location DC2_R2 = new Location("datacenter2", "rack2");
+        dir = dir.withUpdatedRackAndDc(node, DC2_R2);
+        assertEquals(DC2_R2, dir.location(node));
+        
assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(addresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(addresses.broadcastAddress));
+        // datacenter1 is no longer present as it was made empty
+        assertFalse(dir.allDatacenterRacks().containsKey("datacenter1"));
+        assertFalse(dir.knownDatacenters().contains("datacenter1"));
+        assertTrue(dir.knownDatacenters().contains("datacenter2"));
+
+        // Add a second node in the same dc & rack
+        NodeAddresses otherAddresses = new NodeAddresses(endpoint(2));
+        dir = dir.with(otherAddresses, DC2_R2);
+        NodeId otherNode = dir.peerId(otherAddresses.broadcastAddress);
+        dir = dir.withRackAndDC(otherNode);
+        
assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(addresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(otherAddresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(addresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(otherAddresses.broadcastAddress));
+
+        // now updating the rack of the first node should not remove rack2 
altogether as it is not empty
+        Location DC2_R3 = new Location("datacenter2", "rack3");
+        dir = dir.withUpdatedRackAndDc(node, DC2_R3);
+        assertEquals(DC2_R3, dir.location(node));
+        // updated node is removed from rack2 and added to rack3
+        
assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(addresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack3").contains(addresses.broadcastAddress));
+        
assertFalse(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(addresses.broadcastAddress));
+        // other node is still present in rack2
+        
assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(otherAddresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(otherAddresses.broadcastAddress));
+        
assertFalse(dir.allDatacenterRacks().get("datacenter2").get("rack3").contains(otherAddresses.broadcastAddress));
+
+        // simulate what happens when the nodes leave the cluster
+        dir = dir.withoutRackAndDC(otherNode);
+        
assertFalse(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(otherAddresses.broadcastAddress));
+        
assertFalse(dir.allDatacenterRacks().get("datacenter2").containsKey("rack2"));
+        
assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(addresses.broadcastAddress));
+        
assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack3").contains(addresses.broadcastAddress));
+
+        dir = dir.withoutRackAndDC(node);
+        assertTrue(dir.allDatacenterEndpoints().isEmpty());
+        assertTrue(dir.allDatacenterRacks().isEmpty());
+    }
+
+    private void assertInvalidLocationUpdate(Directory dir, NodeId nodeId, 
Location loc, String message)
+    {
+        try
+        {
+            dir.withUpdatedRackAndDc(nodeId, loc);
+            fail("Expected an exception");
+        }
+        catch (IllegalArgumentException e)
+        {
+            assertTrue(e.getMessage().equals(message));
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
 
b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
index 1c1a44296b..100979104f 100644
--- 
a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
+++ 
b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
@@ -34,7 +34,6 @@ import 
org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.ClusterMetadata;
@@ -44,10 +43,8 @@ import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.tcm.ownership.DataPlacements;
 import org.apache.cassandra.tcm.ownership.PlacementDeltas;
-import org.apache.cassandra.tcm.ownership.ReplicaGroups;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
 import org.apache.cassandra.tcm.transformations.PrepareLeave;
 import org.apache.cassandra.tcm.transformations.PrepareReplace;
@@ -304,9 +301,9 @@ public class InProgressSequenceCancellationTest
 
     private void assertRelevantMetadata(ClusterMetadata first, ClusterMetadata 
second)
     {
-        assertPlacementsEquivalent(first.placements, second.placements);
-        assertTrue(first.directory.isEquivalent(second.directory));
-        assertTrue(first.tokenMap.isEquivalent(second.tokenMap));
+        assertTrue(first.placements.equivalentTo(second.placements));
+        assertTrue(first.directory.equivalentTo(second.directory));
+        assertTrue(first.tokenMap.equivalentTo(second.tokenMap));
         assertEquals(first.lockedRanges.locked.keySet(), 
second.lockedRanges.locked.keySet());
     }
 
@@ -314,31 +311,4 @@ public class InProgressSequenceCancellationTest
     {
         return new ClusterMetadata(Murmur3Partitioner.instance, directory);
     }
-
-    private void assertPlacementsEquivalent(DataPlacements first, 
DataPlacements second)
-    {
-        assertEquals(first.keys(), second.keys());
-
-        first.asMap().forEach((params, placement) -> {
-            DataPlacement otherPlacement = second.get(params);
-            ReplicaGroups r1 = placement.reads;
-            ReplicaGroups r2 = otherPlacement.reads;
-            assertEquals(r1.ranges, r2.ranges);
-            r1.forEach((range, e1) -> {
-                EndpointsForRange e2 = r2.forRange(range).get();
-                assertEquals(e1.size(),e2.size());
-                assertTrue(e1.get().stream().allMatch(e2::contains));
-            });
-
-            ReplicaGroups w1 = placement.reads;
-            ReplicaGroups w2 = otherPlacement.reads;
-            assertEquals(w1.ranges, w2.ranges);
-            w1.forEach((range, e1) -> {
-                EndpointsForRange e2 = w2.forRange(range).get();
-                assertEquals(e1.size(),e2.size());
-                assertTrue(e1.get().stream().allMatch(e2::contains));
-            });
-
-        });
-    }
 }
diff --git a/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java 
b/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java
index eb310cc78b..44b3b59ce3 100644
--- a/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java
+++ b/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java
@@ -18,19 +18,27 @@
 
 package org.apache.cassandra.tcm.sequences;
 
+import java.io.Serializable;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.function.Predicate;
 
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.ownership.DataPlacements;
 import org.apache.cassandra.tcm.ownership.PlacementDeltas;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
 import org.apache.cassandra.tcm.transformations.PrepareLeave;
 import org.apache.cassandra.tcm.transformations.PrepareMove;
@@ -174,4 +182,68 @@ public class SequencesUtils
     {
         return Epoch.create(epoch);
     }
+
+    // Custom transforms to lock/unlock an arbitrary set of ranges to
+    // avoid having to actually initiate some range movement
+    public static class LockRanges implements Transformation, Serializable
+    {
+        public static final AsymmetricMetadataSerializer<Transformation, 
LockRanges> serializer = new AsymmetricMetadataSerializer<Transformation, 
LockRanges>()
+        {
+            @Override
+            public void serialize(Transformation t, DataOutputPlus out, 
Version version){}
+            @Override
+            public LockRanges deserialize(DataInputPlus in, Version version) 
{return new LockRanges();}
+            @Override
+            public long serializedSize(Transformation t, Version version) 
{return 0;}
+        };
+
+        public static final String NAME = "TestLockRanges";
+
+        // at the moment, the detail of the specific LockedRanges doesn't 
matter, transformations
+        // which are rejected in the presence of locking are rejected whatever 
is actually locked
+        private static final LockedRanges.AffectedRanges toLock =
+            LockedRanges.AffectedRanges.singleton(ReplicationParams.simple(3),
+                                                  new 
Range<>(Murmur3Partitioner.instance.getMinimumToken(),
+                                                              
Murmur3Partitioner.instance.getRandomToken()));
+
+        @Override
+        public Kind kind()
+        {
+            return Kind.CUSTOM;
+        }
+
+        @Override
+        public Result execute(ClusterMetadata metadata)
+        {
+            LockedRanges newLocked = 
metadata.lockedRanges.lock(LockedRanges.keyFor(metadata.epoch), toLock);
+            return 
Transformation.success(metadata.transformer().with(newLocked), toLock);
+        }
+    }
+
+    public static class ClearLockedRanges implements Transformation, 
Serializable
+    {
+        public static final AsymmetricMetadataSerializer<Transformation, 
ClearLockedRanges> serializer = new 
AsymmetricMetadataSerializer<Transformation, ClearLockedRanges>()
+        {
+            @Override
+            public void serialize(Transformation t, DataOutputPlus out, 
Version version) {}
+            @Override
+            public ClearLockedRanges deserialize(DataInputPlus in, Version 
version) {return new ClearLockedRanges();}
+            @Override
+            public long serializedSize(Transformation t, Version version) 
{return 0;}
+        };
+        public static final String NAME = "TestClearLockedRanges";
+
+        @Override
+        public Kind kind()
+        {
+            return Kind.CUSTOM;
+        }
+
+        @Override
+        public Result execute(ClusterMetadata metadata)
+        {
+            LockedRanges newLocked = LockedRanges.EMPTY;
+            return 
Transformation.success(metadata.transformer().with(newLocked), 
LockedRanges.AffectedRanges.EMPTY);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to