This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 80b97e27e715c1c7255f7a0e5c262d24f47a27f6
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Wed Mar 1 12:53:19 2023 +0000

    [CEP-21] Produce placements equivalent to current replication strategies
    
    Minimal modifications to AbstractReplicationStrategy implementations to
    support the production of DataPlacements using ClusterMetadata while
    retaining calculateNaturalReplicas. Also adds tests to compare the
    output of both methods and assert their equivalence. Eventually, the
    original implementations based on TokenMetadata will be retired from
    the main source but retained in the test source to guard against regressions.
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../locator/AbstractReplicaCollection.java         |  40 ++-
 .../locator/AbstractReplicationStrategy.java       |  10 +-
 .../apache/cassandra/locator/LocalStrategy.java    |  50 +--
 .../org/apache/cassandra/locator/MetaStrategy.java |  23 +-
 .../cassandra/locator/NetworkTopologyStrategy.java |  70 ++--
 .../apache/cassandra/locator/SimpleStrategy.java   |  41 ++-
 .../apache/cassandra/locator/TokenMetadata.java    | 159 +++++----
 .../apache/cassandra/service/StorageService.java   |  11 +-
 .../service/reads/range/ReplicaPlanIterator.java   |   3 +-
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |  54 ++-
 .../org/apache/cassandra/tcm/MetadataKeys.java     |   5 +-
 .../AsEndpoints.java}                              |  21 +-
 .../AsLocations.java}                              |  27 +-
 .../AsTokenMap.java}                               |  24 +-
 .../cassandra/tcm/compatibility/GossipHelper.java  |   2 +
 .../tcm/compatibility/TokenRingUtils.java          | 150 ++++++++
 .../apache/cassandra/tcm/membership/Directory.java |   4 +-
 .../tcm/ownership/PrimaryRangeComparator.java      |  51 +++
 .../apache/cassandra/tcm/ownership/TokenMap.java   | 302 ++++++++++++++++
 .../tcm/ownership/UniformRangePlacement.java       | 324 +++++++++++++++++
 .../tcm/transformations/cms/EntireRange.java       |   8 +-
 .../org/apache/cassandra/utils/BiMultiValMap.java  |  16 +
 .../locator/NetworkTopologyStrategyTest.java       |   3 +-
 .../cassandra/locator/TokenMetadataTest.java       |   3 +-
 .../cassandra/service/LeaveAndBootstrapTest.java   |   3 +-
 .../org/apache/cassandra/service/MoveTest.java     |   3 +-
 .../cassandra/tcm/membership/MembershipUtils.java  |  10 +
 .../cassandra/tcm/ownership/OwnershipUtils.java    | 139 ++++++++
 .../tcm/ownership/PlacementDeltasTest.java         | 201 +++++++++++
 .../UniformRangePlacementEquivalenceTest.java      | 388 +++++++++++++++++++++
 .../tcm/ownership/UniformRangePlacementTest.java   | 290 +++++++++++++++
 31 files changed, 2233 insertions(+), 202 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java 
b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
index cc210622d1..fa8d17facd 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
@@ -22,6 +22,7 @@ import com.carrotsearch.hppc.ObjectIntHashMap;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 
 import java.util.AbstractList;
 import java.util.AbstractMap;
@@ -100,6 +101,18 @@ public abstract class AbstractReplicaCollection<C extends 
AbstractReplicaCollect
             return contents[begin + index];
         }
 
+        public ReplicaList map(Function<Replica, Replica> map)
+        {
+            Replica[] contents = new Replica[size];
+            for (int i = 0; i < contents.length; i++)
+            {
+                if (this.contents[i] != null)
+                    contents[i] = map.apply(this.contents[i]);
+            }
+
+            return new ReplicaList(contents, begin, contents.length);
+        }
+
         public void add(Replica replica)
         {
             // can only add to full array - if we have sliced it, we must be a 
snapshot
@@ -591,33 +604,30 @@ public abstract class AbstractReplicaCollection<C extends 
AbstractReplicaCollect
 
     /**
      *  <p>
-     *  It's not clear whether {@link AbstractReplicaCollection} should 
implement the order sensitive {@link Object#equals(Object) equals}
-     *  of {@link java.util.List} or the order oblivious {@link 
Object#equals(Object) equals} of {@link java.util.Set}. We never rely on 
equality
-     *  in the database so rather then leave in a potentially surprising 
implementation we have it throw {@link UnsupportedOperationException}.
-     *  </p>
-     *  <p>
-     *  Don't implement this and pick one behavior over the other. If you want 
equality you can static import {@link 
com.google.common.collect.Iterables#elementsEqual(Iterable, Iterable)}
-     *  and use that to get order sensitive equals.
+     *  Implements order sensitive {@link Object#equals(Object) equals} of {@link java.util.List}.
      *  </p>
      */
     public final boolean equals(Object o)
     {
-        throw new UnsupportedOperationException("AbstractReplicaCollection 
equals unsupported");
+        if (!(o instanceof AbstractReplicaCollection))
+            return false;
+
+        return Iterators.elementsEqual(iterator(), 
((AbstractReplicaCollection) o).iterator());
     }
 
     /**
      *  <p>
-     *  It's not clear whether {@link AbstractReplicaCollection} should 
implement the order sensitive {@link Object#hashCode() hashCode}
-     *  of {@link java.util.List} or the order oblivious {@link 
Object#hashCode() equals} of {@link java.util.Set}. We never rely on hashCode
-     *  in the database so rather then leave in a potentially surprising 
implementation we have it throw {@link UnsupportedOperationException}.
-     *  </p>
-     *  <p>
-     *  Don't implement this and pick one behavior over the other.
+     *  Implements order sensitive {@link Object#hashCode() hashCode} of 
{@link java.util.List}.
      *  </p>
      */
     public final int hashCode()
     {
-        throw new UnsupportedOperationException("AbstractReplicaCollection 
hashCode unsupported");
+        int result = 1;
+
+        for (Replica e : this)
+            result = 31 * result + (e == null ? 0 : e.hashCode());
+
+        return result;
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java 
b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index b3b4ae6b67..d5a54971ee 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -47,6 +48,9 @@ import 
org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
 import org.apache.cassandra.service.DatacenterWriteResponseHandler;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -94,13 +98,13 @@ public abstract class AbstractReplicationStrategy
     {
         Token searchToken = searchPosition.getToken();
         long currentRingVersion = tokenMetadata.getRingVersion();
-        Token keyToken = 
TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
+        Token keyToken = 
TokenRingUtils.firstToken(tokenMetadata.sortedTokens(), searchToken);
         EndpointsForRange endpoints = getCachedReplicas(currentRingVersion, 
keyToken);
         if (endpoints == null)
         {
             TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
             // if our cache got invalidated, it's possible there is a new 
token to account for too
-            keyToken = TokenMetadata.firstToken(tm.sortedTokens(), 
searchToken);
+            keyToken = TokenRingUtils.firstToken(tm.sortedTokens(), 
searchToken);
             endpoints = calculateNaturalReplicas(searchToken, tm);
             replicas.put(tm.getRingVersion(), keyToken, endpoints);
         }
@@ -133,6 +137,8 @@ public abstract class AbstractReplicationStrategy
      */
     public abstract EndpointsForRange calculateNaturalReplicas(Token 
searchToken, TokenMetadata tokenMetadata);
 
+    public abstract DataPlacement calculateDataPlacement(List<Range<Token>> 
ranges, ClusterMetadata metadata);
+
     public <T> AbstractWriteResponseHandler<T> 
getWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan,
                                                                        
Runnable callback,
                                                                        
WriteType writeType,
diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java 
b/src/java/org/apache/cassandra/locator/LocalStrategy.java
index 0e3a9185fe..3f41ef806f 100644
--- a/src/java/org/apache/cassandra/locator/LocalStrategy.java
+++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java
@@ -17,31 +17,46 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.Collections;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.transformations.cms.EntireRange;
 
-public class LocalStrategy extends AbstractReplicationStrategy
+public class LocalStrategy extends SystemStrategy
 {
     private static final ReplicationFactor RF = ReplicationFactor.fullOnly(1);
-    private final EndpointsForRange replicas;
+//    private final EndpointsForRange replicas;
 
     public LocalStrategy(String keyspaceName, TokenMetadata tokenMetadata, 
IEndpointSnitch snitch, Map<String, String> configOptions)
     {
         super(keyspaceName, tokenMetadata, snitch, configOptions);
-        replicas = EndpointsForRange.of(
-                new Replica(FBUtilities.getBroadcastAddressAndPort(),
-                        DatabaseDescriptor.getPartitioner().getMinimumToken(),
-                        DatabaseDescriptor.getPartitioner().getMinimumToken(),
-                        true
-                )
-        );
+//        replicas = EndpointsForRange.of(
+//                new Replica(FBUtilities.getBroadcastAddressAndPort(),
+//                        
DatabaseDescriptor.getPartitioner().getMinimumToken(),
+//                        
DatabaseDescriptor.getPartitioner().getMinimumToken(),
+//                        true
+//                )
+//        );
+    }
+
+    @Override
+    public DataPlacement calculateDataPlacement(List<Range<Token>> ranges, 
ClusterMetadata metadata)
+    {
+        return EntireRange.placement;
+    }
+
+    @Override
+    public ReplicationFactor getReplicationFactor()
+    {
+        return RF;
     }
 
     /**
@@ -52,17 +67,12 @@ public class LocalStrategy extends 
AbstractReplicationStrategy
     @Override
     public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition)
     {
-        return replicas;
+        return EntireRange.localReplicas;
     }
 
     public EndpointsForRange calculateNaturalReplicas(Token token, 
TokenMetadata metadata)
     {
-        return replicas;
-    }
-
-    public ReplicationFactor getReplicationFactor()
-    {
-        return RF;
+        return EntireRange.localReplicas;
     }
 
     public void validateOptions() throws ConfigurationException
diff --git a/src/java/org/apache/cassandra/locator/MetaStrategy.java 
b/src/java/org/apache/cassandra/locator/MetaStrategy.java
index a62b0ea2c7..295e06f1a9 100644
--- a/src/java/org/apache/cassandra/locator/MetaStrategy.java
+++ b/src/java/org/apache/cassandra/locator/MetaStrategy.java
@@ -17,10 +17,17 @@
  */
 package org.apache.cassandra.locator;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.PlacementForRange;
+import org.apache.cassandra.tcm.transformations.cms.EntireRange;
 
 public class MetaStrategy extends SystemStrategy
 {
@@ -32,9 +39,23 @@ public class MetaStrategy extends SystemStrategy
     @Override
     public EndpointsForRange calculateNaturalReplicas(Token token, 
TokenMetadata tokenMetadata)
     {
-        return ClusterMetadata.current().cmsReplicas;
+        return replicas();
     }
 
+    @Override
+    public DataPlacement calculateDataPlacement(List<Range<Token>> ranges, 
ClusterMetadata metadata)
+    {
+        PlacementForRange placement = 
PlacementForRange.builder(1).withReplicaGroup(replicas()).build();
+        return new DataPlacement(placement, placement);
+    }
+
+    private static EndpointsForRange replicas()
+    {
+        Set<InetAddressAndPort> members = 
ClusterMetadata.current().cmsMembers();
+        return EndpointsForRange.builder(EntireRange.entireRange, 
members.size())
+                                
.addAll(members.stream().map(EntireRange::replica).collect(Collectors.toList()))
+                                .build();
+    }
     @Override
     public ReplicationFactor getReplicationFactor()
     {
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java 
b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 5e6a2f8ec9..49be984c44 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -30,13 +30,20 @@ import org.apache.cassandra.dht.Datacenters;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.TokenMetadata.Topology;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.compatibility.AsEndpoints;
+import org.apache.cassandra.tcm.compatibility.AsLocations;
+import org.apache.cassandra.tcm.compatibility.AsTokenMap;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.PlacementForRange;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
@@ -106,7 +113,7 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
          * For efficiency the set is shared between the instances, using the 
location pair (dc, rack) to make sure
          * clashing names aren't a problem.
          */
-        Set<Pair<String, String>> racks;
+        Set<Location> racks;
 
         /** Number of replicas left to fill from this DC. */
         int rfLeft;
@@ -117,7 +124,7 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
                             int rackCount,
                             int nodeCount,
                             EndpointsForRange.Builder replicas,
-                            Set<Pair<String, String>> racks)
+                            Set<Location> racks)
         {
             this.replicas = replicas;
             this.racks = racks;
@@ -137,7 +144,7 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
          * Attempts to add an endpoint to the replicas for this datacenter, 
adding to the replicas set if successful.
          * Returns true if the endpoint was added, and this datacenter does 
not require further replicas.
          */
-        boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, 
Pair<String,String> location, Range<Token> replicatedRange)
+        boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Location 
location, Range<Token> replicatedRange)
         {
             if (done())
                 return false;
@@ -181,19 +188,23 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
     {
         // we want to preserve insertion order so that the first added 
endpoint becomes primary
         ArrayList<Token> sortedTokens = tokenMetadata.sortedTokens();
-        Token replicaEnd = TokenMetadata.firstToken(sortedTokens, searchToken);
-        Token replicaStart = tokenMetadata.getPredecessor(replicaEnd);
+        Token replicaEnd = TokenRingUtils.firstToken(sortedTokens, 
searchToken);
+        Token replicaStart = TokenRingUtils.getPredecessor(sortedTokens, 
replicaEnd);
         Range<Token> replicatedRange = new Range<>(replicaStart, replicaEnd);
+        return calculateNaturalReplicas(searchToken, replicatedRange, 
tokenMetadata, tokenMetadata, tokenMetadata, datacenters);
+    }
 
+    private EndpointsForRange calculateNaturalReplicas(Token searchToken,
+                                                       Range<Token> 
replicatedRange,
+                                                       AsEndpoints endpoints,
+                                                       AsLocations locations,
+                                                       AsTokenMap tokens,
+                                                       Map<String, 
ReplicationFactor> datacenters)
+    {
         EndpointsForRange.Builder builder = new 
EndpointsForRange.Builder(replicatedRange);
-        Set<Pair<String, String>> seenRacks = new HashSet<>();
+        Set<Location> seenRacks = new HashSet<>();
 
-        Topology topology = tokenMetadata.getTopology();
-        // all endpoints in each DC, so we can check when we have exhausted 
all the members of a DC
-        Multimap<String, InetAddressAndPort> allEndpoints = 
topology.getDatacenterEndpoints();
-        // all racks in a DC so we can check when we have exhausted all racks 
in a DC
-        Map<String, ImmutableMultimap<String, InetAddressAndPort>> racks = 
topology.getDatacenterRacks();
-        assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any 
cluster members";
+        assert !locations.allDatacenterEndpoints().isEmpty() && 
!locations.allDatacenterRacks().isEmpty() : "not aware of any cluster members";
 
         int dcsToFill = 0;
         Map<String, DatacenterEndpoints> dcs = new 
HashMap<>(datacenters.size() * 2);
@@ -203,29 +214,48 @@ public class NetworkTopologyStrategy extends 
AbstractReplicationStrategy
         {
             String dc = en.getKey();
             ReplicationFactor rf = en.getValue();
-            int nodeCount = sizeOrZero(allEndpoints.get(dc));
+            int nodeCount = sizeOrZero(locations.datacenterEndpoints(dc));
 
             if (rf.allReplicas <= 0 || nodeCount <= 0)
                 continue;
 
-            DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, 
sizeOrZero(racks.get(dc)), nodeCount, builder, seenRacks);
+            DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, 
sizeOrZero(locations.datacenterRacks(dc)), nodeCount, builder, seenRacks);
             dcs.put(dc, dcEndpoints);
             ++dcsToFill;
         }
 
-        Iterator<Token> tokenIter = TokenMetadata.ringIterator(sortedTokens, 
searchToken, false);
+        Iterator<Token> tokenIter = 
TokenRingUtils.ringIterator(tokens.tokens(), searchToken, false);
         while (dcsToFill > 0 && tokenIter.hasNext())
         {
             Token next = tokenIter.next();
-            InetAddressAndPort ep = tokenMetadata.getEndpoint(next);
-            Pair<String, String> location = topology.getLocation(ep);
-            DatacenterEndpoints dcEndpoints = dcs.get(location.left);
+            NodeId owner = tokens.owner(next);
+            InetAddressAndPort ep = endpoints.endpoint(owner);
+            Location location = locations.location(owner);
+            DatacenterEndpoints dcEndpoints = dcs.get(location.datacenter);
             if (dcEndpoints != null && 
dcEndpoints.addEndpointAndCheckIfDone(ep, location, replicatedRange))
                 --dcsToFill;
         }
         return builder.build();
     }
 
+    @Override
+    public DataPlacement calculateDataPlacement(List<Range<Token>> ranges,
+                                                ClusterMetadata metadata)
+    {
+        PlacementForRange.Builder builder = PlacementForRange.builder();
+        for (Range<Token> range : ranges)
+        {
+            builder.withReplicaGroup(calculateNaturalReplicas(range.right,
+                                                              range,
+                                                              
metadata.directory,  // AsEndpoints
+                                                              
metadata.directory,  // AsLocations
+                                                              
metadata.tokenMap,
+                                                              datacenters));
+        }
+        PlacementForRange built = builder.build();
+        return new DataPlacement(built, built);
+    }
+
     private int sizeOrZero(Multimap<?, ?> collection)
     {
         return collection != null ? collection.asMap().size() : 0;
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java 
b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 488b601ce7..1384fa8f9c 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.slf4j.Logger;
@@ -35,6 +36,13 @@ import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.compatibility.AsEndpoints;
+import org.apache.cassandra.tcm.compatibility.AsTokenMap;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.PlacementForRange;
 
 
 /**
@@ -56,17 +64,37 @@ public class SimpleStrategy extends 
AbstractReplicationStrategy
         this.rf = 
ReplicationFactor.fromString(this.configOptions.get(REPLICATION_FACTOR));
     }
 
+    @Override
+    public DataPlacement calculateDataPlacement(List<Range<Token>> ranges, 
ClusterMetadata metadata)
+    {
+        PlacementForRange.Builder builder = PlacementForRange.builder();
+        for (Range<Token> range : ranges)
+            builder.withReplicaGroup(calculateNaturalReplicas(range.right, 
metadata.tokenMap.tokens(), range, metadata.directory, metadata.tokenMap));
+
+        PlacementForRange built = builder.build();
+        return new DataPlacement(built, built);
+    }
+
     @Override
     public EndpointsForRange calculateNaturalReplicas(Token token, 
TokenMetadata metadata)
     {
         ArrayList<Token> ring = metadata.sortedTokens();
+        Token replicaEnd = TokenRingUtils.firstToken(ring, token);
+        Token replicaStart = TokenRingUtils.getPredecessor(ring, replicaEnd);
+        Range<Token> replicaRange = new Range<>(replicaStart, replicaEnd);
+        return calculateNaturalReplicas(token, ring, replicaRange, metadata, 
metadata);
+    }
+
+    private EndpointsForRange calculateNaturalReplicas(Token token,
+                                                       List<Token> ring,
+                                                       Range<Token> 
replicaRange,
+                                                       AsEndpoints endpoints,
+                                                       AsTokenMap tokens)
+    {
         if (ring.isEmpty())
-            return EndpointsForRange.empty(new 
Range<>(metadata.partitioner.getMinimumToken(), 
metadata.partitioner.getMinimumToken()));
+            return EndpointsForRange.empty(new 
Range<>(tokens.partitioner().getMinimumToken(), 
token.getPartitioner().getMinimumToken()));
 
-        Token replicaEnd = TokenMetadata.firstToken(ring, token);
-        Token replicaStart = metadata.getPredecessor(replicaEnd);
-        Range<Token> replicaRange = new Range<>(replicaStart, replicaEnd);
-        Iterator<Token> iter = TokenMetadata.ringIterator(ring, token, false);
+        Iterator<Token> iter = TokenRingUtils.ringIterator(ring, token, false);
 
         EndpointsForRange.Builder replicas = new 
EndpointsForRange.Builder(replicaRange, rf.allReplicas);
 
@@ -74,7 +102,8 @@ public class SimpleStrategy extends 
AbstractReplicationStrategy
         while (replicas.size() < rf.allReplicas && iter.hasNext())
         {
             Token tk = iter.next();
-            InetAddressAndPort ep = metadata.getEndpoint(tk);
+            NodeId owner = tokens.owner(tk);
+            InetAddressAndPort ep = endpoints.endpoint(owner);
             if (!replicas.endpoints().contains(ep))
                 replicas.add(new Replica(ep, replicaRange, replicas.size() < 
rf.fullReplicas));
         }
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java 
b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 6b0ea7d810..abbc76d482 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -45,6 +45,12 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.compatibility.AsEndpoints;
+import org.apache.cassandra.tcm.compatibility.AsLocations;
+import org.apache.cassandra.tcm.compatibility.AsTokenMap;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.utils.BiMultiValMap;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SortedBiMultiValMap;
@@ -52,7 +58,7 @@ import org.apache.cassandra.utils.SortedBiMultiValMap;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 
-public class TokenMetadata
+public class TokenMetadata implements AsEndpoints, AsLocations, AsTokenMap
 {
     private static final Logger logger = 
LoggerFactory.getLogger(TokenMetadata.class);
 
@@ -786,10 +792,7 @@ public class TokenMetadata
 
     public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> 
tokens)
     {
-        Collection<Range<Token>> ranges = new ArrayList<>(tokens.size());
-        for (Token right : tokens)
-            ranges.add(new Range<>(getPredecessor(right), right));
-        return ranges;
+        return TokenRingUtils.getPrimaryRangesFor(sortedTokens(), tokens);
     }
 
     @Deprecated
@@ -1041,22 +1044,6 @@ public class TokenMetadata
         return newPendingRanges;
     }
 
-    public Token getPredecessor(Token token)
-    {
-        List<Token> tokens = sortedTokens();
-        int index = Collections.binarySearch(tokens, token);
-        assert index >= 0 : token + " not found in " + 
tokenToEndpointMapKeysAsStrings();
-        return index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 
1);
-    }
-
-    public Token getSuccessor(Token token)
-    {
-        List<Token> tokens = sortedTokens();
-        int index = Collections.binarySearch(tokens, token);
-        assert index >= 0 : token + " not found in " + 
tokenToEndpointMapKeysAsStrings();
-        return (index == (tokens.size() - 1)) ? tokens.get(0) : 
tokens.get(index + 1);
-    }
-
     private String tokenToEndpointMapKeysAsStrings()
     {
         lock.readLock().lock();
@@ -1174,66 +1161,6 @@ public class TokenMetadata
         }
     }
 
-    public static int firstTokenIndex(final ArrayList<Token> ring, Token 
start, boolean insertMin)
-    {
-        assert ring.size() > 0;
-        // insert the minimum token (at index == -1) if we were asked to 
include it and it isn't a member of the ring
-        int i = Collections.binarySearch(ring, start);
-        if (i < 0)
-        {
-            i = (i + 1) * (-1);
-            if (i >= ring.size())
-                i = insertMin ? -1 : 0;
-        }
-        return i;
-    }
-
-    public static Token firstToken(final ArrayList<Token> ring, Token start)
-    {
-        return ring.get(firstTokenIndex(ring, start, false));
-    }
-
-    /**
-     * iterator over the Tokens in the given ring, starting with the token for 
the node owning start
-     * (which does not have to be a Token in the ring)
-     * @param includeMin True if the minimum token should be returned in the 
ring even if it has no owner.
-     */
-    public static Iterator<Token> ringIterator(final ArrayList<Token> ring, 
Token start, boolean includeMin)
-    {
-        if (ring.isEmpty())
-            return includeMin ? 
Iterators.singletonIterator(start.getPartitioner().getMinimumToken())
-                              : Collections.emptyIterator();
-
-        final boolean insertMin = includeMin && !ring.get(0).isMinimum();
-        final int startIndex = firstTokenIndex(ring, start, insertMin);
-        return new AbstractIterator<Token>()
-        {
-            int j = startIndex;
-            protected Token computeNext()
-            {
-                if (j < -1)
-                    return endOfData();
-                try
-                {
-                    // return minimum for index == -1
-                    if (j == -1)
-                        return start.getPartitioner().getMinimumToken();
-                    // return ring token for other indexes
-                    return ring.get(j);
-                }
-                finally
-                {
-                    j++;
-                    if (j == ring.size())
-                        j = insertMin ? -1 : 0;
-                    if (j == startIndex)
-                        // end iteration
-                        j = -2;
-                }
-            }
-        };
-    }
-
     /** used by tests */
     public void clearUnsafe()
     {
@@ -1626,4 +1553,74 @@ public class TokenMetadata
             }
         }
     }
+
+    // Methods of org.apache.cassandra.tcm.compatibility.AsEndpoints
+    @Override
+    public NodeId peerId(InetAddressAndPort endpoint)
+    {
+        return new NodeId(getHostId(endpoint));
+    }
+
+    @Override
+    public InetAddressAndPort endpoint(NodeId id)
+    {
+        return getEndpointForHostId(id.uuid);
+    }
+
+    // Methods of org.apache.cassandra.tcm.compatibility.AsLocations
+    @Override
+    public Location location(NodeId peer)
+    {
+        Pair<String, String> dcRack = topology.getLocation(endpoint(peer));
+        return new Location(dcRack.left, dcRack.right);
+    }
+
+    @Override
+    public Set<InetAddressAndPort> datacenterEndpoints(String datacenter)
+    {
+        return new HashSet<>(topology.dcEndpoints.get(datacenter));
+    }
+
+    @Override
+    public Multimap<String, InetAddressAndPort> allDatacenterEndpoints()
+    {
+        return topology.getDatacenterEndpoints();
+    }
+
+    @Override
+    public Multimap<String, InetAddressAndPort> datacenterRacks(String 
datacenter)
+    {
+        return topology.dcRacks.get(datacenter);
+    }
+
+    @Override
+    public Map<String, Multimap<String, InetAddressAndPort>> 
allDatacenterRacks()
+    {
+        // terrible, but temporary
+        Map<String, Multimap<String, InetAddressAndPort>> dcRacks = new 
HashMap<>();
+        topology.getDatacenterRacks().forEach((dc, racks) -> {
+            dcRacks.put(dc, HashMultimap.create(racks));
+        });
+        return dcRacks;
+    }
+
+    // Methods of org.apache.cassandra.tcm.compatibility.AsTokenMap
+    @Override
+    public IPartitioner partitioner()
+    {
+        return partitioner;
+    }
+
+    @Override
+    public ImmutableList<Token> tokens()
+    {
+        return ImmutableList.copyOf(sortedTokens);
+    }
+
+    @Override
+    public NodeId owner(Token token)
+    {
+        return new NodeId(getHostId(getEndpoint(token)));
+    }
+
 }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 75557a4489..b4fdcd1d99 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -208,6 +208,7 @@ import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Startup;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.transport.ClientResourceLimits;
@@ -4830,13 +4831,14 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
         Collection<Range<Token>> primaryRanges = new HashSet<>();
         TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap();
-        for (Token token : metadata.sortedTokens())
+        List<Token> sortedTokens = metadata.sortedTokens();
+        for (Token token : sortedTokens)
         {
             EndpointsForRange replicas = 
strategy.calculateNaturalReplicas(token, metadata);
             if (replicas.size() > 0 && replicas.get(0).endpoint().equals(ep))
             {
                 Preconditions.checkState(replicas.get(0).isFull());
-                primaryRanges.add(new Range<>(metadata.getPredecessor(token), 
token));
+                primaryRanges.add(new 
Range<>(TokenRingUtils.getPredecessor(sortedTokens, token), token));
             }
         }
         return primaryRanges;
@@ -4858,7 +4860,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         AbstractReplicationStrategy strategy = 
Keyspace.open(keyspace).getReplicationStrategy();
 
         Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>();
-        for (Token token : metadata.sortedTokens())
+        List<Token> sortedTokens = metadata.sortedTokens();
+        for (Token token : sortedTokens)
         {
             EndpointsForRange replicas = 
strategy.calculateNaturalReplicas(token, metadata);
             for (Replica replica : replicas)
@@ -4867,7 +4870,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 {
                     if (replica.endpoint().equals(referenceEndpoint))
                     {
-                        localDCPrimaryRanges.add(new 
Range<>(metadata.getPredecessor(token), token));
+                        localDCPrimaryRanges.add(new 
Range<>(TokenRingUtils.getPredecessor(sortedTokens, token), token));
                     }
                     break;
                 }
diff --git 
a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java 
b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java
index ef88c9dffd..605c4561b9 100644
--- a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java
+++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.Pair;
 
@@ -92,7 +93,7 @@ class ReplicaPlanIterator extends 
AbstractIterator<ReplicaPlan.ForRangeRead>
 
         List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>();
         // divide the queryRange into pieces delimited by the ring and minimum 
tokens
-        Iterator<Token> ringIter = 
TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), 
queryRange.left.getToken(), true);
+        Iterator<Token> ringIter = 
TokenRingUtils.ringIterator(tokenMetadata.sortedTokens(), 
queryRange.left.getToken(), true);
         AbstractBounds<PartitionPosition> remainder = queryRange;
         while (ringIter.hasNext())
         {
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 90d5343d2c..bbccd3af2b 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.tcm;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -26,11 +27,13 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.EndpointsForRange;
@@ -44,6 +47,7 @@ import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.TokenMap;
 import org.apache.cassandra.tcm.serialization.MetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.cms.EntireRange;
@@ -65,6 +69,7 @@ public class ClusterMetadata
 
     public final DistributedSchema schema;
     public final Directory directory;
+    public final TokenMap tokenMap;
     public final EndpointsForRange cmsReplicas;
     public final ImmutableSet<InetAddressAndPort> cmsMembers;
 
@@ -73,7 +78,8 @@ public class ClusterMetadata
         this(partitioner, Directory.EMPTY);
     }
 
-    private ClusterMetadata(IPartitioner partitioner, Directory directory)
+    @VisibleForTesting
+    public ClusterMetadata(IPartitioner partitioner, Directory directory)
     {
         this(partitioner, directory, DistributedSchema.first());
     }
@@ -86,6 +92,7 @@ public class ClusterMetadata
              partitioner,
              schema,
              directory,
+             new TokenMap(partitioner),
              ImmutableSet.of(),
              ImmutableMap.of());
     }
@@ -96,15 +103,21 @@ public class ClusterMetadata
                            IPartitioner partitioner,
                            DistributedSchema schema,
                            Directory directory,
+                           TokenMap tokenMap,
                            Set<InetAddressAndPort> cmsMembers,
                            Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions)
     {
+        // TODO: token map is a feature of the specific placement strategy, 
and so may not be a relevant component of
+        //  ClusterMetadata in the long term. We need to consider how the 
actual components of metadata can be evolved
+        //  over time.
+        assert tokenMap == null || 
tokenMap.partitioner().getClass().equals(partitioner.getClass()) : "Partitioner 
for TokenMap doesn't match base partitioner";
         this.epoch = epoch;
         this.period = period;
         this.lastInPeriod = lastInPeriod;
         this.partitioner = partitioner;
         this.schema = schema;
         this.directory = directory;
+        this.tokenMap = tokenMap;
         this.cmsMembers = ImmutableSet.copyOf(cmsMembers);
         this.extensions = ImmutableMap.copyOf(extensions);
 
@@ -143,6 +156,7 @@ public class ClusterMetadata
                                    partitioner,
                                    schema,
                                    directory,
+                                   tokenMap,
                                    cmsMembers,
                                    extensions);
     }
@@ -166,6 +180,7 @@ public class ClusterMetadata
         private final IPartitioner partitioner;
         private DistributedSchema schema;
         private Directory directory;
+        private TokenMap tokenMap;
         private final Set<InetAddressAndPort> cmsMembers;
         private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions;
         private final Set<MetadataKey> modifiedKeys;
@@ -179,6 +194,7 @@ public class ClusterMetadata
             this.partitioner = metadata.partitioner;
             this.schema = metadata.schema;
             this.directory = metadata.directory;
+            this.tokenMap = metadata.tokenMap;
             this.cmsMembers = new HashSet<>(metadata.cmsMembers);
             extensions = new HashMap<>(metadata.extensions);
             modifiedKeys = new HashSet<>();
@@ -202,6 +218,25 @@ public class ClusterMetadata
             return this;
         }
 
+        public Transformer proposeToken(NodeId nodeId, Collection<Token> 
tokens)
+        {
+            tokenMap = tokenMap.assignTokens(nodeId, tokens);
+            directory = directory.withRackAndDC(nodeId);
+            return this;
+        }
+
+        public Transformer unproposeTokens(NodeId nodeId)
+        {
+            tokenMap = tokenMap.unassignTokens(nodeId);
+            return this;
+        }
+
+        public Transformer unproposeTokens(NodeId nodeId, Collection<Token> 
tokens)
+        {
+            tokenMap = tokenMap.unassignTokens(nodeId, tokens);
+            return this;
+        }
+
         public Transformer withCMSMember(InetAddressAndPort member)
         {
             cmsMembers.add(member);
@@ -270,12 +305,19 @@ public class ClusterMetadata
                 directory = directory.withLastModified(epoch);
             }
 
+            if (tokenMap != base.tokenMap)
+            {
+                modifiedKeys.add(MetadataKeys.TOKEN_MAP);
+                tokenMap = tokenMap.withLastModified(epoch);
+            }
+
             return new Transformed(new ClusterMetadata(epoch,
                                                        period,
                                                        lastInPeriod,
                                                        partitioner,
                                                        schema,
                                                        directory,
+                                                       tokenMap,
                                                        cmsMembers,
                                                        extensions),
                                    ImmutableSet.copyOf(modifiedKeys));
@@ -291,6 +333,7 @@ public class ClusterMetadata
                    ", partitioner=" + partitioner +
                    ", schema=" + schema +
                   ", directory=" + directory +
+                   ", tokenMap=" + tokenMap +
                    ", extensions=" + extensions +
                    ", cmsMembers=" + cmsMembers +
                    ", modifiedKeys=" + modifiedKeys +
@@ -328,13 +371,14 @@ public class ClusterMetadata
                lastInPeriod == that.lastInPeriod &&
                schema.equals(that.schema) &&
                directory.equals(that.directory) &&
+               tokenMap.equals(that.tokenMap) &&
                extensions.equals(that.extensions);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(epoch, lastInPeriod, schema, directory, 
extensions);
+        return Objects.hash(epoch, lastInPeriod, schema, directory, tokenMap, 
extensions);
     }
 
     public static ClusterMetadata current()
@@ -378,6 +422,7 @@ public class ClusterMetadata
             out.writeUTF(metadata.partitioner.getClass().getCanonicalName());
             DistributedSchema.serializer.serialize(metadata.schema, out, 
version);
             Directory.serializer.serialize(metadata.directory, out, version);
+            TokenMap.serializer.serialize(metadata.tokenMap, out, version);
             out.writeInt(metadata.extensions.size());
             for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : 
metadata.extensions.entrySet())
             {
@@ -401,6 +446,7 @@ public class ClusterMetadata
             IPartitioner partitioner = 
FBUtilities.newPartitioner(in.readUTF());
             DistributedSchema schema = 
DistributedSchema.serializer.deserialize(in, version);
             Directory dir = Directory.serializer.deserialize(in, version);
+            TokenMap tokenMap = TokenMap.serializer.deserialize(in, version);
             int items = in.readInt();
             Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new 
HashMap<>(items);
             for (int i = 0; i < items; i++)
@@ -420,6 +466,7 @@ public class ClusterMetadata
                                        partitioner,
                                        schema,
                                        dir,
+                                       tokenMap,
                                        members,
                                        extensions);
         }
@@ -437,7 +484,8 @@ public class ClusterMetadata
                     TypeSizes.BOOL_SIZE +
                     sizeof(metadata.partitioner.getClass().getCanonicalName()) 
+
                     
DistributedSchema.serializer.serializedSize(metadata.schema, version) +
-                    Directory.serializer.serializedSize(metadata.directory, 
version);
+                    Directory.serializer.serializedSize(metadata.directory, 
version) +
+                    TokenMap.serializer.serializedSize(metadata.tokenMap, 
version);
 
             size += TypeSizes.INT_SIZE;
             for (InetAddressAndPort member : metadata.cmsMembers)
diff --git a/src/java/org/apache/cassandra/tcm/MetadataKeys.java 
b/src/java/org/apache/cassandra/tcm/MetadataKeys.java
index b545e03d4c..d1bde221cc 100644
--- a/src/java/org/apache/cassandra/tcm/MetadataKeys.java
+++ b/src/java/org/apache/cassandra/tcm/MetadataKeys.java
@@ -28,8 +28,11 @@ public class MetadataKeys
 
     public static final MetadataKey SCHEMA                  = make(CORE_NS, 
"schema", "dist_schema");
     public static final MetadataKey NODE_DIRECTORY          = make(CORE_NS, 
"membership", "node_directory");
+    public static final MetadataKey TOKEN_MAP               = make(CORE_NS, 
"ownership", "token_map");
 
-    public static final ImmutableSet<MetadataKey> CORE_METADATA = 
ImmutableSet.of(SCHEMA);
+    public static final ImmutableSet<MetadataKey> CORE_METADATA = 
ImmutableSet.of(SCHEMA,
+                                                                               
   NODE_DIRECTORY,
+                                                                               
   TOKEN_MAP);
 
     public static MetadataKey make(String...parts)
     {
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java 
b/src/java/org/apache/cassandra/tcm/compatibility/AsEndpoints.java
similarity index 58%
copy from src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
copy to src/java/org/apache/cassandra/tcm/compatibility/AsEndpoints.java
index 8ae2302527..fd8a4d39f6 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/AsEndpoints.java
@@ -16,22 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.tcm.transformations.cms;
+package org.apache.cassandra.tcm.compatibility;
 
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.membership.NodeId;
 
-public class EntireRange
+public interface AsEndpoints
 {
-    public static final Range<Token> entireRange = new 
Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
-                                                               
DatabaseDescriptor.getPartitioner().getMinimumToken());
-    public static Replica replica(InetAddressAndPort addr)
-    {
-        return new Replica(addr, entireRange, true);
-    }
-
-}
\ No newline at end of file
+    public NodeId peerId(InetAddressAndPort endpoint);
+    public InetAddressAndPort endpoint(NodeId id);
+}
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java 
b/src/java/org/apache/cassandra/tcm/compatibility/AsLocations.java
similarity index 58%
copy from src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
copy to src/java/org/apache/cassandra/tcm/compatibility/AsLocations.java
index 8ae2302527..3286f1f1d9 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/AsLocations.java
@@ -16,22 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.tcm.transformations.cms;
+package org.apache.cassandra.tcm.compatibility;
 
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Multimap;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
 
-public class EntireRange
+public interface AsLocations
 {
-    public static final Range<Token> entireRange = new 
Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
-                                                               
DatabaseDescriptor.getPartitioner().getMinimumToken());
-    public static Replica replica(InetAddressAndPort addr)
-    {
-        return new Replica(addr, entireRange, true);
-    }
+    Location location(NodeId peer);
+    Set<InetAddressAndPort> datacenterEndpoints(String datacenter);
+    Multimap<String, InetAddressAndPort> allDatacenterEndpoints();
 
-}
\ No newline at end of file
+    Multimap<String, InetAddressAndPort> datacenterRacks(String datacenter);
+    Map<String, Multimap<String, InetAddressAndPort>> allDatacenterRacks();
+}
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java 
b/src/java/org/apache/cassandra/tcm/compatibility/AsTokenMap.java
similarity index 57%
copy from src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
copy to src/java/org/apache/cassandra/tcm/compatibility/AsTokenMap.java
index 8ae2302527..9960dc10b8 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/AsTokenMap.java
@@ -16,22 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.tcm.transformations.cms;
+package org.apache.cassandra.tcm.compatibility;
 
+import java.util.List;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.membership.NodeId;
 
-public class EntireRange
+public interface AsTokenMap
 {
-    public static final Range<Token> entireRange = new 
Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
-                                                               
DatabaseDescriptor.getPartitioner().getMinimumToken());
-    public static Replica replica(InetAddressAndPort addr)
-    {
-        return new Replica(addr, entireRange, true);
-    }
-
-}
\ No newline at end of file
+    public IPartitioner partitioner();
+    // todo: this could return NavigableSet
+    public List<Token> tokens();
+    public NodeId owner(Token token);
+}
diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java 
b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
index ad83e404ae..477a1ea2d8 100644
--- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Period;
 import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.ownership.TokenMap;
 
 public class GossipHelper
 {
@@ -48,6 +49,7 @@ public class GossipHelper
                                    DatabaseDescriptor.getPartitioner(),
                                    
DistributedSchema.fromSystemTables(SchemaKeyspace.fetchNonSystemKeyspaces()),
                                    Directory.EMPTY,
+                                   new 
TokenMap(DatabaseDescriptor.getPartitioner()),
                                    Collections.emptySet(),
                                    Collections.emptyMap());
     }
diff --git 
a/src/java/org/apache/cassandra/tcm/compatibility/TokenRingUtils.java 
b/src/java/org/apache/cassandra/tcm/compatibility/TokenRingUtils.java
new file mode 100644
index 0000000000..1698ba399e
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/compatibility/TokenRingUtils.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.compatibility;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Functions for interacting with a sorted list of tokens as modelled as a ring
+ */
+public class TokenRingUtils
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(TokenRingUtils.class);
+
+    public static int firstTokenIndex(final List<Token> ring, Token start, 
boolean insertMin)
+    {
+        assert ring.size() > 0 : ring.toString();
+        // insert the minimum token (at index == -1) if we were asked to 
include it and it isn't a member of the ring
+        int i = Collections.binarySearch(ring, start);
+        if (i < 0)
+        {
+            i = (i + 1) * (-1);
+            if (i >= ring.size())
+                i = insertMin ? -1 : 0;
+        }
+        return i;
+    }
+
+    public static Token firstToken(List<Token> ring, Token start)
+    {
+        return ring.get(firstTokenIndex(ring, start, false));
+    }
+
+    public static Token getPredecessor(List<Token> ring, Token start)
+    {
+        int idx = firstTokenIndex(ring, start, false);
+        return ring.get(idx == 0 ? ring.size() - 1 : idx - 1);
+    }
+
+    public static Collection<Range<Token>> getPrimaryRangesFor(List<Token> 
ring, Collection<Token> tokens)
+    {
+        Collection<Range<Token>> ranges = new ArrayList<>(tokens.size());
+        for (Token right : tokens)
+            ranges.add(new Range<>(getPredecessor(ring, right), right));
+        return ranges;
+    }
+
+    /**
+     * iterator over the Tokens in the given ring, starting with the token for 
the node owning start
+     * (which does not have to be a Token in the ring)
+     * @param includeMin True if the minimum token should be returned in the 
ring even if it has no owner.
+     */
+    public static Iterator<Token> ringIterator(final List<Token> ring, Token 
start, boolean includeMin)
+    {
+        if (ring.isEmpty())
+            return includeMin ? 
Iterators.singletonIterator(start.getPartitioner().getMinimumToken())
+                              : Collections.emptyIterator();
+
+        final boolean insertMin = includeMin && !ring.get(0).isMinimum();
+        final int startIndex = firstTokenIndex(ring, start, insertMin);
+        return new AbstractIterator<Token>()
+        {
+            int j = startIndex;
+            protected Token computeNext()
+            {
+                if (j < -1)
+                    return endOfData();
+                try
+                {
+                    // return minimum for index == -1
+                    if (j == -1)
+                        return start.getPartitioner().getMinimumToken();
+                    // return ring token for other indexes
+                    return ring.get(j);
+                }
+                finally
+                {
+                    j++;
+                    if (j == ring.size())
+                        j = insertMin ? -1 : 0;
+                    if (j == startIndex)
+                        // end iteration
+                        j = -2;
+                }
+            }
+        };
+    }
+
+    /**
+     * Get all ranges that span the ring given a set
+     * of tokens. All ranges are in sorted order of
+     * ranges.
+     * @return ranges in sorted order
+     */
+    public static List<Range<Token>> getAllRanges(List<Token> sortedTokens)
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("computing ranges for {}", 
StringUtils.join(sortedTokens, ", "));
+
+        if (sortedTokens.isEmpty())
+            return Collections.emptyList();
+        int size = sortedTokens.size();
+        List<Range<Token>> ranges = new ArrayList<>(size + 1);
+        for (int i = 1; i < size; ++i)
+        {
+            Range<Token> range = new Range<>(sortedTokens.get(i - 1), 
sortedTokens.get(i));
+            ranges.add(range);
+        }
+        Range<Token> range = new Range<>(sortedTokens.get(size - 1), 
sortedTokens.get(0));
+        ranges.add(range);
+
+        return ranges;
+    }
+
+    public static Range<Token> getRange(List<Token> ring, Token token)
+    {
+        int idx = firstTokenIndex(ring, token, false);
+        Token replicaEnd = ring.get(idx);
+        Token replicaStart = ring.get(idx == 0 ? ring.size() - 1 : idx - 1);
+        return new Range<>(replicaStart, replicaEnd);
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java 
b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index d04c52dfc1..842c78f95c 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.MetadataValue;
+import org.apache.cassandra.tcm.compatibility.AsEndpoints;
+import org.apache.cassandra.tcm.compatibility.AsLocations;
 import org.apache.cassandra.tcm.serialization.MetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.net.MessagingService;
@@ -44,7 +46,7 @@ import org.apache.cassandra.utils.btree.BTreeMultimap;
 
 import static org.apache.cassandra.db.TypeSizes.sizeof;
 
-public class Directory implements MetadataValue<Directory>
+public class Directory implements MetadataValue<Directory>, AsEndpoints, 
AsLocations
 {
     public static final Serializer serializer = new Serializer();
 
diff --git 
a/src/java/org/apache/cassandra/tcm/ownership/PrimaryRangeComparator.java 
b/src/java/org/apache/cassandra/tcm/ownership/PrimaryRangeComparator.java
new file mode 100644
index 0000000000..412c09bb59
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/PrimaryRangeComparator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.Comparator;
+
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+public class PrimaryRangeComparator implements Comparator<Replica>
+{
+    private final TokenMap tokens;
+    private final Directory directory;
+
+    public PrimaryRangeComparator(TokenMap tokens, Directory directory)
+    {
+        this.tokens = tokens;
+        this.directory = directory;
+    }
+
+    @Override
+    public int compare(Replica o1, Replica o2)
+    {
+        assert o1.range().equals(o2.range());
+        Token target = 
o1.range().right.equals(tokens.partitioner().getMinimumToken())
+                       ? tokens.tokens().get(0)
+                       : o1.range().right;
+        NodeId owner = tokens.owner(target);
+        boolean o1Owns = directory.peerId(o1.endpoint()).equals(owner);
+        boolean o2Owns = directory.peerId(o2.endpoint()).equals(owner);
+        return o1Owns == o2Owns ? 0 : (o1Owns ? -1 : 1);
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java 
b/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java
new file mode 100644
index 0000000000..8dde234c5f
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataValue;
+import org.apache.cassandra.tcm.compatibility.AsTokenMap;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.BiMultiValMap;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SortedBiMultiValMap;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+
+public class TokenMap implements MetadataValue<TokenMap>, AsTokenMap
+{
+    public static final Serializer serializer = new Serializer();
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TokenMap.class);
+
+    private final SortedBiMultiValMap<Token, NodeId> map;
+    private final List<Token> tokens;
+    private final List<Range<Token>> ranges;
+    // TODO: move partitioner to the users (SimpleStrategy and Uniform Range 
Placement?)
+    private final IPartitioner partitioner;
+    private final Epoch lastModified;
+
+    public TokenMap(IPartitioner partitioner)
+    {
+        this(Epoch.EMPTY, partitioner, SortedBiMultiValMap.create());
+    }
+
+    private TokenMap(Epoch lastModified, IPartitioner partitioner, 
SortedBiMultiValMap<Token, NodeId> map)
+    {
+        this.lastModified = lastModified;
+        this.partitioner = partitioner;
+        this.map = map;
+        this.tokens = tokens();
+        this.ranges = toRanges(tokens, partitioner);
+    }
+
+    @Override
+    public TokenMap withLastModified(Epoch epoch)
+    {
+        return new TokenMap(epoch, partitioner, map);
+    }
+
+    @Override
+    public Epoch lastModified()
+    {
+        return lastModified;
+    }
+
+    public TokenMap assignTokens(NodeId id, Collection<Token> tokens)
+    {
+        SortedBiMultiValMap<Token, NodeId> finalisedCopy = 
SortedBiMultiValMap.create(map);
+        tokens.forEach(t -> finalisedCopy.putIfAbsent(t, id));
+        return new TokenMap(lastModified, partitioner, finalisedCopy);
+    }
+
+    /**
+     * Find a token that is immediately after this one in the ring.
+     */
+    public Token nextInRing(Token token, boolean exactMatchOnly)
+    {
+        int idx = Collections.binarySearch(tokens, token);
+        if (idx < 0)
+        {
+            if (exactMatchOnly)
+                throw new IllegalArgumentException(String.format("Can not find 
token %s in the ring %s", token, token));
+            int realIdx = -1 - idx;
+            if (realIdx == tokens.size())
+                realIdx = 0;
+
+            return tokens.get(realIdx);
+        }
+
+        if (idx == tokens.size() - 1)
+            return tokens.get(0);
+        else
+            return tokens.get(idx + 1);
+    }
+
+    public TokenMap unassignTokens(NodeId id)
+    {
+        SortedBiMultiValMap<Token, NodeId> finalisedCopy = 
SortedBiMultiValMap.create(map);
+        finalisedCopy.removeValue(id);
+        return new TokenMap(lastModified, partitioner, finalisedCopy);
+    }
+
+    public TokenMap unassignTokens(NodeId id, Collection<Token> tokens)
+    {
+        SortedBiMultiValMap<Token, NodeId> finalisedCopy = 
SortedBiMultiValMap.create(map);
+        for (Token token : tokens)
+        {
+            NodeId nodeId = finalisedCopy.remove(token);
+            assert nodeId.equals(id);
+        }
+
+        return new TokenMap(lastModified, partitioner, finalisedCopy);
+    }
+
+    public BiMultiValMap<Token, NodeId> asMap()
+    {
+        return SortedBiMultiValMap.create(map);
+    }
+
+    public boolean isEmpty()
+    {
+        return map.isEmpty();
+    }
+
+    public IPartitioner partitioner()
+    {
+        return partitioner;
+    }
+
+    public ImmutableList<Token> tokens()
+    {
+        return ImmutableList.copyOf(map.keySet());
+    }
+
+    public ImmutableList<Token> tokens(NodeId nodeId)
+    {
+        Collection<Token> tokens = map.inverse().get(nodeId);
+        if (tokens == null)
+            return null;
+        return ImmutableList.copyOf(tokens);
+    }
+
+    public List<Range<Token>> toRanges()
+    {
+        return ranges;
+    }
+
+    public static List<Range<Token>> toRanges(List<Token> tokens, IPartitioner 
partitioner)
+    {
+        if (tokens.isEmpty())
+            return Collections.emptyList();
+
+        List<Range<Token>> ranges = new ArrayList<>(tokens.size() + 1);
+        maybeAdd(ranges, new Range<>(partitioner.getMinimumToken(), 
tokens.get(0)));
+        for (int i = 1; i < tokens.size(); i++)
+            maybeAdd(ranges, new Range<>(tokens.get(i - 1), tokens.get(i)));
+        maybeAdd(ranges, new Range<>(tokens.get(tokens.size() - 1), 
partitioner.getMinimumToken()));
+        if (ranges.isEmpty())
+            ranges.add(new Range<>(partitioner.getMinimumToken(), 
partitioner.getMinimumToken()));
+        return ranges;
+    }
+
+    private static void maybeAdd(List<Range<Token>> ranges, Range<Token> r)
+    {
+        if (r.left.compareTo(r.right) != 0)
+            ranges.add(r);
+    }
+
+    public Token nextToken(List<Token> tokens, Token token)
+    {
+       return tokens.get(nextTokenIndex(tokens, token));
+    }
+
+    //Duplicated from TokenMetadata::firstTokenIndex
+    public static int nextTokenIndex(final List<Token> ring, Token start)
+    {
+        assert ring.size() > 0;
+        int i = Collections.binarySearch(ring, start);
+        if (i < 0)
+        {
+            i = (i + 1) * (-1);
+            if (i >= ring.size())
+                i = 0;
+        }
+        return i;
+    }
+
+    public NodeId owner(Token token)
+    {
+        return map.get(token);
+    }
+
+    public String toString()
+    {
+        return "TokenMap{" +
+               toDebugString()
+               + '}';
+    }
+
+    public void logDebugString()
+    {
+        logger.info(toDebugString());
+    }
+
+    public String toDebugString()
+    {
+        StringBuilder b = new StringBuilder();
+        for (Map.Entry<Token, NodeId> entry : map.entrySet())
+            b.append('[').append(entry.getKey()).append("] => 
").append(entry.getValue().uuid).append(";\n");
+        return b.toString();
+    }
+
+    public static class Serializer implements MetadataSerializer<TokenMap>
+    {
+        public void serialize(TokenMap t, DataOutputPlus out, Version version) 
throws IOException
+        {
+            Epoch.serializer.serialize(t.lastModified, out, version);
+            out.writeUTF(t.partitioner.getClass().getCanonicalName());
+            out.writeInt(t.map.size());
+            for (Map.Entry<Token, NodeId> entry : t.map.entrySet())
+            {
+                Token.metadataSerializer.serialize(entry.getKey(), out, 
version);
+                NodeId.serializer.serialize(entry.getValue(), out, version);
+            }
+        }
+
+        public TokenMap deserialize(DataInputPlus in, Version version) throws 
IOException
+        {
+            Epoch lastModified = Epoch.serializer.deserialize(in, version);
+            IPartitioner partitioner = 
FBUtilities.newPartitioner(in.readUTF());
+            int size = in.readInt();
+            SortedBiMultiValMap<Token, NodeId> tokens = 
SortedBiMultiValMap.create();
+            for (int i = 0; i < size; i++)
+                tokens.put(Token.metadataSerializer.deserialize(in, version),
+                           NodeId.serializer.deserialize(in, version));
+            return new TokenMap(lastModified, partitioner, tokens);
+        }
+
+        public long serializedSize(TokenMap t, Version version)
+        {
+            long size = Epoch.serializer.serializedSize(t.lastModified, 
version);
+            size += sizeof(t.partitioner.getClass().getCanonicalName());
+            size += sizeof(t.map.size());
+            for (Map.Entry<Token, NodeId> entry : t.map.entrySet())
+            {
+                size += 
Token.metadataSerializer.serializedSize(entry.getKey(), version);
+                size += NodeId.serializer.serializedSize(entry.getValue(), 
version);
+            }
+            return size;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof TokenMap)) return false;
+        TokenMap tokenMap = (TokenMap) o;
+        return Objects.equals(lastModified, tokenMap.lastModified) &&
+               isEquivalent(tokenMap);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(lastModified, map, partitioner);
+    }
+
+    /**
+     * returns true if this token map is functionally equivalent to the given 
one
+     *
+     * does not check equality of lastModified
+     */
+    public boolean isEquivalent(TokenMap tokenMap)
+    {
+        return Objects.equals(map, tokenMap.map) &&
+               Objects.equals(partitioner, tokenMap.partitioner);
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java 
b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java
new file mode 100644
index 0000000000..c0e7be3e4f
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+
/**
 * The defining feature of this placement strategy is that all layouts (i.e. replication params) use the same
 * set of ranges. So when splitting the current ranges, we only need to calculate the splits once and apply to
 * all existing placements.
 *
 * Also, when using this strategy, the read and write placements should (eventually) be identical. While range
 * movements/bootstraps/decommissions are in-flight, this will not be the case as the read and write replica
 * sets will diverge while nodes are acquiring/relinquishing ranges. Although there may always be such operations
 * ongoing, this is technically a temporary state.
 *
 * Because of this, when calculating the steps to transition between the current state and a proposed new state,
 * we work from the associated TokenMaps, the assumption being that eventually both the read and write placements
 * will converge and will, at that point, reflect those TokenMaps.
 * This means that the starting point of each transition is the intended end state of the preceding transitions.
 *
 * To do this calculation, we create a canonical DataPlacement from the current TokenMap and split it according
 * to the proposed tokens. As we iterate through and split the existing ranges, we construct a new DataPlacement for
 * each currently defined. There is no movement of data between the initial and new placements, only splitting of
 * existing ranges, so we can simply copy the replica groups from the old ranges to the new.
 * e.g.:
 * We start with a portion of the ring containing tokens 100, 200 assigned to nodes A & B respectively. The node
 * with the next highest token after B is C. RF is 2. We want to add a new token, 150 and assign it to a new node, D.
 * Starting placement, with tokens 100, 200
 *  (0, 100]    : {A,B}
 *  (100, 200]  : {B,C}
 * Next placement, with tokens 100, 150, 200 but no ownership changes
 *  (0, 100]    : {A,B}
 *  (100, 150]  : {B,C}
 *  (150, 200]  : {B,C}
 * No actual movement has occurred, only the ranges have been adjusted.
 * We then calculate the desired end state, with new nodes taking ownership of their assigned tokens.
 * If Node D takes ownership of Token 150, the desired end state would be:
 *  (0, 100]    : {A,D}
 *  (100, 150]  : {D,C}
 *  (150, 200]  : {B,C}
 * As mentioned, distinct placements are maintained for reads and writes, enabling ranges to be temporarily
 * overreplicated during a movement operation (analogous to the previous concept of pending ranges).
 */
public class UniformRangePlacement implements PlacementProvider
{
    private static final Logger logger = LoggerFactory.getLogger(UniformRangePlacement.class);

    /**
     * Plans the placement transitions for a node joining the ring with the given tokens.
     * The plan is staged as: split existing ranges (no ownership change), over-replicate
     * to the union of old and new replica sets (maximal), then shrink to the final sets.
     */
    @Override
    public PlacementTransitionPlan planForJoin(ClusterMetadata metadata,
                                               NodeId joining,
                                               Set<Token> tokens,
                                               Keyspaces keyspaces)
    {
        // There are no other nodes in the cluster, so the joining node will be taking ownership of the entire range.
        if (metadata.tokenMap.isEmpty())
        {
            DataPlacements placements = calculatePlacements(metadata.transformer()
                                                                    .proposeToken(joining, tokens)
                                                                    .build()
                                                            .metadata,
                                                            keyspaces);
            PlacementDeltas.Builder toStart = PlacementDeltas.builder(placements.size());
            // Single step: everything goes from an empty placement straight to the target.
            placements.withDistributed((params, placement) -> {
                toStart.put(params, DataPlacement.empty().difference(placements.get(params)));
            });
            return new PlacementTransitionPlan(toStart.build(),
                                               PlacementDeltas.empty(),
                                               PlacementDeltas.empty(),
                                               PlacementDeltas.empty());
        }

        // Canonical placements for the current TokenMap, then the same placements with the
        // joining node's tokens introduced as range splits but no ownership changes yet.
        DataPlacements base = calculatePlacements(metadata, keyspaces);
        DataPlacements start = splitRanges(metadata.tokenMap, metadata.tokenMap.assignTokens(joining, tokens), base);

        // Desired end state: placements recalculated with the joining node owning its tokens.
        DataPlacements finalPlacements = calculatePlacements(metadata.transformer()
                                                                     .proposeToken(joining, tokens)
                                                                     .build()
                                                             .metadata,
                                                             keyspaces);

        // Maximal (transitional) state: union of start and final replica groups per range,
        // so ranges are over-replicated while data streams to the new owner.
        DataPlacements maximalPlacements = start.combineReplicaGroups(finalPlacements);

        PlacementDeltas.Builder toStart = PlacementDeltas.builder(base.size());
        PlacementDeltas.Builder toMaximal = PlacementDeltas.builder(base.size());
        PlacementDeltas.Builder toFinal = PlacementDeltas.builder(base.size());

        start.withDistributed((params, startPlacement) -> {
            toStart.put(params, base.get(params).difference(startPlacement));
            toMaximal.put(params, startPlacement.difference(maximalPlacements.get(params)));
            toFinal.put(params, maximalPlacements.get(params).difference(finalPlacements.get(params)));
        });

        return new PlacementTransitionPlan(toStart.build(), toMaximal.build(), toFinal.build(), PlacementDeltas.empty());
    }

    /**
     * Plans the transitions for moving a node to a new set of tokens: split ranges for the
     * new tokens, over-replicate (maximal), shrink to the post-move ownership, then merge
     * away the ranges induced by the node's old tokens.
     */
    @Override
    public PlacementTransitionPlan planForMove(ClusterMetadata metadata, NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces)
    {
        DataPlacements base = calculatePlacements(metadata, keyspaces);

        TokenMap withMoved = metadata.transformer()
                                     .proposeToken(nodeId, tokens)
                                     .build().metadata.tokenMap;
        // Introduce new tokens, but do not change the ownership just yet
        DataPlacements start = splitRanges(metadata.tokenMap, withMoved, base);

        // Placements as they will be once the node owns only its new tokens.
        DataPlacements withoutLeaving = calculatePlacements(metadata.transformer()
                                                                    .unproposeTokens(nodeId)
                                                                    .proposeToken(nodeId, tokens)
                                                                    .build().metadata,
                                                            keyspaces);

        // Old tokens owned by the move target are now owned by the next-in-ring;
        // Move target now owns its newly assigned tokens
        DataPlacements end = splitRanges(metadata.tokenMap, withMoved, withoutLeaving);

        DataPlacements maximal = start.combineReplicaGroups(end);


        PlacementDeltas.Builder split = PlacementDeltas.builder();
        PlacementDeltas.Builder toMaximal = PlacementDeltas.builder();
        PlacementDeltas.Builder toFinal = PlacementDeltas.builder();
        PlacementDeltas.Builder merge = PlacementDeltas.builder();

        // Step 1: only range splits, no data movement.
        base.withDistributed((params, placement) -> {
            split.put(params, placement.difference(start.get(params)));
        });

        // Steps 2 & 3: grow to the over-replicated state, then drop the old replicas.
        maximal.withDistributed((params, placement) -> {
            toMaximal.put(params, start.get(params).difference(placement));
            toFinal.put(params, placement.difference(end.get(params)));
        });

        // Step 4: merge adjacent ranges left behind by the node's vacated tokens.
        end.withDistributed((params, placement) -> {
            merge.put(params, placement.difference(withoutLeaving.get(params)));
        });

        return new PlacementTransitionPlan(split.build(), toMaximal.build(), toFinal.build(), merge.build());
    }

    /**
     * Plans the transitions for decommissioning a node: over-replicate its ranges to the
     * inheriting nodes (maximal), remove the leaving node (final), then merge the ranges
     * its tokens had induced. No initial split step is needed.
     */
    @Override
    public PlacementTransitionPlan planForDecommission(ClusterMetadata metadata,
                                                       NodeId nodeId,
                                                       Keyspaces keyspaces)
    {
        // Determine the set of placements to start from. This is the canonical set of placements based on the current
        // TokenMap and collection of Keyspaces/ReplicationParams.
        List<Range<Token>> currentRanges = calculateRanges(metadata.tokenMap);
        DataPlacements start = calculatePlacements(currentRanges, metadata, keyspaces);

        ClusterMetadata proposed = metadata.transformer()
                                           .unproposeTokens(nodeId)
                                           .build()
                                   .metadata;
        DataPlacements withoutLeaving = calculatePlacements(proposed, keyspaces);

        // Final (pre-merge) state keeps the current range boundaries but with the
        // leaving node's ownership reassigned.
        DataPlacements finalPlacement = splitRanges(proposed.tokenMap, metadata.tokenMap,
                                                    withoutLeaving);

        DataPlacements maximal = start.combineReplicaGroups(finalPlacement);

        PlacementDeltas.Builder toMaximal = PlacementDeltas.builder(start.size());
        PlacementDeltas.Builder toFinal = PlacementDeltas.builder(start.size());
        PlacementDeltas.Builder merge = PlacementDeltas.builder(start.size());

        maximal.withDistributed((params, maxPlacement) -> {
            // Add new nodes as replicas
            PlacementDeltas.PlacementDelta maxDelta = start.get(params).difference(maxPlacement);
            toMaximal.put(params, maxDelta);

            // Remove the decommissioning node from the replica groups.
            PlacementDeltas.PlacementDelta leftDelta = maxPlacement.difference(finalPlacement.get(params));
            toFinal.put(params, leftDelta);

            // Merge the ranges formerly delimited by the leaving node's tokens.
            PlacementDeltas.PlacementDelta finalDelta = finalPlacement.get(params).difference(withoutLeaving.get(params));
            merge.put(params, finalDelta);
        });

        return new PlacementTransitionPlan(PlacementDeltas.empty(), toMaximal.build(), toFinal.build(), merge.build());
    }

    /**
     * Plans a host replacement: the replacement node takes over exactly the replaced
     * node's tokens, so no ranges are split or merged — only an over-replicate step
     * followed by removal of the replaced node.
     */
    @Override
    public PlacementTransitionPlan planForReplacement(ClusterMetadata metadata,
                                                      NodeId replaced,
                                                      NodeId replacement,
                                                      Keyspaces keyspaces)
    {
        DataPlacements startPlacements = calculatePlacements(metadata, keyspaces);
        DataPlacements finalPlacements = calculatePlacements(metadata.transformer()
                                                                     .unproposeTokens(replaced)
                                                                     .proposeToken(replacement, metadata.tokenMap.tokens(replaced))
                                                                     .build()
                                                             .metadata,
                                                             keyspaces);
        DataPlacements maximalPlacements = startPlacements.combineReplicaGroups(finalPlacements);

        PlacementDeltas.Builder toMaximal = PlacementDeltas.builder(startPlacements.size());
        PlacementDeltas.Builder toFinal = PlacementDeltas.builder(startPlacements.size());

        maximalPlacements.withDistributed((params, max) -> {
            toMaximal.put(params, startPlacements.get(params).difference(max));
            toFinal.put(params, max.difference(finalPlacements.get(params)));
        });

        return new PlacementTransitionPlan(PlacementDeltas.empty(), toMaximal.build(), toFinal.build(), PlacementDeltas.empty());
    }

    /**
     * Applies the range boundaries implied by {@code proposed}'s tokens to the given
     * placements without changing any replica ownership. The proposed token set must be
     * a superset of the current one; if it is identical (or the ring is empty) the
     * placements are returned unchanged.
     */
    public DataPlacements splitRanges(TokenMap current,
                                      TokenMap proposed,
                                      DataPlacements currentPlacements)
    {
        ImmutableList<Token> currentTokens = current.tokens();
        ImmutableList<Token> proposedTokens = proposed.tokens();
        if (currentTokens.isEmpty() || currentTokens.equals(proposedTokens))
        {
            return currentPlacements;
        }
        else
        {
            if (!proposedTokens.containsAll(currentTokens))
                throw new IllegalArgumentException("Proposed tokens must be superset of existing tokens");
            // we need to split some existing ranges, so apply the new set of tokens to the current canonical
            // placements to get a set of placements with the proposed ranges but the current replicas
            return splitRangesForAllPlacements(proposedTokens, currentPlacements);
        }
    }

    @VisibleForTesting
    DataPlacements splitRangesForAllPlacements(List<Token> proposedTokens, DataPlacements current)
    {
        DataPlacements.Builder builder = DataPlacements.builder(current.size());
        current.asMap().forEach((params, placement) -> {
            // Don't split ranges for local-only placements
            if (params.isLocal() || params.isMeta())
                builder.with(params, placement);
            else
                builder.with(params, placement.splitRangesForPlacement(proposedTokens));
        });
        return builder.build();
    }

    /**
     * Computes the canonical placements for the given metadata and keyspaces, or an
     * empty set of placements when the ring has no tokens.
     */
    public DataPlacements calculatePlacements(ClusterMetadata metadata, Keyspaces keyspaces)
    {
        if (metadata.tokenMap.tokens().isEmpty())
            return DataPlacements.empty();

        return calculatePlacements(keyspaces, metadata);
    }

    private DataPlacements calculatePlacements(Keyspaces keyspaces, ClusterMetadata metadata)
    {
        List<Range<Token>> ranges = calculateRanges(metadata.tokenMap);
        return calculatePlacements(ranges, metadata, keyspaces);
    }

    /**
     * Derives the full set of ring ranges from the token map, including the range up to
     * the first token and the wrap-around range back to the minimum token; degenerate
     * (empty) ranges are skipped. NOTE(review): duplicates TokenMap.toRanges and, unlike
     * it, assumes a non-empty token list (tokens.get(0) would throw otherwise) — callers
     * guard against the empty ring.
     */
    public List<Range<Token>> calculateRanges(TokenMap tokenMap)
    {
        List<Token> tokens = tokenMap.tokens();
        List<Range<Token>> ranges = new ArrayList<>(tokens.size() + 1);
        maybeAdd(ranges, new Range<>(tokenMap.partitioner().getMinimumToken(), tokens.get(0)));
        for (int i = 1; i < tokens.size(); i++)
            maybeAdd(ranges, new Range<>(tokens.get(i - 1), tokens.get(i)));
        maybeAdd(ranges, new Range<>(tokens.get(tokens.size() - 1), tokenMap.partitioner().getMinimumToken()));
        if (ranges.isEmpty())
            ranges.add(new Range<>(tokenMap.partitioner().getMinimumToken(), tokenMap.partitioner().getMinimumToken()));
        return ranges;
    }

    /**
     * Builds one DataPlacement per distinct ReplicationParams across the given keyspaces,
     * delegating replica selection to each keyspace's replication strategy. Keyspaces
     * sharing identical replication params share a single placement.
     */
    public DataPlacements calculatePlacements(List<Range<Token>> ranges,
                                              ClusterMetadata metadata,
                                              Keyspaces keyspaces)
    {
        Map<ReplicationParams, DataPlacement> placements = new HashMap<>();
        for (KeyspaceMetadata ksMetadata : keyspaces)
        {
            logger.trace("Calculating data placements for {}", ksMetadata.name);
            AbstractReplicationStrategy replication = ksMetadata.replicationStrategy;
            ReplicationParams params = ksMetadata.params.replication;
            placements.computeIfAbsent(params, p -> replication.calculateDataPlacement(ranges, metadata));
        }

        return DataPlacements.builder(placements).build();
    }

    // Add the range only if it is non-degenerate (left != right).
    private void maybeAdd(List<Range<Token>> ranges, Range<Token> r)
    {
        if (r.left.compareTo(r.right) != 0)
            ranges.add(r);
    }
}
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
index 8ae2302527..82ba7117ab 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java
@@ -18,12 +18,15 @@
 
 package org.apache.cassandra.tcm.transformations.cms;
 
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.PlacementForRange;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class EntireRange
 {
@@ -34,4 +37,7 @@ public class EntireRange
         return new Replica(addr, entireRange, true);
     }
 
+    public static final EndpointsForRange localReplicas = 
EndpointsForRange.of(replica(FBUtilities.getBroadcastAddressAndPort()));
+    public static final DataPlacement placement = new 
DataPlacement(PlacementForRange.builder().withReplicaGroup(localReplicas).build(),
+                                                                    
PlacementForRange.builder().withReplicaGroup(localReplicas).build());
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/BiMultiValMap.java 
b/src/java/org/apache/cassandra/utils/BiMultiValMap.java
index 7a87217967..f439c5c496 100644
--- a/src/java/org/apache/cassandra/utils/BiMultiValMap.java
+++ b/src/java/org/apache/cassandra/utils/BiMultiValMap.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.utils;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 import com.google.common.collect.HashMultimap;
@@ -143,4 +144,19 @@ public class BiMultiValMap<K, V> implements Map<K, V>
     {
         return reverseMap.keySet();
     }
+
    /**
     * Two BiMultiValMaps are equal when both the forward (key -> value) and the reverse
     * (value -> keys) multimap views are equal. Checking both sides keeps equals
     * consistent with the bidirectional invariant of the map.
     */
    @Override
    public boolean equals(Object o)
    {
        if (this == o) return true;
        if (!(o instanceof BiMultiValMap)) return false;
        BiMultiValMap<?, ?> that = (BiMultiValMap<?, ?>) o;
        return forwardMap.equals(that.forwardMap) && reverseMap.equals(that.reverseMap);
    }
+
    // Combines both internal views, mirroring equals() so equal maps hash equally.
    @Override
    public int hashCode()
    {
        return Objects.hash(forwardMap, reverseMap);
    }
 }
diff --git 
a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java 
b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index 81d6694c72..7693387110 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static 
org.apache.cassandra.locator.NetworkTopologyStrategy.REPLICATION_FACTOR;
@@ -327,7 +328,7 @@ public class NetworkTopologyStrategyTest
         for (Map.Entry<String, Integer> dc : datacenters.entrySet())
             skippedDcEndpoints.put(dc.getKey(), new 
LinkedHashSet<InetAddressAndPort>());
 
-        Iterator<Token> tokenIter = 
TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+        Iterator<Token> tokenIter = 
TokenRingUtils.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
         while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, 
allEndpoints, datacenters))
         {
             Token next = tokenIter.next();
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java 
b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index c1b76b3bfc..44aa63d678 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -39,6 +39,7 @@ import org.junit.Test;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -72,7 +73,7 @@ public class TokenMetadataTest
     private static void testRingIterator(ArrayList<Token> ring, String start, 
boolean includeMin, String... expected)
     {
         ArrayList<Token> actual = new ArrayList<>();
-        Iterators.addAll(actual, TokenMetadata.ringIterator(ring, 
token(start), includeMin));
+        Iterators.addAll(actual, TokenRingUtils.ringIterator(ring, 
token(start), includeMin));
         assertEquals(actual.toString(), expected.length, actual.size());
         for (int i = 0; i < expected.length; i++)
             assertEquals("Mismatch at index " + i + ": " + actual, 
token(expected[i]), actual.get(i));
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java 
b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index b6d039b439..84aed2296e 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -47,6 +47,7 @@ import 
org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.SimpleSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 
 import static org.junit.Assert.*;
 
@@ -102,7 +103,7 @@ public class LeaveAndBootstrapTest
         for (Token token : keyTokens)
         {
             List<InetAddressAndPort> endpoints = new ArrayList<>();
-            Iterator<Token> tokenIter = 
TokenMetadata.ringIterator(tmd.sortedTokens(), token, false);
+            Iterator<Token> tokenIter = 
TokenRingUtils.ringIterator(tmd.sortedTokens(), token, false);
             while (tokenIter.hasNext())
             {
                 endpoints.add(tmd.getEndpoint(tokenIter.next()));
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java 
b/test/unit/org/apache/cassandra/service/MoveTest.java
index 6dce8f3398..5260e75ce4 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -68,6 +68,7 @@ import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -608,7 +609,7 @@ public class MoveTest
         for (Token token : keyTokens)
         {
             List<InetAddressAndPort> endpoints = new ArrayList<>();
-            Iterator<Token> tokenIter = 
TokenMetadata.ringIterator(tmd.sortedTokens(), token, false);
+            Iterator<Token> tokenIter = 
TokenRingUtils.ringIterator(tmd.sortedTokens(), token, false);
             while (tokenIter.hasNext())
             {
                 endpoints.add(tmd.getEndpoint(tokenIter.next()));
diff --git a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java 
b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java
index e015f67283..c858c2ab06 100644
--- a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java
+++ b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java
@@ -25,6 +25,16 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 
 public class MembershipUtils
 {
+    public static NodeAddresses nodeAddresses(Random random)
+    {
+        return nodeAddresses(randomEndpoint(random));
+    }
+
+    public static NodeAddresses nodeAddresses(InetAddressAndPort endpoint)
+    {
+        return new NodeAddresses(endpoint, endpoint, endpoint);
+    }
+
     public static InetAddressAndPort randomEndpoint(Random random)
     {
         return endpoint(random.nextInt(254) + 1);
diff --git a/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java 
b/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java
new file mode 100644
index 0000000000..71e4361475
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.ReplicationParams;
+
+import static org.apache.cassandra.tcm.membership.MembershipUtils.endpoint;
+import static 
org.apache.cassandra.tcm.membership.MembershipUtils.randomEndpoint;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+
+public class OwnershipUtils
+{
+    public static Token token(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    public static RangesByEndpoint emptyReplicas()
+    {
+        return new RangesByEndpoint.Builder().build();
+    }
+
+    public static RangesByEndpoint trivialReplicas(InetAddressAndPort 
endpoint, Range<Token> range)
+    {
+        RangesByEndpoint.Builder b = new RangesByEndpoint.Builder();
+        b.put(endpoint, Replica.fullReplica(endpoint, range));
+        return b.build();
+    }
+
+    public static PlacementDeltas deltas(DataPlacements first, DataPlacements 
second)
+    {
+        assert first.asMap().keySet().equals(second.asMap().keySet());
+
+        PlacementDeltas.Builder deltas = PlacementDeltas.builder(first.size());
+        first.asMap().forEach((params, placement) -> {
+            deltas.put(params, placement.difference(second.get(params)));
+        });
+        return deltas.build();
+    }
+
+    public static DataPlacements placements(List<Range<Token>> ranges,
+                                            Set<ReplicationParams> replication,
+                                            Random random)
+    {
+        DataPlacements.Builder allPlacements = 
DataPlacements.builder(replication.size());
+        replication.forEach((params) -> {
+            assertSame("Only simple replication params are necessary/permitted 
here", SimpleStrategy.class, params.klass);
+            String s = params.options.get("replication_factor");
+            assertNotNull(s);
+            int rf = Integer.parseInt(s);
+            DataPlacement.Builder placement = DataPlacement.builder();
+            for (Range<Token> range : ranges)
+            {
+                // pick rf random nodes to be replicas, no duplicates
+                Set<InetAddressAndPort> replicas = new HashSet<>(rf);
+                while (replicas.size() < rf)
+                    replicas.add(randomEndpoint(random));
+
+                replicas.forEach(e -> {
+                    Replica replica = Replica.fullReplica(e, range);
+                    
placement.withReadReplica(replica).withWriteReplica(replica);
+                });
+            }
+            allPlacements.with(params, placement.build());
+        });
+        return allPlacements.build();
+    }
+
+    public static List<Range<Token>> ranges(Random random)
+    {
+        // min 10, max 39 ranges (random.nextInt(30) + 10)
+        int count = random.nextInt(30) + 10;
+        Set<Long> tokens = new HashSet<>(count);
+        while(tokens.size() < count-1)
+            tokens.add(random.nextLong());
+        List<Long> sorted = new ArrayList<>(tokens);
+        Collections.sort(sorted);
+
+        List<Range<Token>> ranges = new ArrayList<>(count);
+        ranges.add(new Range<>(Murmur3Partitioner.MINIMUM, 
token(sorted.get(0))));
+        for (int i = 1; i < sorted.size(); i++)
+            ranges.add(new Range<>(token(sorted.get(i-1)), 
token(sorted.get(i))));
+        ranges.add(new Range<>(token(sorted.get(sorted.size() - 1)), 
Murmur3Partitioner.MINIMUM));
+        return ranges;
+    }
+
+    public static DataPlacements randomPlacements(Random random)
+    {
+        Set<ReplicationParams> replication = 
ImmutableSet.of(KeyspaceParams.simple(1).replication,
+                                                             
KeyspaceParams.simple(2).replication,
+                                                             
KeyspaceParams.simple(3).replication);
+        return placements(ranges(random), replication, random);
+    }
+
+    public static EndpointsForRange rg(long t0, long t1, int...replicas)
+    {
+        Range<Token> range = new Range<>(token(t0), token(t1));
+        EndpointsForRange.Builder builder = EndpointsForRange.builder(range);
+        for (int i : replicas)
+            builder.add(Replica.fullReplica(endpoint((byte)i), range));
+        return builder.build();
+    }
+
+}
diff --git 
a/test/unit/org/apache/cassandra/tcm/ownership/PlacementDeltasTest.java 
b/test/unit/org/apache/cassandra/tcm/ownership/PlacementDeltasTest.java
new file mode 100644
index 0000000000..6245c792f0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/ownership/PlacementDeltasTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.schema.ReplicationParams;
+
+import static org.apache.cassandra.tcm.membership.MembershipUtils.endpoint;
+import static org.apache.cassandra.tcm.ownership.OwnershipUtils.emptyReplicas;
+import static org.apache.cassandra.tcm.ownership.OwnershipUtils.token;
+import static 
org.apache.cassandra.tcm.ownership.OwnershipUtils.trivialReplicas;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class PlacementDeltasTest
+{
+    private static final ReplicationParams key = ReplicationParams.simple(1);
+    private static final InetAddressAndPort P1 = endpoint(1);
+    private static final InetAddressAndPort P2 = endpoint(2);
+    private static final InetAddressAndPort P3 = endpoint(3);
+    private static final InetAddressAndPort P4 = endpoint(4);
+    private static final Range<Token> R1 = new Range<>(token(0), token(100));
+    private static final Range<Token> R2 = new Range<>(token(100), token(200));
+    private static final Range<Token> R_INT = new Range<>(token(50), 
token(150));
+
+
+    @Test
+    public void mergeDisjointDeltas()
+    {
+        // Combine 2 Deltas with disjoint removals (and no additions), for the 
same ReplicationParams.
+        // Verify that the resulting merged Delta contains the 
removals/additions from both.
+        RangesByEndpoint group1 = trivialReplicas(P1, R1);
+        RangesByEndpoint group2 = trivialReplicas(P2, R2);
+
+        Delta d1 = new Delta(group1, emptyReplicas());
+        Delta d2 = new Delta(group2, emptyReplicas());
+        PlacementDeltas.PlacementDelta merged = PlacementDeltas.builder(1)
+                                                               .put(key, new 
PlacementDeltas.PlacementDelta(d1, d1))
+                                                               .put(key, new 
PlacementDeltas.PlacementDelta(d2, d2))
+                                                               .build()
+                                                               .get(key);
+
+        for (Delta delta : new Delta[]{ merged.reads, merged.writes })
+        {
+            assertTrue(delta.additions.isEmpty());
+            assertEquals(group1.get(P1), delta.removals.get(P1));
+            assertEquals(group2.get(P2), delta.removals.get(P2));
+        }
+    }
+
+    @Test
+    public void mergeDisjointReplicasForSameEndpoint()
+    {
+        // Combine 2 Deltas which both contain removals for the same endpoint, 
but for disjoint ranges.
+        RangesByEndpoint group1 = trivialReplicas(P1, R1);
+        RangesByEndpoint group2 = trivialReplicas(P1, R2);
+
+        Delta d1 = new Delta(group1, emptyReplicas());
+        Delta d2 = new Delta(group2, emptyReplicas());
+        PlacementDeltas.PlacementDelta merged = PlacementDeltas.builder(1)
+                                                               .put(key, new 
PlacementDeltas.PlacementDelta(d1, d1))
+                                                               .put(key, new 
PlacementDeltas.PlacementDelta(d2, d2))
+                                                               .build()
+                                                               .get(key);
+
+        for (Delta delta : new Delta[]{ merged.reads, merged.writes })
+        {
+            assertEquals(1, delta.removals.keySet().size());
+            RangesAtEndpoint mergedGroup = delta.removals.get(P1);
+
+            assertEquals(2, mergedGroup.size());
+            group1.flattenValues().forEach(r -> 
assertTrue(mergedGroup.contains(r)));
+            group2.flattenValues().forEach(r -> 
assertTrue(mergedGroup.contains(r)));
+        }
+    }
+
+    @Test
+    public void mergeIdenticalReplicasForSameEndpoint()
+    {
+        // Combine 2 Deltas which both contain identical removals for the same 
endpoint.
+        // Effectively a noop.
+        RangesByEndpoint group1 = trivialReplicas(P1, R1);
+
+        Delta d1 = new Delta(group1, emptyReplicas());
+        Delta d2 = new Delta(group1, emptyReplicas());
+        PlacementDeltas.PlacementDelta merged = PlacementDeltas.builder(1)
+                                                               .put(key, new 
PlacementDeltas.PlacementDelta(d1, d1))
+                                                               .put(key, new 
PlacementDeltas.PlacementDelta(d2, d2))
+                                                               .build()
+                                                               .get(key);
+
+        for (Delta delta : new Delta[]{ merged.reads, merged.writes })
+        {
+            assertEquals(1, delta.removals.keySet().size());
+            RangesAtEndpoint mergedGroup = delta.removals.get(P1);
+            assertEquals(1, mergedGroup.size());
+            group1.flattenValues().forEach(r -> 
assertTrue(mergedGroup.contains(r)));
+        }
+    }
+
+    @Test
+    public void mergeIntersectingReplicasForSameEndpoint()
+    {
+        // Combine 2 Deltas which both contain replicas for a common endpoint, 
but with intersecting ranges.
+        // TODO there isn't an obvious reason to support this, so perhaps we 
should be conservative and
+        //      explicitly reject it
+        RangesByEndpoint group1 = trivialReplicas(P1, R1);
+        RangesByEndpoint group2 = trivialReplicas(P1, R_INT);
+
+        Delta d1 = new Delta(group1, emptyReplicas());
+        Delta d2 = new Delta(group2, emptyReplicas());
+        PlacementDeltas.PlacementDelta merged = PlacementDeltas.builder(1)
+                                                               .put(key, new 
PlacementDeltas.PlacementDelta(d1, d1))
+                                                               .put(key, new 
PlacementDeltas.PlacementDelta(d2, d2))
+                                                               
.build().get(key);
+
+        for (Delta delta : new Delta[]{ merged.reads, merged.writes })
+        {
+            assertEquals(1, delta.removals.keySet().size());
+            RangesAtEndpoint mergedGroup = delta.removals.get(P1);
+            assertEquals(2, mergedGroup.size());
+            group1.flattenValues().forEach(r -> 
assertTrue(mergedGroup.contains(r)));
+            group2.flattenValues().forEach(r -> 
assertTrue(mergedGroup.contains(r)));
+        }
+    }
+
+    @Test
+    public void invertSingleDelta()
+    {
+        RangesByEndpoint group1 = trivialReplicas(P1, R1);
+        RangesByEndpoint group2 = trivialReplicas(P2, R2);
+
+        Delta d1 = new Delta(group1, group2);
+        Delta d2 = new Delta(group2, group1);
+
+        assertEquals(d1, d2.invert());
+        assertEquals(d2, d2.invert().invert());
+    }
+
+    @Test
+    public void invertEmptyDelta()
+    {
+        Delta d = Delta.empty();
+        assertEquals(d, d.invert());
+    }
+
+    @Test
+    public void invertPartiallyEmptyDelta()
+    {
+        RangesByEndpoint group1 = trivialReplicas(P1, R1);
+        RangesByEndpoint group2 = trivialReplicas(P2, R1);
+
+        Delta additions = new Delta(emptyReplicas(), group1);
+        Delta inverted = additions.invert();
+        assertEquals(RangesByEndpoint.EMPTY, inverted.additions);
+        assertEquals(additions.additions, inverted.removals);
+
+        Delta removals = new Delta(group2, emptyReplicas());
+        inverted = removals.invert();
+        assertEquals(RangesByEndpoint.EMPTY, inverted.removals);
+        assertEquals(removals.removals, inverted.additions);
+    }
+
+    @Test
+    public void invertPlacementDelta()
+    {
+        RangesByEndpoint group1 = trivialReplicas(P1, R1);
+        RangesByEndpoint group2 = trivialReplicas(P2, R1);
+        Delta d1 = new Delta(group1, group2);
+
+        RangesByEndpoint group3 = trivialReplicas(P3, R1);
+        RangesByEndpoint group4 = trivialReplicas(P4, R2);
+        Delta d2 = new Delta(group3, group4);
+
+        PlacementDeltas.PlacementDelta pd1 = new 
PlacementDeltas.PlacementDelta(d1,d2);
+        PlacementDeltas.PlacementDelta pd2 = new 
PlacementDeltas.PlacementDelta(d1.invert(), d2.invert());
+        assertEquals(pd2, pd1.invert());
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementEquivalenceTest.java
 
b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementEquivalenceTest.java
new file mode 100644
index 0000000000..6f31d8539e
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementEquivalenceTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.MembershipUtils;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+import static 
org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses;
+import static org.junit.Assert.assertEquals;
+
+public class UniformRangePlacementEquivalenceTest
+{
+    private static Map<String, Integer> DATACENTERS = ImmutableMap.of("rf1", 
1, "rf3", 3, "rf5_1", 5, "rf5_2", 5, "rf5_3", 5);
+    private static final String KEYSPACE = "ks";
+
+    @BeforeClass
+    public static void setupClass() throws Exception
+    {
+        SchemaLoader.prepareServer();
+    }
+
+    @Test
+    public void testSSPlacement()
+    {
+        int[] rfValues = new int[] {1, 2, 3, 5, 12};
+        for (int rf : rfValues)
+        {
+            KeyspaceParams params = KeyspaceParams.simple(rf);
+            KeyspaceMetadata ksm = KeyspaceMetadata.create(KEYSPACE, params);
+            BiFunction<TokenMetadata, IEndpointSnitch, 
AbstractReplicationStrategy> strategy = ssProvider(params.replication);
+            testCalculateEndpoints(1, ksm, strategy);
+            testCalculateEndpoints(16, ksm, strategy);
+            testCalculateEndpoints(64, ksm, strategy);
+        }
+    }
+
+    @Test
+    public void testNTSPlacement()
+    {
+        KeyspaceParams params = toNTSParams(DATACENTERS);
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(KEYSPACE, params);
+        BiFunction<TokenMetadata, IEndpointSnitch, 
AbstractReplicationStrategy> strategy = ntsProvider(params.replication);
+        testCalculateEndpoints(1, ksm, strategy);
+        testCalculateEndpoints(16, ksm, strategy);
+        testCalculateEndpoints(64, ksm, strategy);
+    }
+
+    private void testCalculateEndpoints(int tokensPerNode,
+                                        KeyspaceMetadata keyspace,
+                                        BiFunction<TokenMetadata, 
IEndpointSnitch, AbstractReplicationStrategy> strategy)
+    {
+        final int NODES = 100;
+        final int VNODES = tokensPerNode;
+        final int RUNS = 10;
+
+        List<InetAddressAndPort> endpoints = nodes(NODES);
+
+        for (int run = 0; run < RUNS; ++run)
+        {
+            Random rand = new Random();
+            IEndpointSnitch snitch = generateSnitch(DATACENTERS, endpoints, 
rand);
+            DatabaseDescriptor.setEndpointSnitch(snitch);
+            TokenMetadata tokenMetadata = new TokenMetadata(snitch);
+            Directory directory = directory(endpoints, snitch);
+            ClusterMetadata metadata = new 
ClusterMetadata(Murmur3Partitioner.instance, directory);
+
+
+            for (int i = 0; i < NODES; ++i)
+                metadata = joinNode(metadata, tokenMetadata, endpoints, i, 
VNODES, rand);
+
+            AbstractReplicationStrategy strat = strategy.apply(tokenMetadata, 
snitch);
+            Function<Token, List<InetAddressAndPort>> oldLocatorFn = token ->  
new ArrayList<>(strat.calculateNaturalReplicas(token, 
tokenMetadata).endpoints());
+
+            UniformRangePlacement layout = new UniformRangePlacement();
+            DataPlacement placement = layout.calculatePlacements(metadata, 
Keyspaces.of(keyspace)).get(keyspace.params.replication);
+            Function<Token, List<InetAddressAndPort>> newLocatorFn = t -> 
placement.reads.forToken(t).endpointList();
+
+            testEquivalence(oldLocatorFn, newLocatorFn, rand);
+        }
+    }
+
+    @Test
+    public void testSSPlacementTransformation()
+    {
+        int[] rfValues = new int[] {1, 2, 3, 5, 12};
+        for (int rf : rfValues)
+        {
+            KeyspaceParams params = KeyspaceParams.simple(rf);
+            KeyspaceMetadata ksm = KeyspaceMetadata.create(KEYSPACE, params);
+            BiFunction<TokenMetadata, IEndpointSnitch, 
AbstractReplicationStrategy> strategy = ssProvider(params.replication);
+            doTransformationTest(1, ksm, strategy);
+            doTransformationTest(16, ksm, strategy);
+            doTransformationTest(64, ksm, strategy);
+        }
+    }
+
+    @Test
+    public void testNTSPlacementTransformation()
+    {
+        KeyspaceParams params = toNTSParams(DATACENTERS);
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(KEYSPACE, params);
+        BiFunction<TokenMetadata, IEndpointSnitch, 
AbstractReplicationStrategy> strategy = ntsProvider(params.replication);
+        doTransformationTest(1, ksm, strategy);
+        doTransformationTest(16, ksm, strategy);
+        doTransformationTest(64, ksm, strategy);
+    }
+
+    private void doTransformationTest(int tokensPerNode,
+                                      KeyspaceMetadata ksm,
+                                      BiFunction<TokenMetadata, 
IEndpointSnitch, AbstractReplicationStrategy> strategy)
+    {
+        final int NODES = 100;
+        final int VNODES = tokensPerNode;
+
+        UniformRangePlacement layout = new UniformRangePlacement();
+        Keyspaces keyspaces = Keyspaces.of(ksm);
+        long seed = System.nanoTime();
+        String log = String.format("Running transformation test with params; " 
+
+                                   "Seed: %d, Nodes: %d, VNodes: %d, 
Replication: %s",
+                                   seed, NODES, VNODES, 
ksm.params.replication);
+        System.out.println(log);
+        Random rand = new Random(seed);
+
+        // initialise nodes for the test
+        List<InetAddressAndPort> endpoints = nodes(NODES);
+        IEndpointSnitch snitch = generateSnitch(DATACENTERS, endpoints, rand);
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+        TokenMetadata tokenMetadata = new TokenMetadata(snitch);
+        Directory directory = directory(endpoints, snitch);
+        ClusterMetadata metadata = new 
ClusterMetadata(Murmur3Partitioner.instance, directory);
+
+        // join all but one of the nodes
+        for (int i = 0; i < NODES - 1; ++i)
+            metadata = joinNode(metadata, tokenMetadata, endpoints, i, VNODES, 
rand);
+
+
+        // verify that the old and new placement/location methods agree
+        PlacementForRange p1 = layout.calculatePlacements(metadata, 
keyspaces).get(ksm.params.replication).reads;
+        AbstractReplicationStrategy strat = strategy.apply(tokenMetadata, 
snitch);
+        Function<Token, List<InetAddressAndPort>> oldLocatorFn = 
oldLocatorFn(strat, tokenMetadata);
+        Function<Token, List<InetAddressAndPort>> newLocatorFn = t -> 
p1.forToken(t).endpointList();
+        testEquivalence(oldLocatorFn, newLocatorFn, rand);
+
+        // now add the remaining node
+        metadata = joinNode(metadata, tokenMetadata, endpoints, NODES - 1, 
VNODES, rand);
+
+        // re-check the placements
+        PlacementForRange p2 = layout.calculatePlacements(metadata, 
keyspaces).get(ksm.params.replication).reads;
+        strat = strategy.apply(tokenMetadata, snitch);
+        oldLocatorFn = oldLocatorFn(strat, tokenMetadata);
+        newLocatorFn = t -> p2.forToken(t).endpointList();
+        testEquivalence(oldLocatorFn, newLocatorFn, rand);
+
+        // get the specific operations needed to transform the first placement 
with
+        // the initial nodes to the new one with all nodes. Then apply those 
operations
+        // to the first placement and assert the result matches the second 
placement
+        // which was directly calculated with all nodes having joined.
+        Delta delta = p1.difference(p2);
+        PlacementForRange p3 = 
p1.without(delta.removals).with(delta.additions);
+        newLocatorFn = t -> p3.forToken(t).endpointList();
+        testEquivalence(oldLocatorFn, newLocatorFn, rand);
+        testRangeEquivalence(p2, p3);
+    }
+
+    private Directory directory(List<InetAddressAndPort> endpoints, 
IEndpointSnitch snitch)
+    {
+        Directory directory = new Directory();
+        for (InetAddressAndPort endpoint : endpoints)
+            directory = directory.with(nodeAddresses(endpoint), new 
Location(snitch.getDatacenter(endpoint), snitch.getRack(endpoint)));
+        return directory;
+    }
+
+    private ClusterMetadata joinNode(ClusterMetadata metadata,
+                                     TokenMetadata tokenMetadata,
+                                     List<InetAddressAndPort> endpoints,
+                                     int peerIndex,
+                                     int tokensPerNode,
+                                     Random rand)
+    {
+        return joinNode(metadata,
+                        tokenMetadata,
+                        endpoints,
+                        peerIndex,
+                        tokensPerNode,
+                        () -> 
Murmur3Partitioner.instance.getRandomToken(rand));
+    }
+
    /**
     * Joins the node at {@code peerIndex} to both the legacy TokenMetadata and the
     * ClusterMetadata, so that old and new placement calculations see the same ring.
     *
     * @param metadata      current cluster metadata; the node must already be registered in its directory
     * @param tokenMetadata legacy token metadata, mutated in place with the new tokens/host id
     * @param endpoints     full list of candidate endpoints
     * @param peerIndex     index into {@code endpoints} of the node being joined
     * @param tokensPerNode number of tokens to draw from {@code token}
     * @param token         token source for the new node
     * @return the transformed ClusterMetadata with the node's tokens proposed
     */
    private ClusterMetadata joinNode(ClusterMetadata metadata,
                                     TokenMetadata tokenMetadata,
                                     List<InetAddressAndPort> endpoints,
                                     int peerIndex,
                                     int tokensPerNode,
                                     Supplier<Token> token)
    {
        InetAddressAndPort endpoint = endpoints.get(peerIndex);
        NodeId id = metadata.directory.peerId(endpoint);
        // NOTE(review): a Set is used, so duplicate draws from the supplier would yield
        // fewer than tokensPerNode tokens - presumably acceptable for random test tokens
        Set<Token> tokens = new HashSet<>();
        for (int j = 0; j < tokensPerNode; ++j)
            tokens.add(token.get());
        // keep the legacy TokenMetadata in sync with the TCM metadata
        tokenMetadata.updateNormalTokens(tokens, endpoint);
        tokenMetadata.updateHostId(id.uuid, endpoint);
        return metadata.transformer().proposeToken(id, tokens).build().metadata;
    }
+
+    private void testEquivalence(Function<Token, List<InetAddressAndPort>> 
oldLocatorFn,
+                                 Function<Token, List<InetAddressAndPort>> 
newLocatorFn,
+                                 Random rand)
+    {
+        for (int i=0; i<1000; ++i)
+        {
+            Token token = Murmur3Partitioner.instance.getRandomToken(rand);
+            List<InetAddressAndPort> expected = oldLocatorFn.apply(token);
+            List<InetAddressAndPort> actual = newLocatorFn.apply(token);
+            if (endpointsDiffer(expected, actual))
+            {
+                System.err.println("Endpoints mismatch for token " + token);
+                System.err.println(" expected: " + expected);
+                System.err.println(" actual  : " + actual);
+                assertEquals("Endpoints for token " + token + " mismatch.", 
expected, actual);
+            }
+        }
+    }
+
+    private boolean endpointsDiffer(List<InetAddressAndPort> ep1, 
List<InetAddressAndPort> ep2)
+    {
+        if (ep1.equals(ep2))
+            return false;
+        // Because the old algorithm does not put the nodes in the correct 
order in the case where more replicas
+        // are required than there are racks in a dc, we accept different 
order as long as the primary
+        // replica is the same.
+        if (!ep1.get(0).equals(ep2.get(0)))
+            return true;
+        Set<InetAddressAndPort> s1 = new HashSet<>(ep1);
+        Set<InetAddressAndPort> s2 = new HashSet<>(ep2);
+        return !s1.equals(s2);
+    }
+
+    private void testRangeEquivalence(PlacementForRange p1, PlacementForRange 
p2)
+    {
+        RangesByEndpoint byEndpoint1 = p1.byEndpoint();
+        RangesByEndpoint byEndpoint2 = p2.byEndpoint();
+        assertEquals(byEndpoint1.keySet(), byEndpoint2.keySet());
+
+        for (InetAddressAndPort endpoint : byEndpoint1.keySet())
+        {
+            assertEquals(Range.normalize(byEndpoint1.get(endpoint).ranges()),
+                         Range.normalize(byEndpoint2.get(endpoint).ranges()));
+        }
+    }
+
    /**
     * Adapts the legacy placement path ({@code calculateNaturalReplicas} over
     * TokenMetadata) into a token -&gt; endpoint-list function for equivalence testing.
     */
    private Function<Token, List<InetAddressAndPort>> oldLocatorFn(AbstractReplicationStrategy strategy,
                                                                   TokenMetadata tokenMetadata)
    {
        // copy into an ArrayList so the result is a plain, order-preserving List
        return token ->  new ArrayList<>(strategy.calculateNaturalReplicas(token, tokenMetadata).endpoints());
    }
+
+    private BiFunction<TokenMetadata, IEndpointSnitch, 
AbstractReplicationStrategy> ssProvider(ReplicationParams params)
+    {
+        return (tokenMetadata, snitch) -> {
+            Map<String, String> p = new HashMap<>(params.options);
+            p.remove(ReplicationParams.CLASS);
+            return new SimpleStrategy(KEYSPACE, tokenMetadata, snitch, p);
+        };
+    }
+
+    private BiFunction<TokenMetadata, IEndpointSnitch, 
AbstractReplicationStrategy> ntsProvider(ReplicationParams params)
+    {
+        return (tokenMetadata, snitch) -> {
+            Map<String, String> p = new HashMap<>(params.options);
+            p.remove(ReplicationParams.CLASS);
+            return new NetworkTopologyStrategy(KEYSPACE, tokenMetadata, 
snitch, p);
+        };
+    }
+
+    private List<InetAddressAndPort> nodes(int count)
+    {
+        List<InetAddressAndPort> nodes = new ArrayList<>(count);
+        for (byte i=1; i<=count; ++i)
+            nodes.add(MembershipUtils.endpoint(i));
+        return nodes;
+    }
+
+    private KeyspaceParams toNTSParams(Map<String, Integer> datacenters)
+    {
+        List<String> args = new ArrayList<>(datacenters.size() * 2);
+        datacenters.forEach((key, val) -> { args.add(key); 
args.add(Integer.toString(val));});
+        return KeyspaceParams.nts(args.toArray());
+    }
+
    /**
     * Builds a snitch backed by fixed maps, assigning each node a random datacenter
     * (weighted by the DC's replication factor) and a random rack within that DC.
     * Endpoints not present in {@code nodes} resolve to null DC/rack.
     */
    IEndpointSnitch generateSnitch(Map<String, Integer> datacenters, Collection<InetAddressAndPort> nodes, Random rand)
    {
        final Map<InetAddressAndPort, String> nodeToRack = new HashMap<>();
        final Map<InetAddressAndPort, String> nodeToDC = new HashMap<>();
        // pre-generate a randomly sized set of rack names per DC
        Map<String, List<String>> racksPerDC = new HashMap<>();
        datacenters.forEach((dc, rf) -> racksPerDC.put(dc, randomRacks(rf, rand)));
        // Build a lookup array where each DC appears once per unit of its RF, so a
        // uniform random index selects DCs proportionally to their replication factor.
        int rf = datacenters.values().stream().mapToInt(x -> x).sum();
        String[] dcs = new String[rf];
        int pos = 0;
        for (Map.Entry<String, Integer> dce : datacenters.entrySet())
        {
            for (int i = 0; i < dce.getValue(); ++i)
                dcs[pos++] = dce.getKey();
        }

        // assign each node a weighted-random DC and a uniform-random rack within it
        for (InetAddressAndPort node : nodes)
        {
            String dc = dcs[rand.nextInt(rf)];
            List<String> racks = racksPerDC.get(dc);
            String rack = racks.get(rand.nextInt(racks.size()));
            nodeToRack.put(node, rack);
            nodeToDC.put(node, dc);
        }

        // snitch simply serves the precomputed assignments
        return new AbstractNetworkTopologySnitch()
        {
            public String getRack(InetAddressAndPort endpoint)
            {
                return nodeToRack.get(endpoint);
            }

            public String getDatacenter(InetAddressAndPort endpoint)
            {
                return nodeToDC.get(endpoint);
            }
        };
    }
+
+    private List<String> randomRacks(int rf, Random rand)
+    {
+        int rc = rand.nextInt(rf * 3 - 1) + 1;
+        List<String> racks = new ArrayList<>(rc);
+        for (int i=0; i<rc; ++i)
+            racks.add(Integer.toString(i));
+        return racks;
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java 
b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java
new file mode 100644
index 0000000000..c18f74da75
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.schema.ReplicationParams;
+
+import static org.apache.cassandra.tcm.ownership.OwnershipUtils.rg;
+import static org.apache.cassandra.tcm.ownership.OwnershipUtils.token;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class UniformRangePlacementTest
+{
+
+    @Test
+    public void testSplittingPlacementWithSingleRange()
+    {
+        // Special case of splitting a range that was created because the 
initial token list contained a single token,
+        // which is either the min or max value in the token space. Any other 
single token would produce two ranges -
+        // (MIN, t] & (t, MAX] - but because (x, x] denotes a wraparound, this 
case produces (MIN, MAX] and we need to
+        // verify that we can safely split that when more tokens are 
introduced. This test supposes MIN = 0, MAX = 100
+        PlacementForRange before = PlacementForRange.builder()
+                                                    .withReplicaGroup(rg(0, 
100, 1, 2, 3))
+                                                    .build();
+        // existing token is MIN (i.e. 0 for the purposes of this test)
+        List<Token> tokens = ImmutableList.of(token(0), token(30), token(60), 
token(90));
+        PlacementForRange after = 
PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 30, 1, 2, 3),
+                        rg(30, 60, 1, 2, 3),
+                        rg(60, 90, 1, 2, 3),
+                        rg(90, 100, 1, 2, 3));
+
+
+        // existing token is MAX (i.e. 100 for the purposes of this test).
+        tokens = ImmutableList.of(token(30), token(60), token(90), token(100));
+        after = PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 30, 1, 2, 3),
+                        rg(30, 60, 1, 2, 3),
+                        rg(60, 90, 1, 2, 3),
+                        rg(90, 100, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitSingleRange()
+    {
+        // Start with:  (0,100]   : (n1,n2,n3);
+        //              (100,200] : (n1,n2,n3);
+        //              (200,300] : (n1,n2,n3);
+        //              (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (100,200] to (100,150], (150,200]
+        List<Token> tokens = ImmutableList.of(token(100), token(150), 
token(200), token(300));
+        PlacementForRange after = 
PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 150, 1, 2, 3),
+                        rg(150, 200, 1, 2, 3),
+                        rg(200, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitMultipleDisjointRanges()
+    {
+        // Start with:  (0,100]   : (n1,n2,n3);
+        //              (100,200] : (n1,n2,n3);
+        //              (200,300] : (n1,n2,n3);
+        //              (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (100,200] to (100,150],(150,200]
+        // and   (200,300] to (200,250],(250,300]
+        List<Token> tokens = ImmutableList.of(token(100), token(150), 
token(200), token(250), token(300));
+        PlacementForRange after = 
PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 150, 1, 2, 3),
+                        rg(150, 200, 1, 2, 3),
+                        rg(200, 250, 1, 2, 3),
+                        rg(250, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitSingleRangeMultipleTimes()
+    {
+        // Start with:  (0,100]   : (n1,n2,n3);
+        //              (100,200] : (n1,n2,n3);
+        //              (200,300] : (n1,n2,n3);
+        //              (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (100,200] to (100,125],(125,150],(150,200]
+        List<Token> tokens = ImmutableList.of(token(100), token(125), 
token(150), token(200), token(300));
+        PlacementForRange after = 
PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 125, 1, 2, 3),
+                        rg(125, 150, 1, 2, 3),
+                        rg(150, 200, 1, 2, 3),
+                        rg(200, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitMultipleRangesMultipleTimes()
+    {
+        PlacementForRange before = initialPlacement();
+        // split (100,200] to (100,125],(125,150],(150,200]
+        // and   (200,300] to (200,225],(225,250],(250,300]
+        List<Token> tokens = ImmutableList.of(token(100), token(125), 
token(150), token(200), token(225), token(250), token(300));
+        PlacementForRange after = 
PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 125, 1, 2, 3),
+                        rg(125, 150, 1, 2, 3),
+                        rg(150, 200, 1, 2, 3),
+                        rg(200, 225, 1, 2, 3),
+                        rg(225, 250, 1, 2, 3),
+                        rg(250, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitLastRangeMultipleTimes()
+    {
+        // Start with:  (0,100]   : (n1,n2,n3);
+        //              (100,200] : (n1,n2,n3);
+        //              (200,300] : (n1,n2,n3);
+        //              (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (300,400] to (300,325],(325,350],(350,400]
+        List<Token> tokens = ImmutableList.of(token(100), token(200), 
token(300), token(325), token(350));
+        PlacementForRange after = 
PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 200, 1, 2, 3),
+                        rg(200, 300, 1, 2, 3),
+                        rg(300, 325, 1, 2, 3),
+                        rg(325, 350, 1, 2, 3),
+                        rg(350, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitFirstRangeMultipleTimes()
+    {
+        // Start with:  (0,100]   : (n1,n2,n3);
+        //              (100,200] : (n1,n2,n3);
+        //              (200,300] : (n1,n2,n3);
+        //              (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (0,100] to (0,25],(25,50],(50,100]
+        List<Token> tokens = ImmutableList.of(token(25), token(50), 
token(100), token(200), token(300));
+        PlacementForRange after = 
PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 25, 1, 2, 3),
+                        rg(25, 50, 1, 2, 3),
+                        rg(50, 100, 1, 2, 3),
+                        rg(100, 200, 1, 2, 3),
+                        rg(200, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
    /**
     * Combining two DataPlacements covering identical ranges should union the replica
     * sets per range, for both the read and write placements.
     */
    @Test
    public void testCombiningPlacements()
    {
        EndpointsForRange[] firstGroups = { rg(0, 100, 1, 2, 3),
                                            rg(100, 200, 1, 2, 3),
                                            rg(200, 300, 1, 2, 3),
                                            rg(300, 400, 1, 2, 3) };
        PlacementForRange p1 = PlacementForRange.builder().withReplicaGroups(Arrays.asList(firstGroups)).build();
        // second placement shares nodes 2 and 3 on every range, plus one distinct node each
        EndpointsForRange[] secondGroups = { rg(0, 100, 2, 3, 4),
                                             rg(100, 200, 2, 3, 5),
                                             rg(200, 300, 2, 3, 6),
                                             rg(300, 400, 2, 3, 7) };
        PlacementForRange p2 = PlacementForRange.builder().withReplicaGroups(Arrays.asList(secondGroups)).build();

        ReplicationParams params = ReplicationParams.simple(1);
        // each placement is used for both reads and writes
        DataPlacements map1 = DataPlacements.builder(1).with(params, new DataPlacement(p1, p1)).build();
        DataPlacements map2 = DataPlacements.builder(1).with(params, new DataPlacement(p2, p2)).build();
        DataPlacement p3 = map1.combineReplicaGroups(map2).get(params);
        // both reads and writes should now contain the per-range union of replicas
        for (PlacementForRange placement : new PlacementForRange[]{ p3.reads, p3.writes })
        {
            assertPlacement(placement,
                            rg(0, 100, 1, 2, 3, 4),
                            rg(100, 200, 1, 2, 3, 5),
                            rg(200, 300, 1, 2, 3, 6),
                            rg(300, 400, 1, 2, 3, 7));
        }
    }
+
    /**
     * Splitting must internally sort ranges/tokens: the initial placement contains a
     * wraparound range, -9223372036854775808 is the Murmur3 minimum token, and the
     * resulting groups are asserted in the placement's own iteration order.
     */
    @Test
    public void testSplittingNeedsSorting()
    {
        List<Token> tokens = ImmutableList.of(token(-4611686018427387905L), token(-3L));
        // second group wraps around: (-4611686018427387905, MIN]
        EndpointsForRange[] initial = { rg(-9223372036854775808L, -4611686018427387905L, 1),
                                        rg(-4611686018427387905L, -9223372036854775808L, 1)};
        DataPlacement.Builder builder = DataPlacement.builder();
        builder.writes.withReplicaGroups(Arrays.asList(initial));

        DataPlacement initialPlacement = builder.build();
        DataPlacement split = initialPlacement.splitRangesForPlacement(tokens);
        // token -3 splits the wraparound range into (-4611686018427387905, -3] and (-3, MIN]
        assertPlacement(split.writes, rg(-3, -9223372036854775808L, 1),
                        rg(-9223372036854775808L,-4611686018427387905L, 1),
                        rg(-4611686018427387905L, -3, 1));
    }
+
    /**
     * A placement whose only range is the full ring expressed as the wraparound
     * (MIN, MIN] must be splittable into two proper ranges by a new token.
     */
    @Test
    public void testInitialFullWrappingRange()
    {
        /*
        TOKENS=[-9223372036854775808, 3074457345618258602] PLACEMENT=[[Full(/127.0.0.1:7000,(-9223372036854775808,-9223372036854775808])]]
         */
        List<Token> tokens = ImmutableList.of(token(-9223372036854775808L), token(3074457345618258602L));
        // (MIN, MIN] denotes the entire ring
        EndpointsForRange[] initial = { rg(-9223372036854775808L, -9223372036854775808L, 1)};

        DataPlacement.Builder builder = DataPlacement.builder();
        builder.writes.withReplicaGroups(Arrays.asList(initial));

        DataPlacement initialPlacement = builder.build();
        DataPlacement split = initialPlacement.splitRangesForPlacement(tokens);
        // expect (3074457345618258602, MIN] and (MIN, 3074457345618258602]
        assertPlacement(split.writes, rg(3074457345618258602L,-9223372036854775808L, 1), rg(-9223372036854775808L, 3074457345618258602L, 1));
    }
+
    /**
     * Splitting the full wraparound range (MIN, MIN] at the MIN token and 0 should
     * yield exactly two replica groups; only the group count is asserted here.
     */
    @Test
    public void testWithMinToken()
    {
        // full-ring wraparound range on nodes 1,2,3
        EndpointsForRange initialRG = rg(Long.MIN_VALUE, Long.MIN_VALUE, 1, 2, 3);

        DataPlacement.Builder builder = DataPlacement.builder();
        builder.writes.withReplicaGroup(initialRG);

        DataPlacement initialPlacement = builder.build();
        List<Token> tokens = ImmutableList.of(token(Long.MIN_VALUE), token(0));
        DataPlacement newPlacement = initialPlacement.splitRangesForPlacement(tokens);
        assertEquals(2, newPlacement.writes.replicaGroups.values().size());
    }
+
+    private PlacementForRange initialPlacement()
+    {
+        EndpointsForRange[] initialGroups = { rg(0, 100, 1, 2, 3),
+                                              rg(100, 200, 1, 2, 3),
+                                              rg(200, 300, 1, 2, 3),
+                                              rg(300, 400, 1, 2, 3) };
+        PlacementForRange placement = 
PlacementForRange.builder().withReplicaGroups(Arrays.asList(initialGroups)).build();
+        assertPlacement(placement, initialGroups);
+        return placement;
+    }
+
+    private void assertPlacement(PlacementForRange placement, 
EndpointsForRange...expected)
+    {
+        Collection<EndpointsForRange> replicaGroups = 
placement.replicaGroups.values();
+        assertEquals(replicaGroups.size(), expected.length);
+        int i = 0;
+        boolean allMatch = true;
+        for(EndpointsForRange group : replicaGroups)
+            if (!Iterables.elementsEqual(group, expected[i++]))
+                allMatch = false;
+
+        assertTrue(String.format("Placement didn't match expected replica 
groups. " +
+                                 "%nExpected: %s%nActual: %s", 
Arrays.asList(expected), replicaGroups),
+                   allMatch);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to