This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9b8bd95176ffad024e4beaab7ae84abfeb1d93f6
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Wed Mar 1 12:52:29 2023 +0000

    [CEP-21] Add basics of ownership and data placement
    
    Introduce new classes for representing placement of data ranges on
    replicas, along with the movement of data via transitions from one
    placement to the next. Eventually, these placements will be statically
    calculated in response to events which alter either the topology of the
    cluster (i.e. adding/removing/moving nodes) or the replication profile
    of the data itself (i.e. creating/altering keyspaces). These triggering
    events will be distributed and enacted consistently using the global log.
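
    To sketch how these pieces are intended to compose (a hedged example:
    the Replica values are placeholders, and the method names are those
    introduced by this commit), the delta between two placements can be
    derived and then re-applied:

        // Build the current and the desired placement for some range.
        DataPlacement current = DataPlacement.builder()
                                             .withReadReplica(oldReplica)
                                             .withWriteReplica(oldReplica)
                                             .build();
        DataPlacement next = DataPlacement.builder()
                                          .withReadReplica(newReplica)
                                          .withWriteReplica(newReplica)
                                          .build();
        // The delta captures the additions and removals needed to move
        // between the two placements...
        PlacementDeltas.PlacementDelta delta = current.difference(next);
        // ...and applying it to the current placement should yield the
        // next one.
        assert delta.apply(current).equals(next);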
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../cassandra/dht/OrderPreservingPartitioner.java  |  24 +
 src/java/org/apache/cassandra/dht/Range.java       |  27 ++
 src/java/org/apache/cassandra/dht/Token.java       |  27 ++
 .../apache/cassandra/locator/RangesByEndpoint.java |  69 +++
 .../cassandra/tcm/ownership/DataPlacement.java     | 211 +++++++++
 .../cassandra/tcm/ownership/DataPlacements.java    | 216 +++++++++
 .../org/apache/cassandra/tcm/ownership/Delta.java  | 125 +++++
 .../tcm/ownership/GlobalPlacementDelta.java        | 163 +++++++
 .../cassandra/tcm/ownership/MovementMap.java       |  89 ++++
 .../cassandra/tcm/ownership/PlacementDeltas.java   | 244 ++++++++++
 .../cassandra/tcm/ownership/PlacementForRange.java | 525 +++++++++++++++++++++
 .../cassandra/tcm/ownership/PlacementProvider.java |  53 +++
 .../tcm/ownership/PlacementTransitionPlan.java     | 128 +++++
 .../cassandra/tcm/ownership/ReplicationMap.java    | 106 +++++
 .../cassandra/tcm/sequences/LockedRanges.java      | 430 +++++++++++++++++
 15 files changed, 2437 insertions(+)

diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 2d4def95d0..fb9d0011a7 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -45,6 +45,15 @@ public class OrderPreservingPartitioner implements IPartitioner
     private static final String rndchars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
 
     public static final StringToken MINIMUM = new StringToken("");
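+    // Sentinel that sorts strictly after every other token; it compares equal only to itself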
+    public static final StringToken MAXIMUM = new StringToken("") {
+        public int compareTo(Token o)
+        {
+            if (o == MAXIMUM)
+                return 0;
+
+            return 1;
+        }
+    };
 
     public static final BigInteger CHAR_MASK = new BigInteger("65535");
 
@@ -116,6 +125,11 @@ public class OrderPreservingPartitioner implements IPartitioner
         return MINIMUM;
     }
 
+    public StringToken getMaximumToken()
+    {
+        return MAXIMUM;
+    }
+
     public StringToken getRandomToken()
     {
         return getRandomToken(ThreadLocalRandom.current());
@@ -208,6 +222,16 @@ public class OrderPreservingPartitioner implements IPartitioner
         {
             return ByteSource.of(token, version);
         }
+
+        @Override
+        public int compareTo(Token o)
+        {
+            // todo (rebase): I have no recollection of why this is needed - investigate
+            if (o == MAXIMUM)
+                return -1;
+
+            return super.compareTo(o);
+        }
     }
 
     public StringToken getToken(ByteBuffer key)
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 2c468990d2..581602cccd 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.dht;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
 import java.util.function.Predicate;
@@ -25,6 +26,11 @@ import com.google.common.collect.Iterables;
 import org.apache.commons.lang3.ObjectUtils;
 
 import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -38,6 +44,7 @@ import org.apache.cassandra.utils.Pair;
  */
 public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implements Comparable<Range<T>>, Serializable
 {
+    public static final Serializer serializer = new Serializer();
     public static final long serialVersionUID = 1L;
 
     public Range(T left, T right)
@@ -626,4 +633,24 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
             }
         }
     }
+
+    public static class Serializer implements MetadataSerializer<Range<Token>>
+    {
+        private static final int SERDE_VERSION = MessagingService.VERSION_40;
+
+        public void serialize(Range<Token> t, DataOutputPlus out, Version version) throws IOException
+        {
+            tokenSerializer.serialize(t, out, SERDE_VERSION);
+        }
+
+        public Range<Token> deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            return (Range<Token>) tokenSerializer.deserialize(in, IPartitioner.global(), SERDE_VERSION);
+        }
+
+        public long serializedSize(Range<Token> t, Version version)
+        {
+            return tokenSerializer.serializedSize(t, SERDE_VERSION);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 3543dabc0e..3801654e61 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -25,15 +25,20 @@ import java.nio.ByteBuffer;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.bytecomparable.ByteComparable;
 import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.net.MessagingService;
 
 public abstract class Token implements RingPosition<Token>, Serializable
 {
     private static final long serialVersionUID = 1L;
 
     public static final TokenSerializer serializer = new TokenSerializer();
+    public static final MetadataSerializer metadataSerializer = new MetadataSerializer();
 
     public static abstract class TokenFactory
     {
@@ -90,6 +95,28 @@ public abstract class Token implements RingPosition<Token>, Serializable
         }
     }
 
+    public static class MetadataSerializer implements org.apache.cassandra.tcm.serialization.MetadataSerializer<Token>
+    {
+        private static final int SERDE_VERSION = MessagingService.VERSION_40;
+
+        public void serialize(Token t, DataOutputPlus out, Version version) throws IOException
+        {
+            serializer.serialize(t, out, SERDE_VERSION);
+        }
+
+        public Token deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            // This is only ever used to deserialize Tokens from this cluster and as the partitioner can
+            // never be changed, it's safe to assume that the right implementation is provided by ClusterMetadata
+            return serializer.deserialize(in, ClusterMetadata.current().partitioner, SERDE_VERSION);
+        }
+
+        public long serializedSize(Token t, Version version)
+        {
+            return serializer.serializedSize(t, SERDE_VERSION);
+        }
+    }
+
     public static class TokenSerializer implements IPartitionerDependentSerializer<Token>
     {
         public void serialize(Token token, DataOutputPlus out, int version) throws IOException
diff --git a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
index 023d7ee2b4..98bac81400 100644
--- a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
@@ -22,10 +22,25 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
+import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
 
 public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, RangesAtEndpoint>
 {
+    public static final RangesByEndpoint EMPTY = new RangesByEndpoint.Builder().build();
+
+    public static final Serializer serializer = new Serializer();
+
     public RangesByEndpoint(Map<InetAddressAndPort, RangesAtEndpoint> map)
     {
         super(map);
@@ -53,4 +68,58 @@ public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, Ranges
         }
     }
 
+    public static class Serializer implements MetadataSerializer<RangesByEndpoint>
+    {
+        public void serialize(RangesByEndpoint t, DataOutputPlus out, Version version) throws IOException
+        {
+            Set<Map.Entry<InetAddressAndPort, RangesAtEndpoint>> entries = t.entrySet();
+            out.writeInt(entries.size());
+            for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> entry : entries)
+            {
+                InetAddressAndPort.MetadataSerializer.serializer.serialize(entry.getKey(), out, version);
+                AbstractReplicaCollection.ReplicaList replicas = entry.getValue().list;
+                out.writeInt(replicas.size());
+                for (Replica r : replicas)
+                {
+                    IPartitioner.validate(r.range());
+                    Range.serializer.serialize(r.range(), out, version);
+                    out.writeBoolean(r.isFull());
+                }
+            }
+        }
+
+        public RangesByEndpoint deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            RangesByEndpoint.Builder builder = new Builder();
+            int size = in.readInt();
+            for (int i = 0; i < size; i++)
+            {
+                InetAddressAndPort endpoint = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
+                int replicas = in.readInt();
+                for (int j = 0; j < replicas; j++)
+                {
+                    Range<Token> range = Range.serializer.deserialize(in, version);
+                    boolean full = in.readBoolean();
+                    builder.put(endpoint, new Replica(endpoint, range, full));
+                }
+            }
+            return builder.build();
+        }
+
+        public long serializedSize(RangesByEndpoint t, Version version)
+        {
+            long size = TypeSizes.INT_SIZE;
+            for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> entry : t.entrySet())
+            {
+                size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(entry.getKey(), version);
+                size += TypeSizes.INT_SIZE;
+                for (Replica r : entry.getValue().list)
+                {
+                    size += Range.serializer.serializedSize(r.range(), version);
+                    size += TypeSizes.BOOL_SIZE;
+                }
+            }
+            return size;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java
new file mode 100644
index 0000000000..7a55b67512
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
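+/**
+ * The read and write placements for a single replication setting. The two are held
+ * separately because they may diverge while a movement is in flight: writes can be
+ * directed to an expanded replica set before reads are switched over to it.
+ */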
+public class DataPlacement
+{
+    public static final Serializer serializer = new Serializer();
+
+    private static final DataPlacement EMPTY = new DataPlacement(PlacementForRange.EMPTY, PlacementForRange.EMPTY);
+
+    // TODO make tree of just EndpointsForRange, navigable by EFR.range()
+    // TODO combine peers into a single entity with one vote in any quorum
+    //      (e.g. old & new peer must both respond to count one replica)
+    public final PlacementForRange reads;
+    public final PlacementForRange writes;
+
+    public DataPlacement(PlacementForRange reads,
+                         PlacementForRange writes)
+    {
+        this.reads = reads;
+        this.writes = writes;
+    }
+
+    /**
+     * The union of read and write endpoints for the given range, for watermark purposes.
+     */
+    public Set<InetAddressAndPort> affectedReplicas(Range<Token> range)
+    {
+        Set<InetAddressAndPort> endpoints = new HashSet<>();
+        for (Replica r : reads.matchRange(range))
+            endpoints.add(r.endpoint());
+        for (Replica r : writes.matchRange(range))
+            endpoints.add(r.endpoint());
+        return endpoints;
+    }
+
+    public DataPlacement combineReplicaGroups(DataPlacement other)
+    {
+        return new DataPlacement(PlacementForRange.builder()
+                                                  .withReplicaGroups(reads.replicaGroups().values())
+                                                  .withReplicaGroups(other.reads.replicaGroups().values())
+                                                  .build(),
+                                 PlacementForRange.builder()
+                                                  .withReplicaGroups(writes.replicaGroups().values())
+                                                  .withReplicaGroups(other.writes.replicaGroups().values())
+                                                  .build());
+    }
+
+    public PlacementDeltas.PlacementDelta difference(DataPlacement next)
+    {
+        return new PlacementDeltas.PlacementDelta(reads.difference(next.reads),
+                                                  writes.difference(next.writes));
+    }
+
+    public DataPlacement mergeRangesForPlacement(Set<Token> leavingTokens)
+    {
+        return new DataPlacement(PlacementForRange.mergeRangesForPlacement(leavingTokens, reads),
+                                 PlacementForRange.mergeRangesForPlacement(leavingTokens, writes));
+    }
+
+    public DataPlacement splitRangesForPlacement(List<Token> tokens)
+    {
+        return new DataPlacement(PlacementForRange.splitRangesForPlacement(tokens, reads),
+                                 PlacementForRange.splitRangesForPlacement(tokens, writes));
+    }
+
+    public static DataPlacement empty()
+    {
+        return EMPTY;
+    }
+
+    public static Builder builder()
+    {
+        return new Builder(PlacementForRange.builder(),
+                           PlacementForRange.builder());
+    }
+
+    public Builder unbuild()
+    {
+        return new Builder(reads.unbuild(), writes.unbuild());
+    }
+
+    public static class Builder
+    {
+        public final PlacementForRange.Builder reads;
+        public final PlacementForRange.Builder writes;
+
+        public Builder(PlacementForRange.Builder reads, PlacementForRange.Builder writes)
+        {
+            this.reads = reads;
+            this.writes = writes;
+        }
+
+        public Builder withWriteReplica(Replica replica)
+        {
+            this.writes.withReplica(replica);
+            return this;
+        }
+
+        public Builder withoutWriteReplica(Replica replica)
+        {
+            this.writes.withoutReplica(replica);
+            return this;
+        }
+
+        public Builder withReadReplica(Replica replica)
+        {
+            this.reads.withReplica(replica);
+            return this;
+        }
+
+        public Builder withoutReadReplica(Replica replica)
+        {
+            this.reads.withoutReplica(replica);
+            return this;
+        }
+
+        public DataPlacement build()
+        {
+            return new DataPlacement(reads.build(), writes.build());
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "DataPlacement{" +
+               "reads=" + toString(reads.replicaGroups) +
+               ", writes=" + toString(writes.replicaGroups) +
+               '}';
+    }
+
+    public static String toString(Map<?, ?> predicted)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (Map.Entry<?, ?> e : predicted.entrySet())
+        {
+            sb.append(e.getKey()).append("=").append(e.getValue()).append(",\n");
+        }
+
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof DataPlacement)) return false;
+        DataPlacement that = (DataPlacement) o;
+        return Objects.equals(reads, that.reads) && Objects.equals(writes, that.writes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(reads, writes);
+    }
+
+    public static class Serializer implements MetadataSerializer<DataPlacement>
+    {
+        public void serialize(DataPlacement t, DataOutputPlus out, Version version) throws IOException
+        {
+            PlacementForRange.serializer.serialize(t.reads, out, version);
+            PlacementForRange.serializer.serialize(t.writes, out, version);
+        }
+
+        public DataPlacement deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            PlacementForRange reads = PlacementForRange.serializer.deserialize(in, version);
+            PlacementForRange writes = PlacementForRange.serializer.deserialize(in, version);
+            return new DataPlacement(reads, writes);
+        }
+
+        public long serializedSize(DataPlacement t, Version version)
+        {
+            return PlacementForRange.serializer.serializedSize(t.reads, version) +
+                   PlacementForRange.serializer.serializedSize(t.writes, version);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
new file mode 100644
index 0000000000..3107a7d550
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataValue;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.tcm.transformations.cms.EntireRange;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.tcm.transformations.cms.EntireRange.entireRange;
+
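+/**
+ * Immutable map of ReplicationParams to DataPlacement, versioned by the epoch in
+ * which it was last modified; one entry per distinct replication setting in use.
+ */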
+public class DataPlacements extends ReplicationMap<DataPlacement> implements MetadataValue<DataPlacements>
+{
+    public static final Serializer serializer = new Serializer();
+
+    public static final DataPlacements EMPTY = DataPlacements.builder(1).build();
+    private static DataPlacement LOCAL_PLACEMENT;
+
+    private final Epoch lastModified;
+
+    private DataPlacements(Epoch lastModified, Map<ReplicationParams, DataPlacement> map)
+    {
+        super(map);
+        this.lastModified = lastModified;
+    }
+
+    protected DataPlacement defaultValue()
+    {
+        return DataPlacement.empty();
+    }
+
+    public void withDistributed(BiConsumer<ReplicationParams, DataPlacement> consumer)
+    {
+        forEach(e -> {
+            if (e.getKey().isLocal() || e.getKey().isMeta())
+                return;
+
+            consumer.accept(e.getKey(), e.getValue());
+        });
+    }
+
+    protected DataPlacement localOnly()
+    {
+        // it's unlikely to happen, but perfectly safe to create multiple times, so no need to lock or statically init
+        if (null == LOCAL_PLACEMENT)
+        {
+            PlacementForRange placement = new PlacementForRange(Collections.singletonMap(entireRange,
+                                                                                         EndpointsForRange.of(EntireRange.replica(FBUtilities.getBroadcastAddressAndPort()))));
+            LOCAL_PLACEMENT = new DataPlacement(placement, placement);
+        }
+        return LOCAL_PLACEMENT;
+    }
+
+    public DataPlacements combineReplicaGroups(DataPlacements end)
+    {
+        DataPlacements start = this;
+        if (start.isEmpty())
+            return end;
+        Builder mapBuilder = DataPlacements.builder(start.size());
+        start.asMap().forEach((params, placement) ->
+                              mapBuilder.with(params, placement.combineReplicaGroups(end.get(params))));
+        return mapBuilder.build();
+    }
+
+    @Override
+    public DataPlacements withLastModified(Epoch epoch)
+    {
+        return new DataPlacements(epoch, asMap());
+    }
+
+    @Override
+    public Epoch lastModified()
+    {
+        return lastModified;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "DataPlacements{" +
+               "lastModified=" + lastModified +
+               ", placementMap=" + asMap() +
+               '}';
+    }
+
+    public static DataPlacements sortReplicaGroups(DataPlacements placements, Comparator<Replica> comparator)
+    {
+        Builder builder = DataPlacements.builder(placements.size());
+        placements.asMap().forEach((params, placement) -> {
+            PlacementForRange.Builder reads = PlacementForRange.builder(placement.reads.replicaGroups().size());
+            placement.reads.replicaGroups().forEach((range, endpoints) -> {
+                reads.withReplicaGroup(endpoints.sorted(comparator));
+            });
+            PlacementForRange.Builder writes = PlacementForRange.builder(placement.writes.replicaGroups().size());
+            placement.writes.replicaGroups().forEach((range, endpoints) -> {
+                writes.withReplicaGroup(endpoints.sorted(comparator));
+            });
+            builder.with(params, new DataPlacement(reads.build(), writes.build()));
+        });
+        return builder.build();
+    }
+
+    public static DataPlacements empty()
+    {
+        return EMPTY;
+    }
+
+    public static Builder builder(int expectedSize)
+    {
+        return new Builder(new HashMap<>(expectedSize));
+    }
+
+    public static Builder builder(Map<ReplicationParams, DataPlacement> map)
+    {
+        return new Builder(map);
+    }
+
+    public Builder unbuild()
+    {
+        return new Builder(new HashMap<>(this.asMap()));
+    }
+
+    public static class Builder
+    {
+        private final Map<ReplicationParams, DataPlacement> map;
+        private Builder(Map<ReplicationParams, DataPlacement> map)
+        {
+            this.map = map;
+        }
+
+        public Builder with(ReplicationParams params, DataPlacement placement)
+        {
+            map.put(params, placement);
+            return this;
+        }
+
+        public DataPlacements build()
+        {
+            return new DataPlacements(Epoch.EMPTY, map);
+        }
+    }
+
+    public static class Serializer implements MetadataSerializer<DataPlacements>
+    {
+        public void serialize(DataPlacements t, DataOutputPlus out, Version version) throws IOException
+        {
+            Map<ReplicationParams, DataPlacement> map = t.asMap();
+            out.writeInt(map.size());
+            for (Map.Entry<ReplicationParams, DataPlacement> entry : map.entrySet())
+            {
+                ReplicationParams.serializer.serialize(entry.getKey(), out, version);
+                DataPlacement.serializer.serialize(entry.getValue(), out, version);
+            }
+            Epoch.serializer.serialize(t.lastModified, out, version);
+        }
+
+        public DataPlacements deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            int size = in.readInt();
+            Map<ReplicationParams, DataPlacement> map = Maps.newHashMapWithExpectedSize(size);
+            for (int i = 0; i < size; i++)
+            {
+                ReplicationParams params = ReplicationParams.serializer.deserialize(in, version);
+                DataPlacement dp = DataPlacement.serializer.deserialize(in, version);
+                map.put(params, dp);
+            }
+            Epoch lastModified = Epoch.serializer.deserialize(in, version);
+            return new DataPlacements(lastModified, map);
+        }
+
+        public long serializedSize(DataPlacements t, Version version)
+        {
+            long size = sizeof(t.size());
+            for (Map.Entry<ReplicationParams, DataPlacement> entry : t.asMap().entrySet())
+            {
+                size += ReplicationParams.serializer.serializedSize(entry.getKey(), version);
+                size += DataPlacement.serializer.serializedSize(entry.getValue(), version);
+            }
+            size += Epoch.serializer.serializedSize(t.lastModified, version);
+            return size;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/Delta.java b/src/java/org/apache/cassandra/tcm/ownership/Delta.java
new file mode 100644
index 0000000000..a0fe3ba2d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/Delta.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
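+/**
+ * A single step of a placement change, expressed as the per-endpoint ranges to be
+ * removed from and added to a placement. invert() swaps the two, producing the
+ * delta that would undo this one.
+ */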
+public class Delta
+{
+    public static final Serializer serializer = new Serializer();
+
+    private static final Delta EMPTY = new Delta(RangesByEndpoint.EMPTY, RangesByEndpoint.EMPTY);
+
+    public final RangesByEndpoint removals;
+    public final RangesByEndpoint additions;
+
+    public Delta(RangesByEndpoint removals, RangesByEndpoint additions)
+    {
+        this.removals = removals;
+        this.additions = additions;
+    }
+
+    public Delta onlyAdditions()
+    {
+        return new Delta(RangesByEndpoint.EMPTY, additions);
+    }
+
+    public Delta onlyRemovals()
+    {
+        return new Delta(removals, RangesByEndpoint.EMPTY);
+    }
+
+    public Delta merge(Delta other)
+    {
+        return new Delta(merge(removals, other.removals),
+                         merge(additions, other.additions));
+    }
+
+    public Delta invert()
+    {
+        return new Delta(additions, removals);
+    }
+
+    public static RangesByEndpoint merge(RangesByEndpoint... byEndpoints)
+    {
+        RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder();
+        for (RangesByEndpoint rbe : byEndpoints)
+        {
+            rbe.asMap()
+               .forEach((endpoint, replicas) -> replicas.forEach(replica -> builder.put(endpoint, replica)));
+        }
+        return builder.build();
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Delta delta = (Delta) o;
+
+        return Objects.equals(removals, delta.removals) && Objects.equals(additions, delta.additions);
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(removals, additions);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Delta{" +
+               "removals=" + removals +
+               ", additions=" + additions +
+               '}';
+    }
+
+    public static Delta empty()
+    {
+        return EMPTY;
+    }
+
+    public static final class Serializer implements MetadataSerializer<Delta>
+    {
+        public void serialize(Delta t, DataOutputPlus out, Version version) throws IOException
+        {
+            RangesByEndpoint.serializer.serialize(t.removals, out, version);
+            RangesByEndpoint.serializer.serialize(t.additions, out, version);
+        }
+
+        public Delta deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            return new Delta(RangesByEndpoint.serializer.deserialize(in, version),
+                             RangesByEndpoint.serializer.deserialize(in, version));
+        }
+
+        public long serializedSize(Delta t, Version version)
+        {
+            return RangesByEndpoint.serializer.serializedSize(t.removals, version) +
+                   RangesByEndpoint.serializer.serializedSize(t.additions, version);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/GlobalPlacementDelta.java b/src/java/org/apache/cassandra/tcm/ownership/GlobalPlacementDelta.java
new file mode 100644
index 0000000000..a2ec8a2074
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/GlobalPlacementDelta.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+//import java.io.IOException;
+//import java.util.*;
+//
+//import org.apache.cassandra.db.TypeSizes;
+//import org.apache.cassandra.io.IVersionedSerializer;
+//import org.apache.cassandra.io.util.DataInputPlus;
+//import org.apache.cassandra.io.util.DataOutputPlus;
+//import org.apache.cassandra.schema.ReplicationParams;
+//
+//public class GlobalPlacementDelta
+//{
+//    public static final Serializer serializer = new Serializer();
+//    public final PlacementDeltas readDeltas;
+//    public final PlacementDeltas writeDeltas;
+//
+//    private GlobalPlacementDelta(PlacementDeltas readDeltas, PlacementDeltas writeDeltas)
+//    {
+//        this.readDeltas = readDeltas;
+//        this.writeDeltas = writeDeltas;
+//    }
+//
+//    public DataPlacements apply(DataPlacements placements)
+//    {
+//        return DataPlacements.builder()
+//                             .reads(apply(placements.reads, readDeltas))
+//                             .writes(apply(placements.writes, writeDeltas))
+//                             .build();
+//    }
+//
+//    public static DataPlacements apply(DataPlacements before, PlacementDeltas delta)
+//    {
+//        DataPlacements.Builder mapBuilder = DataPlacements.builder(before.size());
+//        before.asMap().forEach((params, placement) -> {
+//            PlacementDeltas.PlacementDelta d = delta.get(params);
+//            if (d == null)
+//                mapBuilder.put(params, placement);
+//            else
+//                mapBuilder.put(params, d.apply(placement));
+//        });
+//        return mapBuilder.build();
+//    }
+//
+//    public boolean equals(Object o)
+//    {
+//        if (this == o) return true;
+//        if (o == null || getClass() != o.getClass()) return false;
+//        GlobalPlacementDelta that = (GlobalPlacementDelta) o;
+//        return Objects.equals(readDeltas, that.readDeltas) && Objects.equals(writeDeltas, that.writeDeltas);
+//    }
+//
+//    public int hashCode()
+//    {
+//        return Objects.hash(readDeltas, writeDeltas);
+//    }
+//
+//    public String toString()
+//    {
+//        return "PlacementDelta{" +
+//               "reads=" + readDeltas +
+//               ", writes=" + writeDeltas +
+//               '}';
+//    }
+//
+//    public static Builder builder()
+//    {
+//        return new Builder();
+//    }
+//
+//    public static final class Builder
+//    {
+//        PlacementDeltas.Builder reads = PlacementDeltas.builder();
+//        PlacementDeltas.Builder writes = PlacementDeltas.builder();
+//
+//        public Builder withReadDelta(ReplicationParams params, Delta delta)
+//        {
+//            reads.put(params, delta);
+//            return this;
+//        }
+//
+//        public Builder withWriteDelta(ReplicationParams params, Delta delta)
+//        {
+//            writes.put(params, delta);
+//            return this;
+//        }
+//
+//        public GlobalPlacementDelta build()
+//        {
+//            return new GlobalPlacementDelta(reads.build(), writes.build());
+//        }
+//    }
+//
+//    public static class Serializer implements IVersionedSerializer<GlobalPlacementDelta>
+//    {
+//
+//        public void serialize(GlobalPlacementDelta t, DataOutputPlus out, int version) throws IOException
+//        {
+//            out.writeInt(t.readDeltas.size());
+//            for (Map.Entry<ReplicationParams, Delta> e : t.readDeltas)
+//            {
+//                ReplicationParams.serializer.serialize(e.getKey(), out, version);
+//                Delta.serializer.serialize(e.getValue(), out, version);
+//            }
+//            out.writeInt(t.writeDeltas.size());
+//            for (Map.Entry<ReplicationParams, Delta> e : t.writeDeltas)
+//            {
+//                ReplicationParams.serializer.serialize(e.getKey(), out, version);
+//                Delta.serializer.serialize(e.getValue(), out, version);
+//            }
+//        }
+//
+//        public GlobalPlacementDelta deserialize(DataInputPlus in, int version) throws IOException
+//        {
+//            int size = in.readInt();
+//            PlacementDeltas.Builder reads = PlacementDeltas.builder(size);
+//            for (int i = 0; i < size; i++)
+//                reads.put(ReplicationParams.serializer.deserialize(in, version), Delta.serializer.deserialize(in, version));
+//            size = in.readInt();
+//            PlacementDeltas.Builder writes = PlacementDeltas.builder(size);
+//            for (int i = 0; i < size; i++)
+//                writes.put(ReplicationParams.serializer.deserialize(in, version), Delta.serializer.deserialize(in, version));
+//
+//            return new GlobalPlacementDelta(reads.build(), writes.build());
+//        }
+//
+//        public long serializedSize(GlobalPlacementDelta t, int version)
+//        {
+//            long size = TypeSizes.INT_SIZE;
+//            for (Map.Entry<ReplicationParams, Delta> e : t.readDeltas)
+//            {
+//                size += ReplicationParams.serializer.serializedSize(e.getKey(), version);
+//                size += Delta.serializer.serializedSize(e.getValue(), version);
+//            }
+//            size += TypeSizes.INT_SIZE;
+//            for (Map.Entry<ReplicationParams, Delta> e : t.writeDeltas)
+//            {
+//                size += ReplicationParams.serializer.serializedSize(e.getKey(), version);
+//                size += Delta.serializer.serializedSize(e.getValue(), version);
+//            }
+//            return size;
+//        }
+//    }
+//}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/MovementMap.java b/src/java/org/apache/cassandra/tcm/ownership/MovementMap.java
new file mode 100644
index 0000000000..0d2e15b2c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/MovementMap.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.cassandra.locator.EndpointsByReplica;
+import org.apache.cassandra.schema.ReplicationParams;
+
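+/**
+ * For each replication setting, maps replicas to the endpoints involved in moving
+ * their data, as consumed when streaming ranges during a placement change.
+ */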
+public class MovementMap extends ReplicationMap<EndpointsByReplica>
+{
+    private static final MovementMap EMPTY = new MovementMap(Collections.emptyMap());
+
+    private MovementMap(Map<ReplicationParams, EndpointsByReplica> map)
+    {
+        super(map);
+    }
+
+    protected EndpointsByReplica defaultValue()
+    {
+        return new EndpointsByReplica.Builder().build();
+    }
+
+    protected EndpointsByReplica localOnly()
+    {
+        throw new IllegalStateException("Cannot move local ranges");
+    }
+
+    public String toString()
+    {
+        return "MovementMap{" +
+               "map=" + asMap() +
+               '}';
+    }
+
+    public static MovementMap empty()
+    {
+        return EMPTY;
+    }
+
+    public static Builder builder()
+    {
+        return new Builder(new HashMap<>());
+    }
+
+    public static Builder builder(int expectedSize)
+    {
+        return new Builder(new HashMap<>(expectedSize));
+    }
+
+    public static class Builder
+    {
+        private final Map<ReplicationParams, EndpointsByReplica> map;
+        private Builder(Map<ReplicationParams, EndpointsByReplica> map)
+        {
+            this.map = map;
+        }
+
+        public Builder put(ReplicationParams params, EndpointsByReplica placement)
+        {
+            map.put(params, placement);
+            return this;
+        }
+
+        public MovementMap build()
+        {
+            return new MovementMap(map);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java
new file mode 100644
index 0000000000..b54806834d
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
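+/**
+ * A set of PlacementDeltas, one per replication setting, describing the transition
+ * from one DataPlacements to the next; apply(...) folds each delta into the
+ * corresponding placement and invert() yields the reverse transition.
+ */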
+public class PlacementDeltas extends ReplicationMap<PlacementDeltas.PlacementDelta>
+{
+    public static final Serializer serializer = new Serializer();
+    private static final PlacementDeltas EMPTY = new PlacementDeltas(Collections.emptyMap());
+
+    private PlacementDeltas(Map<ReplicationParams, PlacementDelta> map)
+    {
+        super(map);
+    }
+
+    protected PlacementDelta defaultValue()
+    {
+        return PlacementDelta.EMPTY;
+    }
+
+    protected PlacementDelta localOnly()
+    {
+        throw new IllegalStateException("Cannot apply diff to local 
placements.");
+    }
+
+    public PlacementDeltas invert()
+    {
+        Builder inverse = builder(size());
+        asMap().forEach((params, delta) -> inverse.put(params, delta.invert()));
+        return inverse.build();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "DeltaMap{" +
+               "map=" + asMap() +
+               '}';
+    }
+
+    public DataPlacements apply(DataPlacements placements)
+    {
+        DataPlacements.Builder builder = placements.unbuild();
+        asMap().forEach((params, delta) -> {
+            DataPlacement previous = placements.get(params);
+            builder.with(params, delta.apply(previous));
+        });
+        return builder.build();
+    }
+
+    public static PlacementDeltas empty()
+    {
+        return EMPTY;
+    }
+
+    public static Builder builder()
+    {
+        return new Builder(new HashMap<>());
+    }
+
+    public static Builder builder(int expectedSize)
+    {
+        return new Builder(new HashMap<>(expectedSize));
+    }
+
+    public static Builder builder(Map<ReplicationParams, PlacementDelta> map)
+    {
+        return new Builder(map);
+    }
+
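+    /**
+     * The read and write deltas for a single replication setting. apply(...) first
+     * adds the new replicas to a placement, then removes the departing ones.
+     */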
+    public static class PlacementDelta
+    {
+        public static final PlacementDelta EMPTY = new PlacementDelta(Delta.empty(), Delta.empty());
+
+        public final Delta reads;
+        public final Delta writes;
+
+        public PlacementDelta(Delta reads, Delta writes)
+        {
+            this.reads = reads;
+            this.writes = writes;
+        }
+
+        public PlacementDelta onlyReads()
+        {
+            return new PlacementDelta(reads, Delta.empty());
+        }
+
+        public PlacementDelta onlyWrites()
+        {
+            return new PlacementDelta(Delta.empty(), writes);
+        }
+
+        public PlacementDelta onlyAdditions()
+        {
+            return new PlacementDelta(reads.onlyAdditions(), writes.onlyAdditions());
+        }
+
+        public PlacementDelta onlyRemovals()
+        {
+            return new PlacementDelta(reads.onlyRemovals(), writes.onlyRemovals());
+        }
+
+        public DataPlacement apply(DataPlacement placement)
+        {
+            DataPlacement.Builder builder = placement.unbuild();
+            reads.additions.flattenValues().forEach(builder.reads::withReplica);
+            writes.additions.flattenValues().forEach(builder.writes::withReplica);
+
+            reads.removals.flattenValues().forEach(builder.reads::withoutReplica);
+            writes.removals.flattenValues().forEach(builder.writes::withoutReplica);
+            return builder.build();
+        }
+
+        public PlacementDelta merge(PlacementDelta other)
+        {
+            return new PlacementDelta(reads.merge(other.reads),
+                                      writes.merge(other.writes));
+        }
+
+        public PlacementDelta invert()
+        {
+            return new PlacementDelta(reads.invert(), writes.invert());
+        }
+
+        public String toString()
+        {
+            return "PlacementDelta{" +
+                   "reads=" + reads +
+                   ", writes=" + writes +
+                   '}';
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            PlacementDelta other = (PlacementDelta) o;
+
+            return Objects.equals(reads, other.reads) && Objects.equals(writes, other.writes);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(reads, writes);
+        }
+    }
+
+    public static class Builder
+    {
+        private final Map<ReplicationParams, PlacementDelta> map;
+        private Builder(Map<ReplicationParams, PlacementDelta> map)
+        {
+            this.map = map;
+        }
+
+        public Builder put(ReplicationParams params, PlacementDelta placement)
+        {
+            PlacementDelta delta = map.get(params);
+            if (delta == null)
+                map.put(params, placement);
+            else
+                map.put(params, delta.merge(placement));
+            return this;
+        }
+
+        public PlacementDeltas build()
+        {
+            return new PlacementDeltas(map);
+        }
+    }
+
+    public static class Serializer implements MetadataSerializer<PlacementDeltas>
+    {
+        public void serialize(PlacementDeltas t, DataOutputPlus out, Version version) throws IOException
+        {
+            out.writeInt(t.size());
+            for (Map.Entry<ReplicationParams, PlacementDelta> e : t.asMap().entrySet())
+            {
+                ReplicationParams.serializer.serialize(e.getKey(), out, version);
+                Delta.serializer.serialize(e.getValue().reads, out, version);
+                Delta.serializer.serialize(e.getValue().writes, out, version);
+            }
+        }
+
+        public PlacementDeltas deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            int size = in.readInt();
+            Builder builder = PlacementDeltas.builder(size);
+            for (int i = 0; i < size; i++)
+            {
+                ReplicationParams replicationParams = ReplicationParams.serializer.deserialize(in, version);
+                Delta reads = Delta.serializer.deserialize(in, version);
+                Delta writes = Delta.serializer.deserialize(in, version);
+                builder.put(replicationParams, new PlacementDelta(reads, writes));
+            }
+            return builder.build();
+        }
+
+        public long serializedSize(PlacementDeltas t, Version version)
+        {
+            long size = TypeSizes.INT_SIZE;
+            for (Map.Entry<ReplicationParams, PlacementDelta> e : t.asMap().entrySet())
+            {
+                size += ReplicationParams.serializer.serializedSize(e.getKey(), version);
+                size += Delta.serializer.serializedSize(e.getValue().reads, version);
+                size += Delta.serializer.serializedSize(e.getValue().writes, version);
+            }
+            return size;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java
new file mode 100644
index 0000000000..aa5f90020c
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.AbstractReplicaCollection;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+
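+/**
+ * An immutable mapping of token range to the replica group for that range, covering
+ * either the read or the write placement of a single replication setting.
+ */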
+public class PlacementForRange
+{
+    public static final Serializer serializer = new Serializer();
+
+    public static final PlacementForRange EMPTY = PlacementForRange.builder().build();
+
+    final SortedMap<Range<Token>, EndpointsForRange> replicaGroups;
+
+    public PlacementForRange(Map<Range<Token>, EndpointsForRange> replicaGroups)
+    {
+        this.replicaGroups = new TreeMap<>(replicaGroups);
+    }
+
+    @VisibleForTesting
+    public Map<Range<Token>, EndpointsForRange> replicaGroups()
+    {
+        return Collections.unmodifiableMap(replicaGroups);
+    }
+
+    @VisibleForTesting
+    public List<Range<Token>> ranges()
+    {
+        List<Range<Token>> ranges = new ArrayList<>(replicaGroups.keySet());
+        ranges.sort(Range::compareTo);
+        return ranges;
+    }
+
+    @VisibleForTesting
+    public EndpointsForRange forRange(Range<Token> range) // TODO: rename to `forRangeRead`?
+    {
+        // can't use range.isWrapAround() since range.unwrap() returns a wrapping range (right token is min value)
+        assert range.right.compareTo(range.left) > 0 || range.right.equals(range.right.minValue());
+        return replicaGroups.get(range);
+    }
+
+    /**
+     * Intended for use on the read/write path; unlike forRange, which requires an exact
+     * match, this collects replicas from every replica group whose range contains the
+     * queried range.
+     */
+    public EndpointsForRange matchRange(Range<Token> range)
+    {
+        EndpointsForRange.Builder builder = new EndpointsForRange.Builder(range);
+        for (Map.Entry<Range<Token>, EndpointsForRange> entry : replicaGroups.entrySet())
+        {
+            if (entry.getKey().contains(range))
+                builder.addAll(entry.getValue(), ReplicaCollection.Builder.Conflict.ALL);
+        }
+        return builder.build();
+    }
+
+    public EndpointsForRange forRange(Token token)
+    {
+        for (Map.Entry<Range<Token>, EndpointsForRange> entry : replicaGroups.entrySet())
+        {
+            if (entry.getKey().contains(token))
+                return entry.getValue();
+        }
+        throw new IllegalStateException("Could not find range for token " + 
token + " in PlacementForRange: " + replicaGroups);
+    }
+
+    // TODO: this could be improved by searching it rather than iterating over it
+    public EndpointsForToken forToken(Token token)
+    {
+        EndpointsForToken.Builder builder = new EndpointsForToken.Builder(token);
+        for (Map.Entry<Range<Token>, EndpointsForRange> entry : replicaGroups.entrySet())
+        {
+            if (entry.getKey().contains(token))
+                builder.addAll(entry.getValue().forToken(token), ReplicaCollection.Builder.Conflict.ALL);
+        }
+        return builder.build();
+    }
+
+    public Delta difference(PlacementForRange next)
+    {
+        RangesByEndpoint oldMap = this.byEndpoint();
+        RangesByEndpoint newMap = next.byEndpoint();
+        return new Delta(diff(oldMap, newMap), diff(newMap, oldMap));
+    }
+
+    @VisibleForTesting
+    public RangesByEndpoint byEndpoint()
+    {
+        RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder();
+        for (Map.Entry<Range<Token>, EndpointsForRange> oldPlacement : this.replicaGroups.entrySet())
+            oldPlacement.getValue().byEndpoint().forEach(builder::put);
+        return builder.build();
+    }
+
+    // TODO do this without using a builder to collate and without using subtractSameReplication
+    // i.e. by directly removing the replicas so that we don't need to care about the ordering of with/without.
+    public PlacementForRange without(RangesByEndpoint toRemove)
+    {
+        Builder builder = builder();
+        RangesByEndpoint currentByEndpoint = this.byEndpoint();
+        for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> endPointRanges : currentByEndpoint.entrySet())
+        {
+            InetAddressAndPort endpoint = endPointRanges.getKey();
+            RangesAtEndpoint currentRanges = endPointRanges.getValue();
+            RangesAtEndpoint removeRanges = toRemove.get(endpoint);
+            for (Replica oldReplica : currentRanges)
+            {
+                RangesAtEndpoint toRetain = oldReplica.subtractSameReplication(removeRanges);
+                toRetain.forEach(builder::withReplica);
+            }
+        }
+        return builder.build();
+    }
+
+    // TODO do this without using a builder, i.e. by directly adding the replicas
+    // (and directly removing them in without) so that we don't need to care
+    // about the ordering of with/without.
+    public PlacementForRange with(RangesByEndpoint toAdd)
+    {
+        Builder builder = builder();
+        for (EndpointsForRange current : replicaGroups.values())
+            builder.withReplicaGroup(current);
+        for (Replica newReplica : toAdd.flattenValues())
+            builder.withReplica(newReplica);
+        return builder.build();
+    }
+
+    private RangesByEndpoint diff(RangesByEndpoint left, RangesByEndpoint right)
+    {
+        RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder();
+        for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> endPointRanges : left.entrySet())
+        {
+            InetAddressAndPort endpoint = endPointRanges.getKey();
+            RangesAtEndpoint leftRanges = endPointRanges.getValue();
+            RangesAtEndpoint rightRanges = right.get(endpoint);
+            for (Replica leftReplica : leftRanges)
+            {
+                if (!rightRanges.contains(leftReplica))
+                    builder.put(endpoint, leftReplica);
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public String toString()
+    {
+        return replicaGroups.toString();
+    }
+
+    @VisibleForTesting
+    public Map<String, List<String>> toStringByEndpoint()
+    {
+        Map<String, List<String>> mappings = new HashMap<>();
+        for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> entry : byEndpoint().entrySet())
+            mappings.put(entry.getKey().toString(), entry.getValue().asList(r -> r.range().toString()));
+        return mappings;
+    }
+
+    @VisibleForTesting
+    public List<String> toReplicaStringList()
+    {
+        return replicaGroups.values()
+                            .stream()
+                            .flatMap(AbstractReplicaCollection::stream)
+                            .map(Replica::toString)
+                            .collect(Collectors.toList());
+    }
+
+    public Builder unbuild()
+    {
+        return new Builder(new HashMap<>(replicaGroups));
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    public static Builder builder(int expectedSize)
+    {
+        return new Builder(expectedSize);
+    }
+
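+    /**
+     * Merges contiguous replica groups whose shared boundaries are tokens that are leaving
+     * the ring. A sketch, assuming full replicas only: with leavingTokens = {100} and groups
+     * (0,100] -> {A,B} and (100,200] -> {A,B}, the result is the single group (0,200] -> {A,B}.
+     */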
+    @VisibleForTesting
+    public static PlacementForRange mergeRangesForPlacement(Set<Token> leavingTokens, PlacementForRange placement)
+    {
+        if (placement.replicaGroups.isEmpty())
+            return placement;
+
+        Builder newPlacement = PlacementForRange.builder();
+        List<Map.Entry<Range<Token>, EndpointsForRange>> sortedEntries = new ArrayList<>(placement.replicaGroups.entrySet());
+        sortedEntries.sort(Comparator.comparing(e -> e.getKey().left));
+        Iterator<Map.Entry<Range<Token>, EndpointsForRange>> iter = sortedEntries.iterator();
+        EndpointsForRange stashedReplicas = null;
+
+        // TODO verify with transient replication. For example, if we have ranges [0,100] -> A, B and
+        // [100,200] -> Atransient, B, we would currently not be able to merge those as A != Atransient
+        while (iter.hasNext())
+        {
+            Map.Entry<Range<Token>, EndpointsForRange> entry = iter.next();
+            Range<Token> currentRange = entry.getKey();
+            EndpointsForRange currentReplicas = entry.getValue();
+            // current range ends with one of the tokens being removed so we will want to merge it with the neighbouring
+            // range, potentially with a number of subsequent ranges if there is an unbroken sequence of leaving tokens.
+            if (leavingTokens.contains(currentRange.right))
+            {
+                // the previous range (if there was one) did not end in a leaving token, so the stash is empty
+                if (stashedReplicas == null)
+                    stashedReplicas = currentReplicas;
+                // we already have a stashed replica group. Its range must be contiguous with this one so extend the
+                // range, joining them. This asserts that the two sets of replicas match in terms of endpoints and
+                // full/transient status too.
+                else
+                    stashedReplicas = mergeReplicaGroups(stashedReplicas, currentReplicas);
+            }
+            // current range does not end in a leaving token
+            else
+            {
+                // nothing stashed to merge, so add current replica group as it is
+                if (stashedReplicas == null)
+                    newPlacement.withReplicaGroup(currentReplicas);
+                else
+                {
+                    // we have stashed the preceding replica set, so merge it with this one, add it to the new
+                    // placement and clear the stash
+                    newPlacement.withReplicaGroup(mergeReplicaGroups(stashedReplicas, currentReplicas));
+                    stashedReplicas = null;
+                }
+            }
+        }
+        if (null != stashedReplicas)
+            newPlacement.withReplicaGroup(stashedReplicas);
+
+        return newPlacement.build();
+    }
+
+    private static EndpointsForRange mergeReplicaGroups(EndpointsForRange left, EndpointsForRange right)
+    {
+        assert sameReplicas(left, right);
+        Range<Token> mergedRange = new Range<>(left.range().left, right.range().right);
+        return EndpointsForRange.builder(mergedRange, left.size())
+                                .addAll(left.asList(r -> new Replica(r.endpoint(), mergedRange, r.isFull())))
+                                .build();
+    }
+
+    private static boolean sameReplicas(EndpointsForRange left, EndpointsForRange right)
+    {
+        if (left.size() != right.size())
+            return false;
+
+        Comparator<Replica> comparator = Comparator.comparing(Replica::endpoint);
+        EndpointsForRange l = left.sorted(comparator);
+        EndpointsForRange r = right.sorted(comparator);
+        for (int i = 0; i < l.size(); i++)
+        {
+            Replica r1 = l.get(i);
+            Replica r2 = r.get(i);
+            if (!(r1.endpoint().equals(r2.endpoint())) || r1.isFull() != r2.isFull())
+                return false;
+        }
+        return true;
+    }
+
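+    /**
+     * Splits existing replica groups at the supplied tokens. A sketch, assuming full replicas
+     * only: splitting (0,100] -> {A,B} at token 50 yields (0,50] -> {A,B} and (50,100] -> {A,B},
+     * with each replica decorated to its subrange.
+     */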
+    @VisibleForTesting
+    public static PlacementForRange splitRangesForPlacement(List<Token> tokens, PlacementForRange placement)
+    {
+        if (placement.replicaGroups.isEmpty())
+            return placement;
+
+        Builder newPlacement = PlacementForRange.builder();
+        List<EndpointsForRange> eprs = new ArrayList<>(placement.replicaGroups.values());
+        eprs.sort(Comparator.comparing(a -> a.range().left));
+        Token min = eprs.get(0).range().left;
+        Token max = eprs.get(eprs.size() - 1).range().right;
+
+        // if any token is < the start or > the end of the ranges covered, error
+        if (tokens.get(0).compareTo(min) < 0 || (!max.equals(min) && tokens.get(tokens.size() - 1).compareTo(max) > 0))
+            throw new IllegalArgumentException("New tokens exceed total bounds of current placement ranges " + tokens + " " + eprs);
+        Iterator<EndpointsForRange> iter = eprs.iterator();
+        EndpointsForRange current = iter.next();
+        for (Token token : tokens)
+        {
+            // handle special case where one of the tokens is the min value
+            if (token.equals(min))
+                continue;
+
+            assert current != null : tokens + " " + eprs;
+            Range<Token> r = current.range();
+            int cmp = token.compareTo(r.right);
+            if (cmp == 0)
+            {
+                newPlacement.withReplicaGroup(current);
+                if (iter.hasNext())
+                    current = iter.next();
+                else
+                    current = null;
+            }
+            else if (cmp < 0 || r.right.isMinimum())
+            {
+                Range<Token> left = new Range<>(r.left, token);
+                Range<Token> right = new Range<>(token, r.right);
+                newPlacement.withReplicaGroup(EndpointsForRange.builder(left)
+                                                               .addAll(current.asList(rep -> rep.decorateSubrange(left)))
+                                                               .build());
+                current = EndpointsForRange.builder(right)
+                                           .addAll(current.asList(rep -> rep.decorateSubrange(right)))
+                                           .build();
+            }
+        }
+
+        if (current != null)
+            newPlacement.withReplicaGroup(current);
+
+        return newPlacement.build();
+    }
+
+    public static class Builder
+    {
+        private final Map<Range<Token>, EndpointsForRange> replicaGroups;
+
+        private Builder()
+        {
+            this(new HashMap<>());
+        }
+
+        private Builder(int expectedSize)
+        {
+            this(new HashMap<>(expectedSize));
+        }
+
+        private Builder(Map<Range<Token>, EndpointsForRange> replicaGroups)
+        {
+            this.replicaGroups = replicaGroups;
+        }
+
+        public Builder withReplica(Replica replica)
+        {
+            EndpointsForRange group =
+                replicaGroups.computeIfPresent(replica.range(),
+                                               (t, old) -> old.newBuilder(old.size() + 1)
+                                                              .addAll(old)
+                                                              .add(replica, ReplicaCollection.Builder.Conflict.ALL)
+                                                              .build());
+            if (group == null)
+                replicaGroups.put(replica.range(), EndpointsForRange.of(replica));
+            return this;
+        }
+
+        public Builder withoutReplica(Replica replica)
+        {
+            Range<Token> range = replica.range();
+            EndpointsForRange group = replicaGroups.get(range);
+            if (group == null)
+                throw new IllegalArgumentException(String.format("No group 
found for range of supplied replica %s (%s)",
+                                                                 replica, 
range));
+            EndpointsForRange without = 
group.without(Collections.singleton(replica.endpoint()));
+            if (without.isEmpty())
+                replicaGroups.remove(range);
+            else
+                replicaGroups.put(range, without);
+            return this;
+        }
+
+        public Builder withReplicaGroup(EndpointsForRange replicas)
+        {
+            EndpointsForRange group =
+                replicaGroups.computeIfPresent(replicas.range(),
+                                               (t, old) -> replicas.newBuilder(replicas.size() + old.size())
+                                                                   .addAll(old)
+                                                                   .addAll(replicas, ReplicaCollection.Builder.Conflict.ALL)
+                                                                   .build());
+            if (group == null)
+                replicaGroups.put(replicas.range(), replicas);
+            return this;
+        }
+
+        public Builder withReplicaGroups(Iterable<EndpointsForRange> replicas)
+        {
+            replicas.forEach(this::withReplicaGroup);
+            return this;
+        }
+
+        public PlacementForRange build()
+        {
+            return new PlacementForRange(this.replicaGroups);
+        }
+    }
+
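+    /**
+     * Serialized form (summarising the code below): a count of replica groups, then for each
+     * group its range bounds, a replica count, and for each replica its range bounds, endpoint
+     * and full/transient flag.
+     */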
+    public static class Serializer implements MetadataSerializer<PlacementForRange>
+    {
+        public void serialize(PlacementForRange t, DataOutputPlus out, Version version) throws IOException
+        {
+            out.writeInt(t.replicaGroups.size());
+
+            for (Map.Entry<Range<Token>, EndpointsForRange> entry : t.replicaGroups.entrySet())
+            {
+                Range<Token> range = entry.getKey();
+                EndpointsForRange efr = entry.getValue();
+                Token.metadataSerializer.serialize(range.left, out, version);
+                Token.metadataSerializer.serialize(range.right, out, version);
+                out.writeInt(efr.size());
+                for (int i = 0; i < efr.size(); i++)
+                {
+                    Replica r = efr.get(i);
+                    Token.metadataSerializer.serialize(r.range().left, out, version);
+                    Token.metadataSerializer.serialize(r.range().right, out, version);
+                    InetAddressAndPort.MetadataSerializer.serializer.serialize(r.endpoint(), out, version);
+                    out.writeBoolean(r.isFull());
+                }
+            }
+        }
+
+        public PlacementForRange deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            int groupCount = in.readInt();
+            Map<Range<Token>, EndpointsForRange> result = Maps.newHashMapWithExpectedSize(groupCount);
+            for (int i = 0; i < groupCount; i++)
+            {
+                Range<Token> range = new Range<>(Token.metadataSerializer.deserialize(in, version),
+                                                 Token.metadataSerializer.deserialize(in, version));
+                int replicaCount = in.readInt();
+                List<Replica> replicas = new ArrayList<>(replicaCount);
+                for (int x = 0; x < replicaCount; x++)
+                {
+                    Range<Token> replicaRange = new Range<>(Token.metadataSerializer.deserialize(in, version),
+                                                            Token.metadataSerializer.deserialize(in, version));
+                    InetAddressAndPort replicaAddress = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
+                    boolean isFull = in.readBoolean();
+                    replicas.add(new Replica(replicaAddress, replicaRange, isFull));
+                }
+                EndpointsForRange efr = EndpointsForRange.copyOf(replicas);
+                result.put(range, efr);
+            }
+            return new PlacementForRange(result);
+        }
+
+        public long serializedSize(PlacementForRange t, Version version)
+        {
+            // accumulate in a long to match the MetadataSerializer contract
+            long size = sizeof(t.replicaGroups.size());
+            for (Map.Entry<Range<Token>, EndpointsForRange> entry : t.replicaGroups.entrySet())
+            {
+                Range<Token> range = entry.getKey();
+                EndpointsForRange efr = entry.getValue();
+
+                size += Token.metadataSerializer.serializedSize(range.left, version);
+                size += Token.metadataSerializer.serializedSize(range.right, version);
+                size += sizeof(efr.size());
+                for (int i = 0; i < efr.size(); i++)
+                {
+                    Replica r = efr.get(i);
+                    size += Token.metadataSerializer.serializedSize(r.range().left, version);
+                    size += Token.metadataSerializer.serializedSize(r.range().right, version);
+                    size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(r.endpoint(), version);
+                    size += sizeof(r.isFull());
+                }
+            }
+            return size;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof PlacementForRange)) return false;
+        PlacementForRange that = (PlacementForRange) o;
+        return Objects.equals(replicaGroups, that.replicaGroups);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(replicaGroups);
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementProvider.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementProvider.java
new file mode 100644
index 0000000000..8ab93c47c0
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+
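+/**
+ * Computes data placements and the transition plans between them in response to membership
+ * changes (join, move, decommission, replacement). A sketch of the intended contract:
+ * implementations derive placements from the supplied ClusterMetadata and keyspace
+ * replication params alone.
+ */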
+public interface PlacementProvider
+{
+    DataPlacements calculatePlacements(List<Range<Token>> ranges, ClusterMetadata metadata, Keyspaces keyspaces);
+    // TODO naming
+    PlacementTransitionPlan planForJoin(ClusterMetadata metadata,
+                                        NodeId joining,
+                                        Set<Token> tokens,
+                                        Keyspaces keyspaces);
+
+    PlacementTransitionPlan planForMove(ClusterMetadata metadata,
+                                        NodeId nodeId,
+                                        Set<Token> tokens,
+                                        Keyspaces keyspaces);
+
+    // TODO: maybe leave, for consistency?
+    PlacementTransitionPlan planForDecommission(ClusterMetadata metadata,
+                                                NodeId nodeId,
+                                                Keyspaces keyspaces);
+
+    PlacementTransitionPlan planForReplacement(ClusterMetadata metadata,
+                                               NodeId replaced,
+                                               NodeId replacement,
+                                               Keyspaces keyspaces);
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
new file mode 100644
index 0000000000..edb22919dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import org.apache.cassandra.tcm.sequences.LockedRanges;
+
+/**
+ *  A transition plan contains four elements:
+ *  - deltas to split original ranges, if necessary
+ *  - deltas to move each placement from start to maximal (i.e. overreplicated) state
+ *  - deltas to move each placement from maximal to final state
+ *  - deltas to merge final ranges, if necessary
+ *  These deltas describe the abstract changes necessary to transition between two cluster states.
+ *  When the plan is compiled, these deltas are deconstructed and recomposed into the steps
+ *  necessary for safe execution of the given operation.
+ */
+public class PlacementTransitionPlan
+{
+    public final PlacementDeltas toSplit;
+    public final PlacementDeltas toMaximal;
+    public final PlacementDeltas toFinal;
+    public final PlacementDeltas toMerged;
+
+    private PlacementDeltas addToWrites;
+    private PlacementDeltas moveReads;
+    private PlacementDeltas removeFromWrites;
+    private LockedRanges.AffectedRanges affectedRanges;
+
+    public PlacementTransitionPlan(PlacementDeltas toSplit,
+                                   PlacementDeltas toMaximal,
+                                   PlacementDeltas toFinal,
+                                   PlacementDeltas toMerged)
+    {
+        this.toSplit = toSplit;
+        this.toMaximal = toMaximal;
+        this.toFinal = toFinal;
+        this.toMerged = toMerged;
+    }
+
+    public PlacementDeltas addToWrites()
+    {
+        if (addToWrites == null)
+            compile();
+        return addToWrites;
+    }
+
+    public PlacementDeltas moveReads()
+    {
+        if (moveReads == null)
+            compile();
+        return moveReads;
+    }
+
+    public PlacementDeltas removeFromWrites()
+    {
+        if (removeFromWrites == null)
+            compile();
+        return removeFromWrites;
+    }
+
+    public LockedRanges.AffectedRanges affectedRanges()
+    {
+        if (affectedRanges == null)
+            compile();
+        return affectedRanges;
+    }
+
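+    // A sketch of how compile() recomposes the deltas (grouped per replication params):
+    // - toMaximal contributes its write-side changes to addToWrites and its read-side
+    //   changes to moveReads;
+    // - toFinal contributes its read-side changes to moveReads and its write-side
+    //   changes to removeFromWrites;
+    // - toSplit is tracked only for range locking, while toMerged is applied in full
+    //   during removeFromWrites.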
+    private void compile()
+    {
+        PlacementDeltas.Builder addToWrites = PlacementDeltas.builder();
+        PlacementDeltas.Builder moveReads = PlacementDeltas.builder();
+        PlacementDeltas.Builder removeFromWrites = PlacementDeltas.builder();
+        LockedRanges.AffectedRangesBuilder affectedRanges = LockedRanges.AffectedRanges.builder();
+
+        toSplit.forEach((replication, delta) -> {
+            delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
+        });
+
+        toMaximal.forEach((replication, delta) -> {
+            delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
+            addToWrites.put(replication, delta.onlyWrites());
+            moveReads.put(replication, delta.onlyReads());
+        });
+
+        toFinal.forEach((replication, delta) -> {
+            delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
+            moveReads.put(replication, delta.onlyReads());
+            removeFromWrites.put(replication, delta.onlyWrites());
+        });
+
+        toMerged.forEach((replication, delta) -> {
+            delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range()));
+            removeFromWrites.put(replication, delta);
+        });
+
+        this.addToWrites = addToWrites.build();
+        this.moveReads = moveReads.build();
+        this.removeFromWrites = removeFromWrites.build();
+        this.affectedRanges = affectedRanges.build();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "PlacementTransitionPlan{" +
+               "toSplit=" + toSplit +
+               ", toMaximal=" + toMaximal +
+               ", toFinal=" + toFinal +
+               ", toMerged=" + toMerged +
+               ", compiled=" + (addToWrites == null) +
+               '}';
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/ReplicationMap.java b/src/java/org/apache/cassandra/tcm/ownership/ReplicationMap.java
new file mode 100644
index 0000000000..80e519f850
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/ownership/ReplicationMap.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.schema.ReplicationParams;
+
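+/**
+ * Maps ReplicationParams to a per-replication value of type T. Lookups for the local
+ * replication strategy are answered by localOnly(); missing entries fall back to
+ * defaultValue().
+ */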
+public abstract class ReplicationMap<T> implements Iterable<Map.Entry<ReplicationParams, T>>
+{
+    private final Map<ReplicationParams, T> map;
+
+    protected ReplicationMap()
+    {
+        this(Collections.emptyMap());
+    }
+
+    protected ReplicationMap(Map<ReplicationParams, T> map)
+    {
+        this.map = map;
+    }
+
+    protected abstract T defaultValue();
+    protected abstract T localOnly();
+
+    public T get(ReplicationParams params)
+    {
+        if (params.isLocal())
+            return localOnly();
+        return map.getOrDefault(params, defaultValue());
+    }
+
+    public int size()
+    {
+        return map.size();
+    }
+
+    public boolean isEmpty()
+    {
+        return map.isEmpty();
+    }
+
+    public void forEach(BiConsumer<ReplicationParams, T> consumer)
+    {
+        for (Map.Entry<ReplicationParams, T> entry : this)
+            consumer.accept(entry.getKey(), entry.getValue());
+    }
+
+    public ImmutableMap<ReplicationParams, T> asMap()
+    {
+        return ImmutableMap.copyOf(map);
+    }
+
+    public Set<ReplicationParams> keys()
+    {
+        return map.keySet();
+    }
+
+    public Iterator<Map.Entry<ReplicationParams, T>> iterator()
+    {
+        return map.entrySet().iterator();
+    }
+
+    public Stream<Map.Entry<ReplicationParams, T>> stream()
+    {
+        return StreamSupport.stream(spliterator(), false);
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ReplicationMap<?> that = (ReplicationMap<?>) o;
+        return map.equals(that.map);
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(map);
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java b/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java
new file mode 100644
index 0000000000..a22f8d43f1
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.sequences;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataValue;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+
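+/**
+ * Tracks token ranges locked by in-flight operations. Each AffectedRanges set is held under
+ * a Key derived from the epoch of the operation that locked it; intersects() returns the
+ * conflicting Key, or NOT_LOCKED when the supplied ranges are free.
+ */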
+public class LockedRanges implements MetadataValue<LockedRanges>
+{
+    public static final Serializer serializer = new Serializer();
+    public static final LockedRanges EMPTY = new LockedRanges(Epoch.EMPTY, ImmutableMap.<Key, AffectedRanges>builder().build());
+    public static final Key NOT_LOCKED = new Key(Epoch.EMPTY);
+    public final ImmutableMap<Key, AffectedRanges> locked;
+    private final Epoch lastModified;
+
+    private LockedRanges(Epoch lastModified, ImmutableMap<Key, AffectedRanges> locked)
+    {
+        this.lastModified = lastModified;
+        this.locked = locked;
+    }
+
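+    /**
+     * A minimal usage sketch (epoch and affected are illustrative):
+     *
+     * <pre>{@code
+     * LockedRanges.Key key = LockedRanges.keyFor(epoch);
+     * LockedRanges locked = current.lock(key, affected);
+     * // ... execute the operation guarded by the lock ...
+     * locked = locked.unlock(key);
+     * }</pre>
+     */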
+    public LockedRanges lock(Key key, AffectedRanges ranges)
+    {
+        assert !key.equals(NOT_LOCKED) : "Can't lock ranges with noop key";
+
+        if (ranges == AffectedRanges.EMPTY)
+            return this;
+
+        // TODO might we need the ability for the holder of a key to lock multiple sets over time?
+        return new LockedRanges(lastModified,
+                                ImmutableMap.<Key, AffectedRanges>builderWithExpectedSize(locked.size())
+                                            .putAll(locked)
+                                            .put(key, ranges)
+                                            .build());
+    }
+
+    public LockedRanges unlock(Key key)
+    {
+        if (key.equals(NOT_LOCKED))
+            return this;
+        ImmutableMap.Builder<Key, AffectedRanges> builder = ImmutableMap.builderWithExpectedSize(locked.size());
+        locked.forEach((k, r) -> {
+            if (!k.equals(key)) builder.put(k, r);
+        });
+        return new LockedRanges(lastModified, builder.build());
+    }
+
+    public Key intersects(AffectedRanges ranges)
+    {
+        for (Map.Entry<Key, AffectedRanges> e : locked.entrySet())
+        {
+            if (ranges.intersects(e.getValue()))
+                return e.getKey();
+        }
+        return NOT_LOCKED;
+    }
+
+    @Override
+    public LockedRanges withLastModified(Epoch epoch)
+    {
+        return new LockedRanges(epoch, locked);
+    }
+
+    @Override
+    public Epoch lastModified()
+    {
+        return lastModified;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "LockedRanges{" +
+               "lastModified=" + lastModified +
+               ", locked=" + locked +
+               '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof LockedRanges)) return false;
+        LockedRanges that = (LockedRanges) o;
+        return Objects.equals(lastModified, that.lastModified) && Objects.equals(locked, that.locked);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(lastModified, locked);
+    }
+
+    public static Key keyFor(Epoch epoch)
+    {
+        return new Key(epoch);
+    }
+
+    public interface AffectedRangesBuilder
+    {
+        AffectedRangesBuilder add(ReplicationParams params, Range<Token> range);
+        AffectedRanges build();
+    }
+
+    public interface AffectedRanges
+    {
+        AffectedRanges EMPTY = new AffectedRanges()
+        {
+            public boolean intersects(AffectedRanges other)
+            {
+                return false;
+            }
+
+            public void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn) {}
+
+            @Override
+            public String toString()
+            {
+                return "EMPTY";
+            }
+
+            public Map<ReplicationParams, Set<Range<Token>>> asMap()
+            {
+                return Collections.emptyMap();
+            }
+        };
+
+        default ImmutableSet<NodeId> toPeers(DataPlacements placements, Directory directory)
+        {
+            ImmutableSet.Builder<NodeId> peers = ImmutableSet.builder();
+            asMap().forEach((replication, rangeset) -> {
+                DataPlacement placement = placements.get(replication);
+                rangeset.stream()
+                        .flatMap(range -> placement.affectedReplicas(range).stream())
+                        .map(directory::peerId)
+                        .forEach(peers::add);
+            });
+            return peers.build();
+        }
+
+        static AffectedRanges singleton(ReplicationParams replicationParams, Range<Token> tokenRange)
+        {
+            return builder().add(replicationParams, tokenRange).build();
+        }
+
+        static AffectedRangesBuilder builder()
+        {
+            return new AffectedRangesImpl();
+        }
+
+        boolean intersects(AffectedRanges other);
+        void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn);
+        Map<ReplicationParams, Set<Range<Token>>> asMap();
+
+        final class Serializer implements MetadataSerializer<AffectedRanges>
+        {
+            public static final Serializer instance = new Serializer();
+
+            public void serialize(AffectedRanges t, DataOutputPlus out, Version version) throws IOException
+            {
+                Map<ReplicationParams, Set<Range<Token>>> map = t.asMap();
+                out.writeInt(map.size());
+                for (Map.Entry<ReplicationParams, Set<Range<Token>>> rangeEntry : map.entrySet())
+                {
+                    ReplicationParams params = rangeEntry.getKey();
+                    Set<Range<Token>> ranges = rangeEntry.getValue();
+                    ReplicationParams.serializer.serialize(params, out, version);
+                    out.writeInt(ranges.size());
+                    for (Range<Token> range : ranges)
+                    {
+                        Token.metadataSerializer.serialize(range.left, out, version);
+                        Token.metadataSerializer.serialize(range.right, out, version);
+                    }
+                }
+            }
+
+            public AffectedRanges deserialize(DataInputPlus in, Version version) throws IOException
+            {
+                int size = in.readInt();
+                Map<ReplicationParams, Set<Range<Token>>> map = Maps.newHashMapWithExpectedSize(size);
+                for (int x = 0; x < size; x++)
+                {
+                    ReplicationParams params = ReplicationParams.serializer.deserialize(in, version);
+                    int rangeSize = in.readInt();
+                    Set<Range<Token>> range = Sets.newHashSetWithExpectedSize(rangeSize);
+                    for (int y = 0; y < rangeSize; y++)
+                    {
+                        range.add(new Range<>(Token.metadataSerializer.deserialize(in, version),
+                                              Token.metadataSerializer.deserialize(in, version)));
+                    }
+                    map.put(params, range);
+                }
+                return new AffectedRangesImpl(map);
+            }
+
+            public long serializedSize(AffectedRanges t, Version version)
+            {
+                Map<ReplicationParams, Set<Range<Token>>> map = t.asMap();
+                long size = sizeof(map.size());
+                for (Map.Entry<ReplicationParams, Set<Range<Token>>> rangeEntry : map.entrySet())
+                {
+                    ReplicationParams params = rangeEntry.getKey();
+                    Set<Range<Token>> ranges = rangeEntry.getValue();
+                    size += ReplicationParams.serializer.serializedSize(params, version);
+                    size += sizeof(ranges.size());
+                    for (Range<Token> range : ranges)
+                    {
+                        size += Token.metadataSerializer.serializedSize(range.left, version);
+                        size += Token.metadataSerializer.serializedSize(range.right, version);
+                    }
+                }
+                return size;
+            }
+        }
+    }
+
+    private static final class AffectedRangesImpl implements AffectedRangesBuilder, AffectedRanges
+    {
+        private final Map<ReplicationParams, Set<Range<Token>>> map;
+
+        public AffectedRangesImpl()
+        {
+            this(new HashMap<>());
+        }
+
+        public AffectedRangesImpl(Map<ReplicationParams, Set<Range<Token>>> map)
+        {
+            this.map = map;
+        }
+
+        @Override
+        public AffectedRangesBuilder add(ReplicationParams params, Range<Token> range)
+        {
+            Set<Range<Token>> ranges = map.get(params);
+            if (ranges == null)
+            {
+                ranges = new HashSet<>();
+                map.put(params, ranges);
+            }
+
+            ranges.add(range);
+            return this;
+        }
+
+        @Override
+        public Map<ReplicationParams, Set<Range<Token>>> asMap()
+        {
+            return map;
+        }
+
+        @Override
+        public AffectedRanges build()
+        {
+            return this;
+        }
+
+        @Override
+        public void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn)
+        {
+            map.forEach((k, v) -> fn.accept(k, Collections.unmodifiableSet(v)));
+        }
+
+        @Override
+        public boolean intersects(AffectedRanges other)
+        {
+            if (other == EMPTY)
+                return false;
+
+            for (Map.Entry<ReplicationParams, Set<Range<Token>>> e : ((AffectedRangesImpl) other).map.entrySet())
+            {
+                for (Range<Token> otherRange : e.getValue())
+                {
+                    // getOrDefault guards against params present in other but absent here
+                    for (Range<Token> thisRange : map.getOrDefault(e.getKey(), Collections.emptySet()))
+                    {
+                        if (thisRange.intersects(otherRange))
+                            return true;
+                    }
+                }
+            }
+
+            return false;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "AffectedRangesImpl{" +
+                   "map=" + map +
+                   '}';
+        }
+    }
+
+    public static class Key
+    {
+        public static final Serializer serializer = new Serializer();
+        private final Epoch epoch;
+
+        private Key(Epoch epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Key key1 = (Key) o;
+            return epoch.equals(key1.epoch);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(epoch);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Key{" +
+                   "key=" + epoch +
+                   '}';
+        }
+
+        public static final class Serializer
+        {
+            public void serialize(Key t, DataOutputPlus out, Version version) throws IOException
+            {
+                Epoch.serializer.serialize(t.epoch, out, version);
+            }
+
+            public Key deserialize(DataInputPlus in, Version version) throws IOException
+            {
+                return new Key(Epoch.serializer.deserialize(in, version));
+            }
+
+            public long serializedSize(Key t, Version version)
+            {
+                return Epoch.serializer.serializedSize(t.epoch, version);
+            }
+        }
+    }
+
+    public static class Serializer implements MetadataSerializer<LockedRanges>
+    {
+        public void serialize(LockedRanges t, DataOutputPlus out, Version version) throws IOException
+        {
+            Epoch.serializer.serialize(t.lastModified, out, version);
+            out.writeInt(t.locked.size());
+            for (Map.Entry<Key, AffectedRanges> entry : t.locked.entrySet())
+            {
+                Key key = entry.getKey();
+                Epoch.serializer.serialize(key.epoch, out, version);
+                AffectedRanges.Serializer.instance.serialize(entry.getValue(), out, version);
+            }
+        }
+
+        public LockedRanges deserialize(DataInputPlus in, Version version) throws IOException
+        {
+            Epoch lastModified = Epoch.serializer.deserialize(in, version);
+            int size = in.readInt();
+            if (size == 0) return new LockedRanges(lastModified, ImmutableMap.of());
+            ImmutableMap.Builder<Key, AffectedRanges> result = ImmutableMap.builder();
+            for (int i = 0; i < size; i++)
+            {
+                Key key = new Key(Epoch.serializer.deserialize(in, version));
+                AffectedRanges ranges = AffectedRanges.Serializer.instance.deserialize(in, version);
+                result.put(key, ranges);
+            }
+            return new LockedRanges(lastModified, result.build());
+        }
+
+        public long serializedSize(LockedRanges t, Version version)
+        {
+            long size = Epoch.serializer.serializedSize(t.lastModified, version);
+            size += sizeof(t.locked.size());
+            for (Map.Entry<Key, AffectedRanges> entry : t.locked.entrySet())
+            {
+                Key key = entry.getKey();
+                size += Epoch.serializer.serializedSize(key.epoch, version);
+                size += AffectedRanges.Serializer.instance.serializedSize(entry.getValue(), version);
+            }
+            return size;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
