This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 363b34c0fb5e4fd13b851ab10ed6059d3bb35964
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Wed Mar 1 10:53:51 2023 +0000

    [CEP-21] Add rudimentary cluster membership to TCM
    
    Adds a new Directory component to ClusterMetadata to manage member
    identity, state location and addressing. This duplicates some of the
    functions of TokenMetadata, Topology et al but with updates performed
    consistently via the global log. Although it isn't actually used for
    anything yet it is a prerequisite for managing data ownership through
    TCM, which will eventually replace TokenMetadata completely.
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../apache/cassandra/service/StorageService.java   |   3 +
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |  68 ++-
 .../org/apache/cassandra/tcm/MetadataKeys.java     |   1 +
 .../org/apache/cassandra/tcm/Transformation.java   |   2 +
 .../cassandra/tcm/compatibility/GossipHelper.java  |   2 +
 .../apache/cassandra/tcm/membership/Directory.java | 578 +++++++++++++++++++++
 .../apache/cassandra/tcm/membership/Location.java  |  83 +++
 .../cassandra/tcm/membership/NodeAddresses.java    | 125 +++++
 .../apache/cassandra/tcm/membership/NodeId.java    |  89 ++++
 .../NodeState.java}                                |  35 +-
 .../cassandra/tcm/membership/NodeVersion.java      | 118 +++++
 .../cassandra/tcm/transformations/Register.java    | 208 ++++++++
 .../apache/cassandra/utils/CassandraVersion.java   |   1 +
 .../apache/cassandra/utils/btree/BTreeBiMap.java   | 102 ++++
 .../cassandra/utils/btree/BTreeMultimap.java       | 214 ++++++++
 .../org/apache/cassandra/utils/btree/BTreeSet.java |  45 +-
 16 files changed, 1639 insertions(+), 35 deletions(-)

diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index ea1903bb0c..75557a4489 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -208,6 +208,8 @@ import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Startup;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.transport.ClientResourceLimits;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.Clock;
@@ -989,6 +991,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             throw new RuntimeException(e);
         }
 
+        NodeId self = Register.maybeRegister();
         completeInitialization();
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 4d9f5a28cf..90d5343d2c 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -38,6 +38,12 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.tcm.extensions.ExtensionKey;
 import org.apache.cassandra.tcm.extensions.ExtensionValue;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.serialization.MetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.cms.EntireRange;
@@ -58,16 +64,28 @@ public class ClusterMetadata
     public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions;
 
     public final DistributedSchema schema;
+    public final Directory directory;
     public final EndpointsForRange cmsReplicas;
     public final ImmutableSet<InetAddressAndPort> cmsMembers;
 
     public ClusterMetadata(IPartitioner partitioner)
+    {
+        this(partitioner, Directory.EMPTY);
+    }
+
+    private ClusterMetadata(IPartitioner partitioner, Directory directory)
+    {
+        this(partitioner, directory, DistributedSchema.first());
+    }
+
+    private ClusterMetadata(IPartitioner partitioner, Directory directory, 
DistributedSchema schema)
     {
         this(Epoch.EMPTY,
              Period.EMPTY,
              true,
              partitioner,
-             DistributedSchema.first(),
+             schema,
+             directory,
              ImmutableSet.of(),
              ImmutableMap.of());
     }
@@ -77,6 +95,7 @@ public class ClusterMetadata
                            boolean lastInPeriod,
                            IPartitioner partitioner,
                            DistributedSchema schema,
+                           Directory directory,
                            Set<InetAddressAndPort> cmsMembers,
                            Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions)
     {
@@ -85,6 +104,7 @@ public class ClusterMetadata
         this.lastInPeriod = lastInPeriod;
         this.partitioner = partitioner;
         this.schema = schema;
+        this.directory = directory;
         this.cmsMembers = ImmutableSet.copyOf(cmsMembers);
         this.extensions = ImmutableMap.copyOf(extensions);
 
@@ -122,6 +142,7 @@ public class ClusterMetadata
                                    lastInPeriod,
                                    partitioner,
                                    schema,
+                                   directory,
                                    cmsMembers,
                                    extensions);
     }
@@ -144,6 +165,7 @@ public class ClusterMetadata
         private final boolean lastInPeriod;
         private final IPartitioner partitioner;
         private DistributedSchema schema;
+        private Directory directory;
         private final Set<InetAddressAndPort> cmsMembers;
         private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions;
         private final Set<MetadataKey> modifiedKeys;
@@ -156,6 +178,7 @@ public class ClusterMetadata
             this.lastInPeriod = lastInPeriod;
             this.partitioner = metadata.partitioner;
             this.schema = metadata.schema;
+            this.directory = metadata.directory;
             this.cmsMembers = new HashSet<>(metadata.cmsMembers);
             extensions = new HashMap<>(metadata.extensions);
             modifiedKeys = new HashSet<>();
@@ -167,6 +190,18 @@ public class ClusterMetadata
             return this;
         }
 
+        public Transformer register(NodeAddresses addresses, Location 
location, NodeVersion version)
+        {
+            directory = directory.with(addresses, location, version);
+            return this;
+        }
+
+        public Transformer withNodeState(NodeId id, NodeState state)
+        {
+            directory = directory.withNodeState(id, state);
+            return this;
+        }
+
         public Transformer withCMSMember(InetAddressAndPort member)
         {
             cmsMembers.add(member);
@@ -229,11 +264,18 @@ public class ClusterMetadata
                 schema = schema.withLastModified(epoch);
             }
 
+            if (directory != base.directory)
+            {
+                modifiedKeys.add(MetadataKeys.NODE_DIRECTORY);
+                directory = directory.withLastModified(epoch);
+            }
+
             return new Transformed(new ClusterMetadata(epoch,
                                                        period,
                                                        lastInPeriod,
                                                        partitioner,
                                                        schema,
+                                                       directory,
                                                        cmsMembers,
                                                        extensions),
                                    ImmutableSet.copyOf(modifiedKeys));
@@ -248,6 +290,7 @@ public class ClusterMetadata
                    ", lastInPeriod=" + lastInPeriod +
                    ", partitioner=" + partitioner +
                    ", schema=" + schema +
+                   ", directory=" + schema +
                    ", extensions=" + extensions +
                    ", cmsMembers=" + cmsMembers +
                    ", modifiedKeys=" + modifiedKeys +
@@ -283,13 +326,15 @@ public class ClusterMetadata
         ClusterMetadata that = (ClusterMetadata) o;
         return epoch.equals(that.epoch) &&
                lastInPeriod == that.lastInPeriod &&
+               schema.equals(that.schema) &&
+               directory.equals(that.directory) &&
                extensions.equals(that.extensions);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(epoch, lastInPeriod, extensions);
+        return Objects.hash(epoch, lastInPeriod, schema, directory, 
extensions);
     }
 
     public static ClusterMetadata current()
@@ -309,6 +354,19 @@ public class ClusterMetadata
         return service.metadata();
     }
 
+    public NodeId myNodeId()
+    {
+        return directory.peerId(FBUtilities.getBroadcastAddressAndPort());
+    }
+
+    public NodeState myNodeState()
+    {
+        NodeId nodeId = myNodeId();
+        if (myNodeId() != null)
+            return directory.peerState(nodeId);
+        return null;
+    }
+
     public static class Serializer implements 
MetadataSerializer<ClusterMetadata>
     {
         @Override
@@ -319,6 +377,7 @@ public class ClusterMetadata
             out.writeBoolean(metadata.lastInPeriod);
             out.writeUTF(metadata.partitioner.getClass().getCanonicalName());
             DistributedSchema.serializer.serialize(metadata.schema, out, 
version);
+            Directory.serializer.serialize(metadata.directory, out, version);
             out.writeInt(metadata.extensions.size());
             for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : 
metadata.extensions.entrySet())
             {
@@ -341,6 +400,7 @@ public class ClusterMetadata
             boolean lastInPeriod = in.readBoolean();
             IPartitioner partitioner = 
FBUtilities.newPartitioner(in.readUTF());
             DistributedSchema schema = 
DistributedSchema.serializer.deserialize(in, version);
+            Directory dir = Directory.serializer.deserialize(in, version);
             int items = in.readInt();
             Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new 
HashMap<>(items);
             for (int i = 0; i < items; i++)
@@ -359,6 +419,7 @@ public class ClusterMetadata
                                        lastInPeriod,
                                        partitioner,
                                        schema,
+                                       dir,
                                        members,
                                        extensions);
         }
@@ -375,7 +436,8 @@ public class ClusterMetadata
                     VIntCoding.computeUnsignedVIntSize(metadata.period) +
                     TypeSizes.BOOL_SIZE +
                     sizeof(metadata.partitioner.getClass().getCanonicalName()) 
+
-                    
DistributedSchema.serializer.serializedSize(metadata.schema, version);
+                    
DistributedSchema.serializer.serializedSize(metadata.schema, version) +
+                    Directory.serializer.serializedSize(metadata.directory, 
version);
 
             size += TypeSizes.INT_SIZE;
             for (InetAddressAndPort member : metadata.cmsMembers)
diff --git a/src/java/org/apache/cassandra/tcm/MetadataKeys.java 
b/src/java/org/apache/cassandra/tcm/MetadataKeys.java
index 6db00c2964..b545e03d4c 100644
--- a/src/java/org/apache/cassandra/tcm/MetadataKeys.java
+++ b/src/java/org/apache/cassandra/tcm/MetadataKeys.java
@@ -27,6 +27,7 @@ public class MetadataKeys
     public static final String CORE_NS = 
MetadataKeys.class.getPackage().getName().toLowerCase(Locale.ROOT);
 
     public static final MetadataKey SCHEMA                  = make(CORE_NS, 
"schema", "dist_schema");
+    public static final MetadataKey NODE_DIRECTORY          = make(CORE_NS, 
"membership", "node_directory");
 
     public static final ImmutableSet<MetadataKey> CORE_METADATA = 
ImmutableSet.of(SCHEMA);
 
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java 
b/src/java/org/apache/cassandra/tcm/Transformation.java
index 1a9db06611..8f7afe0ebd 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.AlterSchema;
 import org.apache.cassandra.tcm.transformations.CustomTransformation;
 import org.apache.cassandra.tcm.transformations.ForceSnapshot;
+import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.SealPeriod;
 import org.apache.cassandra.tcm.transformations.cms.Initialize;
 import org.apache.cassandra.tcm.transformations.cms.PreInitialize;
@@ -139,6 +140,7 @@ public interface Transformation
         FORCE_SNAPSHOT(() -> ForceSnapshot.serializer),
         SEAL_PERIOD(() -> SealPeriod.serializer),
         SCHEMA_CHANGE(() -> AlterSchema.serializer),
+        REGISTER(() -> Register.serializer),
         CUSTOM(() -> CustomTransformation.serializer);
 
         private final Supplier<AsymmetricMetadataSerializer<Transformation, ? 
extends Transformation>> serializer;
diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java 
b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
index 9ff241f72b..ad83e404ae 100644
--- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Period;
+import org.apache.cassandra.tcm.membership.Directory;
 
 public class GossipHelper
 {
@@ -46,6 +47,7 @@ public class GossipHelper
                                    true,
                                    DatabaseDescriptor.getPartitioner(),
                                    
DistributedSchema.fromSystemTables(SchemaKeyspace.fetchNonSystemKeyspaces()),
+                                   Directory.EMPTY,
                                    Collections.emptySet(),
                                    Collections.emptyMap());
     }
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java 
b/src/java/org/apache/cassandra/tcm/membership/Directory.java
new file mode 100644
index 0000000000..d04c52dfc1
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataValue;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
+import org.apache.cassandra.utils.btree.BTreeBiMap;
+import org.apache.cassandra.utils.btree.BTreeMap;
+import org.apache.cassandra.utils.btree.BTreeMultimap;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+
+public class Directory implements MetadataValue<Directory>
+{
+    public static final Serializer serializer = new Serializer();
+
+    public static Directory EMPTY = new Directory();
+
+    private final int nextId;
+    private final Epoch lastModified;
+    private final BTreeBiMap<NodeId, InetAddressAndPort> peers;
+    private final BTreeMap<NodeId, Location> locations;
+    public final BTreeMap<NodeId, NodeState> states;
+    public final BTreeMap<NodeId, NodeVersion> versions;
+    public final BTreeMap<NodeId, NodeAddresses> addresses;
+    private final BTreeMap<NodeId, UUID> hostIds;
+    private final BTreeMultimap<String, InetAddressAndPort> endpointsByDC;
+    private final BTreeMap<String, Multimap<String, InetAddressAndPort>> 
racksByDC;
+
+    public Directory()
+    {
+        this(1,
+             Epoch.EMPTY,
+             BTreeBiMap.empty(),
+             BTreeMap.empty(),
+             BTreeMap.empty(),
+             BTreeMap.empty(),
+             BTreeMap.empty(),
+             BTreeMap.empty(),
+             BTreeMultimap.empty(),
+             BTreeMap.empty());
+    }
+
+    private Directory(int nextId,
+                      Epoch lastModified,
+                      BTreeBiMap<NodeId, InetAddressAndPort> peers,
+                      BTreeMap<NodeId, Location> locations,
+                      BTreeMap<NodeId, NodeState> states,
+                      BTreeMap<NodeId, NodeVersion> versions,
+                      BTreeMap<NodeId, UUID> hostIds,
+                      BTreeMap<NodeId, NodeAddresses> addresses,
+                      BTreeMultimap<String, InetAddressAndPort> endpointsByDC,
+                      BTreeMap<String, Multimap<String, InetAddressAndPort>> 
racksByDC)
+    {
+        this.nextId = nextId;
+        this.lastModified = lastModified;
+        this.peers = peers;
+        this.locations = locations;
+        this.states = states;
+        this.versions = versions;
+        this.hostIds = hostIds;
+        this.addresses = addresses;
+        this.endpointsByDC = endpointsByDC;
+        this.racksByDC = racksByDC;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Directory{" +
+               "nextId=" + nextId +
+               ", lastModified=" + lastModified +
+               ", peers=" + peers +
+               ", locations=" + locations +
+               ", states=" + states +
+               ", versions=" + versions +
+               ", addresses=" + addresses +
+               ", hostIds=" + hostIds +
+               ", endpointsByDC=" + endpointsByDC +
+               ", racksByDC=" + racksByDC +
+               '}';
+    }
+
+    @Override
+    public Epoch lastModified()
+    {
+        return lastModified;
+    }
+
+    @Override
+    public Directory withLastModified(Epoch epoch)
+    {
+        return new Directory(nextId, epoch, peers, locations, states, 
versions, hostIds, addresses, endpointsByDC, racksByDC);
+    }
+
+    public Directory withNonUpgradedNode(NodeAddresses addresses,
+                                         Location location,
+                                         NodeVersion version,
+                                         NodeState state,
+                                         UUID hostId)
+    {
+        NodeId id = new NodeId(new UUID(0L, nextId));
+        return with(addresses, id, hostId, location, 
version).withNodeState(id, state).withRackAndDC(id);
+    }
+
+    @VisibleForTesting
+    public Directory with(NodeAddresses addresses, Location location)
+    {
+        return with(addresses, location, NodeVersion.CURRENT);
+    }
+
+    public Directory with(NodeAddresses addresses, Location location, 
NodeVersion nodeVersion)
+    {
+        // this is obviously not the right way to do this
+        NodeId id = new NodeId(new UUID(0L, nextId));
+        return with(addresses, id, id.uuid, location, nodeVersion);
+    }
+
+    private Directory with(NodeAddresses nodeAddresses, NodeId id, UUID 
hostId, Location location, NodeVersion nodeVersion)
+    {
+        if (peers.containsKey(id))
+            return this;
+        if (peers.containsValue(nodeAddresses.broadcastAddress))
+            return this;
+        if (locations.containsKey(id))
+            return this;
+
+        return new Directory(nextId + 1,
+                             lastModified,
+                             peers.without(id).with(id, 
nodeAddresses.broadcastAddress),
+                             locations.withForce(id, location),
+                             states.withForce(id, NodeState.REGISTERED),
+                             versions.withForce(id, nodeVersion),
+                             hostIds.withForce(id, hostId),
+                             addresses.withForce(id, nodeAddresses),
+                             endpointsByDC,
+                             racksByDC);
+    }
+
+    public Directory withNodeState(NodeId id, NodeState state)
+    {
+        return new Directory(nextId, lastModified, peers, locations, 
states.withForce(id, state), versions, hostIds, addresses, endpointsByDC, 
racksByDC);
+    }
+
+    public Directory withNodeVersion(NodeId id, NodeVersion version)
+    {
+        if (Objects.equals(versions.get(id), version))
+            return this;
+        return new Directory(nextId, lastModified, peers, locations, states, 
versions.withForce(id, version), hostIds, addresses, endpointsByDC, racksByDC);
+    }
+
+    public Directory withNodeAddresses(NodeId id, NodeAddresses nodeAddresses)
+    {
+        if (Objects.equals(addresses.get(id), nodeAddresses))
+            return this;
+        return new Directory(nextId, lastModified, peers, locations, states, 
versions, hostIds, addresses.withForce(id, nodeAddresses), endpointsByDC, 
racksByDC);
+    }
+
+    public Directory withRackAndDC(NodeId id)
+    {
+        InetAddressAndPort endpoint = peers.get(id);
+        Location location = locations.get(id);
+
+        BTreeMultimap<String, InetAddressAndPort> rackEP = 
(BTreeMultimap<String, InetAddressAndPort>) racksByDC.get(location.datacenter);
+        if (rackEP == null)
+            rackEP = BTreeMultimap.empty();
+        rackEP = rackEP.with(location.rack, endpoint);
+
+        return new Directory(nextId, lastModified, peers, locations, states, 
versions, hostIds, addresses,
+                             endpointsByDC.with(location.datacenter, endpoint),
+                             racksByDC.withForce(location.datacenter, rackEP));
+    }
+
+    public Directory withoutRackAndDC(NodeId id)
+    {
+        InetAddressAndPort endpoint = peers.get(id);
+        Location location = locations.get(id);
+        BTreeMultimap<String, InetAddressAndPort> rackEP = 
(BTreeMultimap<String, InetAddressAndPort>) racksByDC.get(location.datacenter);
+        rackEP = rackEP.without(location.rack, endpoint);
+        return new Directory(nextId, lastModified, peers, locations, states, 
versions, hostIds, addresses,
+                             endpointsByDC.without(location.datacenter, 
endpoint),
+                             racksByDC.withForce(location.datacenter, rackEP));
+    }
+
+    public Directory without(NodeId id)
+    {
+        InetAddressAndPort endpoint = peers.get(id);
+        Location location = locations.get(id);
+        BTreeMultimap<String, InetAddressAndPort> rackEP = 
(BTreeMultimap<String, InetAddressAndPort>) racksByDC.get(location.datacenter);
+        rackEP = rackEP.without(location.rack, endpoint);
+
+        return new Directory(nextId,
+                             lastModified,
+                             peers.without(id),
+                             locations.without(id),
+                             states.without(id),
+                             versions.without(id),
+                             hostIds.without(id),
+                             addresses.without(id),
+                             endpointsByDC.without(location.datacenter, 
endpoint),
+                             racksByDC.withForce(location.datacenter, rackEP));
+    }
+
+    public NodeId peerId(InetAddressAndPort endpoint)
+    {
+        return peers.inverse().get(endpoint);
+    }
+
+    public boolean isRegistered(InetAddressAndPort endpoint)
+    {
+        return peers.inverse().containsKey(endpoint);
+    }
+
+    public InetAddressAndPort endpoint(NodeId id)
+    {
+        return peers.get(id);
+    }
+
+    public boolean isEmpty()
+    {
+        return peers.isEmpty();
+    }
+
+    public ImmutableList<InetAddressAndPort> allAddresses()
+    {
+        return ImmutableList.copyOf(peers.values());
+    }
+
+    public ImmutableSet<NodeId> peerIds()
+    {
+        return ImmutableSet.copyOf(peers.keySet());
+    }
+
+    public NodeAddresses getNodeAddresses(NodeId id)
+    {
+        return addresses.get(id);
+    }
+
+    private Node getNode(NodeId id)
+    {
+        return new Node(id, addresses.get(id), locations.get(id), 
states.get(id), versions.get(id), hostIds.get(id));
+    }
+
+    public Location location(NodeId id)
+    {
+        return locations.get(id);
+    }
+
+    public Set<InetAddressAndPort> datacenterEndpoints(String datacenter)
+    {
+        return (Set<InetAddressAndPort>) endpointsByDC.get(datacenter);
+    }
+
+    public Multimap<String, InetAddressAndPort> datacenterRacks(String 
datacenter)
+    {
+        return racksByDC.get(datacenter);
+    }
+
+    public NodeState peerState(NodeId peer)
+    {
+        return states.get(peer);
+    }
+
+    public NodeVersion version(NodeId peer)
+    {
+        return versions.get(peer);
+    }
+
+    public UUID hostId(NodeId peer)
+    {
+        return hostIds.getOrDefault(peer, peer.uuid);
+    }
+
+    public Map<String, Multimap<String, InetAddressAndPort>> 
allDatacenterRacks()
+    {
+        return racksByDC;
+    }
+
+    public Set<String> knownDatacenters()
+    {
+        return locations.values().stream().map(l -> 
l.datacenter).collect(Collectors.toSet());
+    }
+
+    public Multimap<String, InetAddressAndPort> allDatacenterEndpoints()
+    {
+        return endpointsByDC;
+    }
+
+    public NodeState peerState(InetAddressAndPort peer)
+    {
+        return states.get(peers.inverse().get(peer));
+    }
+
+    public String toDebugString()
+    {
+        return peers.keySet()
+                    .stream()
+                    .sorted()
+                    .map(this::getNode)
+                    .map(Node::toString)
+                    .collect(Collectors.joining("\n"));
+    }
+
+    private static class Node
+    {
+        public static final Serializer serializer = new Serializer();
+
+        public final NodeId id;
+        public final NodeAddresses addresses;
+        public final Location location;
+        public final NodeState state;
+        public final NodeVersion version;
+        public final UUID hostId;
+
+        public Node(NodeId id, NodeAddresses addresses, Location location, 
NodeState state, NodeVersion version, UUID hostId)
+        {
+            this.id = Preconditions.checkNotNull(id, "Node ID must not be 
null");
+            this.addresses = Preconditions.checkNotNull(addresses, "Node 
addresses must not be null");
+            this.location = Preconditions.checkNotNull(location, "Node 
location must not be null");
+            this.state = Preconditions.checkNotNull(state, "Node state must 
not be null");
+            this.version = Preconditions.checkNotNull(version, "Node version 
must not be null");
+            this.hostId = hostId;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Node node = (Node) o;
+            return id.equals(node.id)
+                   && addresses.equals(node.addresses)
+                   && location.equals(node.location)
+                   && state == node.state
+                   && version.equals(node.version);
+        }
+
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(id, addresses, location, state, version);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Node{" +
+                   "id=" + id +
+                   ", addresses=" + addresses +
+                   ", location=" + location +
+                   ", state=" + state +
+                   ", version=" + version +
+                   '}';
+        }
+
+        public static class Serializer implements MetadataSerializer<Node>
+        {
+            public void serialize(Node node, DataOutputPlus out, Version 
version) throws IOException
+            {
+                NodeId.serializer.serialize(node.id, out, version);
+                NodeAddresses.serializer.serialize(node.addresses, out, 
version);
+                out.writeUTF(node.location.datacenter);
+                out.writeUTF(node.location.rack);
+                out.writeInt(node.state.ordinal());
+                NodeVersion.serializer.serialize(node.version, out, version);
+                if (node.hostId == null)
+                    out.writeBoolean(false);
+                else
+                {
+                    out.writeBoolean(true);
+                    UUIDSerializer.serializer.serialize(node.hostId, out, 
MessagingService.VERSION_50);
+                }
+
+            }
+
+            public Node deserialize(DataInputPlus in, Version version) throws 
IOException
+            {
+                NodeId id = NodeId.serializer.deserialize(in, version);
+                NodeAddresses addresses = 
NodeAddresses.serializer.deserialize(in, version);
+                Location location = new Location(in.readUTF(), in.readUTF());
+                NodeState state = NodeState.values()[in.readInt()];
+                NodeVersion nodeVersion = 
NodeVersion.serializer.deserialize(in, version);
+                boolean hasHostId = in.readBoolean();
+                UUID hostId = hasHostId ? 
UUIDSerializer.serializer.deserialize(in, MessagingService.VERSION_50) : null;
+                return new Node(id, addresses, location, state, nodeVersion, 
hostId);
+            }
+
+            public long serializedSize(Node node, Version version)
+            {
+                long size = 0;
+                size += NodeId.serializer.serializedSize(node.id, version);
+                size += 
NodeAddresses.serializer.serializedSize(node.addresses, version);
+                size += sizeof(node.location.datacenter);
+                size += sizeof(node.location.rack);
+                size += TypeSizes.INT_SIZE;
+                size += NodeVersion.serializer.serializedSize(node.version, 
version);
+                size += TypeSizes.BOOL_SIZE;
+                if (node.hostId != null)
+                    size += 
UUIDSerializer.serializer.serializedSize(node.hostId, 
MessagingService.VERSION_50);
+                return size;
+            }
+        }
+    }
+
+    public static class Serializer implements MetadataSerializer<Directory>
+    {
+        public void serialize(Directory t, DataOutputPlus out, Version 
version) throws IOException
+        {
+            out.writeInt(t.states.size());
+            for (NodeId nodeId : t.states.keySet())
+                Node.serializer.serialize(t.getNode(nodeId), out, version);
+
+            Set<String> dcs = t.racksByDC.keySet();
+            out.writeInt(dcs.size());
+            for (String dc : dcs)
+            {
+                out.writeUTF(dc);
+                Map<String, Collection<InetAddressAndPort>> racks = 
t.racksByDC.get(dc).asMap();
+                out.writeInt(racks.size());
+                for (String rack : racks.keySet())
+                {
+                    out.writeUTF(rack);
+                    Collection<InetAddressAndPort> endpoints = racks.get(rack);
+                    out.writeInt(endpoints.size());
+                    for (InetAddressAndPort endpoint : endpoints)
+                    {
+                        
InetAddressAndPort.MetadataSerializer.serializer.serialize(endpoint, out, 
version);
+                    }
+                }
+            }
+            Epoch.serializer.serialize(t.lastModified, out, version);
+        }
+
+        public Directory deserialize(DataInputPlus in, Version version) throws 
IOException
+        {
+            int count = in.readInt();
+            Directory newDir = new Directory();
+            for (int i = 0; i < count; i++)
+            {
+                Node n = Node.serializer.deserialize(in, version);
+                // todo: bulk operations
+                newDir = newDir.with(n.addresses, n.id, n.hostId, n.location, 
n.version)
+                               .withNodeState(n.id, n.state);
+            }
+
+            int dcCount = in.readInt();
+            BTreeMultimap<String, InetAddressAndPort> dcEndpoints = 
BTreeMultimap.empty();
+            BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC = 
BTreeMap.empty();
+            for (int i=0; i<dcCount; i++)
+            {
+                String dc = in.readUTF();
+                int rackCount = in.readInt();
+                BTreeMultimap<String, InetAddressAndPort> rackEndpoints = 
BTreeMultimap.empty();
+                for (int j=0; j<rackCount; j++)
+                {
+                    String rack = in.readUTF();
+                    int epCount = in.readInt();
+                    for (int k=0; k<epCount; k++)
+                    {
+                        InetAddressAndPort endpoint = 
InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
+                        rackEndpoints = rackEndpoints.with(rack, endpoint);
+                        dcEndpoints = dcEndpoints.with(dc, endpoint);
+                    }
+                    racksByDC = racksByDC.withForce(dc, rackEndpoints);
+                }
+            }
+
+            Epoch lastModified = Epoch.serializer.deserialize(in, version);
+            return new Directory(newDir.nextId,
+                                 lastModified,
+                                 newDir.peers,
+                                 newDir.locations,
+                                 newDir.states,
+                                 newDir.versions,
+                                 newDir.hostIds,
+                                 newDir.addresses,
+                                 dcEndpoints,
+                                 racksByDC);
+        }
+
+        public long serializedSize(Directory t, Version version)
+        {
+            int size = sizeof(t.states.size());
+            for (NodeId nodeId : t.states.keySet())
+                size += Node.serializer.serializedSize(t.getNode(nodeId), 
version);
+
+            size += sizeof(t.racksByDC.size());
+            for (Map.Entry<String, Multimap<String, InetAddressAndPort>> entry 
: t.racksByDC.entrySet())
+            {
+                size += sizeof(entry.getKey());
+                Map<String, Collection<InetAddressAndPort>> racks = 
entry.getValue().asMap();
+                size += sizeof(racks.size());
+                for (Map.Entry<String, Collection<InetAddressAndPort>> e : 
racks.entrySet())
+                {
+                    size += sizeof(e.getKey());
+                    Collection<InetAddressAndPort> endpoints = e.getValue();
+                    size += sizeof(endpoints.size());
+                    for (InetAddressAndPort endpoint : endpoints)
+                    {
+                        size += 
InetAddressAndPort.MetadataSerializer.serializer.serializedSize(endpoint, 
version);
+                    }
+                }
+            }
+            size += Epoch.serializer.serializedSize(t.lastModified, version);
+            return size;
+        }
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof Directory)) return false;
+        Directory directory = (Directory) o;
+
+        return Objects.equals(lastModified, directory.lastModified) &&
+               isEquivalent(directory);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(nextId, lastModified, peers, locations, states, 
endpointsByDC, racksByDC, versions, addresses);
+    }
+
+    /**
+     * returns true if this directory is functionally equivalent to the given 
one
+     *
+     * does not check equality of lastModified
+     */
+    @VisibleForTesting
+    public boolean isEquivalent(Directory directory)
+    {
+        return nextId == directory.nextId &&
+               Objects.equals(peers, directory.peers) &&
+               Objects.equals(locations, directory.locations) &&
+               Objects.equals(states, directory.states) &&
+               Objects.equals(endpointsByDC, directory.endpointsByDC) &&
+               Objects.equals(racksByDC, directory.racksByDC) &&
+               Objects.equals(versions, directory.versions) &&
+               Objects.equals(addresses, directory.addresses);
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/membership/Location.java 
b/src/java/org/apache/cassandra/tcm/membership/Location.java
new file mode 100644
index 0000000000..7e38db79cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/membership/Location.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+public class Location
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final String datacenter;
+    public final String rack;
+
+    public Location(String datacenter, String rack)
+    {
+        this.datacenter = datacenter;
+        this.rack = rack;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Location location = (Location) o;
+        return Objects.equals(datacenter, location.datacenter) && 
Objects.equals(rack, location.rack);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(datacenter, rack);
+    }
+
+    @Override
+    public String toString()
+    {
+        return datacenter + '/' + rack;
+    }
+
+    public static class Serializer implements MetadataSerializer<Location>
+    {
+        public void serialize(Location t, DataOutputPlus out, Version version) 
throws IOException
+        {
+            out.writeUTF(t.datacenter);
+            out.writeUTF(t.rack);
+        }
+
+        public Location deserialize(DataInputPlus in, Version version) throws 
IOException
+        {
+            return new Location(in.readUTF(), in.readUTF());
+        }
+
+        public long serializedSize(Location t, Version version)
+        {
+            return TypeSizes.sizeof(t.datacenter) +
+                   TypeSizes.sizeof(t.rack);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeAddresses.java 
b/src/java/org/apache/cassandra/tcm/membership/NodeAddresses.java
new file mode 100644
index 0000000000..dcd7ee99f2
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeAddresses.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class NodeAddresses
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final InetAddressAndPort broadcastAddress;
+    public final InetAddressAndPort localAddress;
+    public final InetAddressAndPort nativeAddress;
+
+    /**
+     *
+     * @param broadcastAddress this comes from config if broadcast_address is 
set or it falls through to getLocalAddress
+     *                         which either grabs the local host address or 
listen_address from config if set
+     *                         todo; config broadcast_address can be changed 
by snitch (EC2MultiRegionSnitch) during runtime, handle that
+     * @param localAddress this is the local host if listen_address is not set 
in config
+     * @param nativeAddress address for clients to communicate with this node
+     */
+    public NodeAddresses(InetAddressAndPort broadcastAddress, 
InetAddressAndPort localAddress, InetAddressAndPort nativeAddress)
+    {
+        this.broadcastAddress = broadcastAddress;
+        this.localAddress = localAddress;
+        this.nativeAddress = nativeAddress;
+    }
+
+    public NodeAddresses(InetAddressAndPort address)
+    {
+        this(address, address, address);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "NodeAddresses{" +
+               "broadcastAddress=" + broadcastAddress +
+               ", localAddress=" + localAddress +
+               ", nativeAddress=" + nativeAddress +
+               '}';
+    }
+
+    public boolean conflictsWith(NodeAddresses other)
+    {
+        return broadcastAddress.equals(other.broadcastAddress) ||
+               localAddress.equals(other.localAddress) ||
+               nativeAddress.equals(other.nativeAddress);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof NodeAddresses)) return false;
+        NodeAddresses that = (NodeAddresses) o;
+        return Objects.equals(broadcastAddress, that.broadcastAddress) && 
Objects.equals(localAddress, that.localAddress) && 
Objects.equals(nativeAddress, that.nativeAddress);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(broadcastAddress, localAddress, nativeAddress);
+    }
+
+    public static NodeAddresses current()
+    {
+        return new NodeAddresses(FBUtilities.getBroadcastAddressAndPort(),
+                                 FBUtilities.getLocalAddressAndPort(),
+                                 
FBUtilities.getBroadcastNativeAddressAndPort());
+    }
+
+    public static class Serializer implements MetadataSerializer<NodeAddresses>
+    {
+        @Override
+        public void serialize(NodeAddresses t, DataOutputPlus out, Version 
version) throws IOException
+        {
+            
InetAddressAndPort.MetadataSerializer.serializer.serialize(t.broadcastAddress, 
out, version);
+            
InetAddressAndPort.MetadataSerializer.serializer.serialize(t.localAddress, out, 
version);
+            
InetAddressAndPort.MetadataSerializer.serializer.serialize(t.nativeAddress, 
out, version);
+        }
+
+        @Override
+        public NodeAddresses deserialize(DataInputPlus in, Version version) 
throws IOException
+        {
+            InetAddressAndPort broadcastAddress = 
InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
+            InetAddressAndPort localAddress = 
InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
+            InetAddressAndPort rpcAddress = 
InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
+            return new NodeAddresses(broadcastAddress, localAddress, 
rpcAddress);
+        }
+
+        @Override
+        public long serializedSize(NodeAddresses t, Version version)
+        {
+            return 
InetAddressAndPort.MetadataSerializer.serializer.serializedSize(t.broadcastAddress,
 version) +
+                   
InetAddressAndPort.MetadataSerializer.serializer.serializedSize(t.localAddress, 
version) +
+                   
InetAddressAndPort.MetadataSerializer.serializer.serializedSize(t.nativeAddress,
 version);
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeId.java 
b/src/java/org/apache/cassandra/tcm/membership/NodeId.java
new file mode 100644
index 0000000000..862dacbc03
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeId.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+public class NodeId implements Comparable<NodeId>
+{
+    public static final Serializer serializer = new Serializer();
+
+    public final UUID uuid;
+
+    public NodeId(UUID id)
+    {
+        this.uuid = id;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        NodeId nodeId = (NodeId) o;
+        return Objects.equals(uuid, nodeId.uuid);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(uuid);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "NodeId{" +
+               "id=" + uuid +
+               '}';
+    }
+
+    public int compareTo(NodeId o)
+    {
+        return uuid.compareTo(o.uuid);
+    }
+
+    public static class Serializer implements MetadataSerializer<NodeId>
+    {
+        public void serialize(NodeId n, DataOutputPlus out, Version version) 
throws IOException
+        {
+            out.writeLong(n.uuid.getMostSignificantBits());
+            out.writeLong(n.uuid.getLeastSignificantBits());
+        }
+
+        public NodeId deserialize(DataInputPlus in, Version version) throws 
IOException
+        {
+            return new NodeId(new UUID(in.readLong(), in.readLong()));
+        }
+
+        public long serializedSize(NodeId t, Version version)
+        {
+            return TypeSizes.LONG_SIZE + TypeSizes.LONG_SIZE;
+        }
+    }
+
+}
diff --git a/src/java/org/apache/cassandra/tcm/MetadataKeys.java 
b/src/java/org/apache/cassandra/tcm/membership/NodeState.java
similarity index 50%
copy from src/java/org/apache/cassandra/tcm/MetadataKeys.java
copy to src/java/org/apache/cassandra/tcm/membership/NodeState.java
index 6db00c2964..78074145e8 100644
--- a/src/java/org/apache/cassandra/tcm/MetadataKeys.java
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeState.java
@@ -16,30 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.tcm;
-
-import java.util.Locale;
-
-import com.google.common.collect.ImmutableSet;
-
-public class MetadataKeys
+package org.apache.cassandra.tcm.membership;
+public enum NodeState
 {
-    public static final String CORE_NS = 
MetadataKeys.class.getPackage().getName().toLowerCase(Locale.ROOT);
-
-    public static final MetadataKey SCHEMA                  = make(CORE_NS, 
"schema", "dist_schema");
-
-    public static final ImmutableSet<MetadataKey> CORE_METADATA = 
ImmutableSet.of(SCHEMA);
-
-    public static MetadataKey make(String...parts)
-    {
-        assert parts != null && parts.length >= 1;
-        StringBuilder b = new StringBuilder(parts[0]);
-        for (int i = 1; i < parts.length; i++)
-        {
-            b.append('.');
-            b.append(parts[i]);
-        }
-        return new MetadataKey(b.toString());
-    }
-
+    REGISTERED,
+    BOOTSTRAPPING,
+    JOINED,
+    LEAVING,
+    LEFT,
+    MOVING;
+
+    // TODO: probably we can make these states even more nuanced, and track 
which step each node is on to have a simpler representation of transition states
 }
diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java 
b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
new file mode 100644
index 0000000000..4234d66bce
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt;
+
+public class NodeVersion implements Comparable<NodeVersion>
+{
+    public static final Serializer serializer = new Serializer();
+    public static final NodeVersion CURRENT = new NodeVersion(new 
CassandraVersion(FBUtilities.getReleaseVersionString()), Version.V0);
+
+    public final CassandraVersion cassandraVersion;
+    public final Version serializationVersion;
+
+    public NodeVersion(CassandraVersion cassandraVersion, Version 
serializationVersion)
+    {
+        this.cassandraVersion = cassandraVersion;
+        this.serializationVersion = serializationVersion;
+    }
+
+    public boolean isUpgraded()
+    {
+        return serializationVersion.asInt() >= Version.V0.asInt();
+    }
+
+    @Override
+    public String toString()
+    {
+        return "NodeVersion{" +
+               "cassandraVersion=" + cassandraVersion +
+               ", serializationVersion=" + serializationVersion +
+               '}';
+    }
+
+    @Override
+    public int compareTo(NodeVersion o)
+    {
+        // only comparing cassandraVersion here - if we bump 
serializationVersion we need to release a new cassandra version
+        return cassandraVersion.compareTo(o.cassandraVersion);
+    }
+
+    public static NodeVersion fromCassandraVersion(CassandraVersion cv)
+    {
+        if (cv == null)
+            return CURRENT;
+        Version version = Version.OLD;
+        if (cv.compareTo(CassandraVersion.CASSANDRA_5_0) >= 0)
+            version = Version.V0;
+        return new NodeVersion(cv, version);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof NodeVersion)) return false;
+        NodeVersion that = (NodeVersion) o;
+        return Objects.equals(cassandraVersion, that.cassandraVersion) && 
serializationVersion == that.serializationVersion;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(cassandraVersion, serializationVersion);
+    }
+
+    public static class Serializer implements MetadataSerializer<NodeVersion>
+    {
+        @Override
+        public void serialize(NodeVersion t, DataOutputPlus out, Version 
version) throws IOException
+        {
+            out.writeUTF(t.cassandraVersion.toString());
+            out.writeUnsignedVInt32(t.serializationVersion.asInt());
+        }
+
+        @Override
+        public NodeVersion deserialize(DataInputPlus in, Version version) 
throws IOException
+        {
+            CassandraVersion cassandraVersion = new 
CassandraVersion(in.readUTF());
+            Version serializationVersion = 
Version.fromInt(in.readUnsignedVInt32());
+            return new NodeVersion(cassandraVersion, serializationVersion);
+        }
+
+        @Override
+        public long serializedSize(NodeVersion t, Version version)
+        {
+            return sizeof(t.cassandraVersion.toString()) +
+                   sizeofUnsignedVInt(t.serializationVersion.asInt());
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/transformations/Register.java 
b/src/java/org/apache/cassandra/tcm/transformations/Register.java
new file mode 100644
index 0000000000..fe1a5154c1
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/transformations/Register.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class Register implements Transformation
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(Register.class);
+    public static final Serializer serializer = new Serializer();
+
+    private final NodeAddresses addresses;
+    private final Location location;
+    private final NodeVersion version;
+
+    public Register(NodeAddresses addresses, Location location, NodeVersion 
version)
+    {
+        this.location = location;
+        this.version = version;
+        this.addresses = addresses;
+    }
+
+    @Override
+    public Kind kind()
+    {
+        return Kind.REGISTER;
+    }
+
+    @Override
+    public Result execute(ClusterMetadata prev)
+    {
+        for (Map.Entry<NodeId, NodeAddresses> entry : 
prev.directory.addresses.entrySet())
+        {
+            NodeAddresses existingAddresses = entry.getValue();
+            if (addresses.conflictsWith(existingAddresses))
+                return new Rejected(String.format("New addresses %s conflicts 
with existing node %s with addresses %s", addresses, entry.getKey(), 
existingAddresses));
+        }
+
+        ClusterMetadata.Transformer next = prev.transformer()
+                                               .register(addresses, location, 
version);
+        return success(next);
+    }
+
+    public static NodeId maybeRegister()
+    {
+        return register(false);
+    }
+
+    @VisibleForTesting
+    public static NodeId forceRegister()
+    {
+        return register(true);
+    }
+
+    @VisibleForTesting
+    public static NodeId register(NodeAddresses nodeAddresses)
+    {
+        return register(nodeAddresses, NodeVersion.CURRENT);
+    }
+
+    @VisibleForTesting
+    public static NodeId register(NodeAddresses nodeAddresses, NodeVersion 
nodeVersion)
+    {
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        Location location = new Location(snitch.getLocalDatacenter(), 
snitch.getLocalRack());
+
+        NodeId nodeId = 
ClusterMetadata.current().directory.peerId(nodeAddresses.broadcastAddress);
+        if (nodeId == null)
+        {
+            nodeId = ClusterMetadataService.instance()
+                                           .commit(new Register(nodeAddresses, 
location, nodeVersion),
+                                                   (metadata) -> 
!metadata.directory.isRegistered(nodeAddresses.broadcastAddress),
+                                                   (metadata) -> 
metadata.directory.peerId(nodeAddresses.broadcastAddress),
+                                                   (metadata, reason) -> {
+                                                       throw new 
IllegalStateException("Can't register node: " + reason);
+                                                   });
+        }
+
+        logger.info("Registering with endpoint {}", 
nodeAddresses.broadcastAddress);
+        return nodeId;
+    }
+
+    private static NodeId register(boolean force)
+    {
+        // Try to recover node ID from the system keyspace
+        UUID localHostId = SystemKeyspace.getLocalHostId();
+        if (force || localHostId == null)
+        {
+            NodeId nodeId = register(NodeAddresses.current());
+            localHostId = nodeId.uuid;
+            SystemKeyspace.setLocalHostId(localHostId);
+            logger.info("New node ID obtained {}, (Note: This should happen 
exactly once per node)", nodeId.uuid);
+            return nodeId;
+        }
+        else
+        {
+            Directory dir = ClusterMetadata.current().directory;
+            NodeId nodeId = 
dir.peerId(FBUtilities.getBroadcastAddressAndPort());
+            NodeVersion dirVersion = dir.version(nodeId);
+
+            // If this is a node in the process of upgrading, update the host 
id in the system.local table
+            // TODO: when constructing the initial cluster metadata for 
upgrade, we include a mapping from
+            //      NodeId to the old HostId. We will need to use this lookup 
to map between the two for
+            //      hint delivery immediately following an upgrade.
+            if (dirVersion == null || !dirVersion.isUpgraded())
+            {
+                if (dir.hostId(nodeId).equals(localHostId))
+                {
+                    SystemKeyspace.setLocalHostId(nodeId.uuid);
+                    logger.info("Updated local HostId from pre-upgrade version 
{} to the one which was pre-registered " +
+                                "during initial cluster metadata conversion 
{}", localHostId, nodeId.uuid);
+                }
+                else
+                {
+                    throw new RuntimeException("HostId read from local system 
table does not match the one recorded " +
+                                               "for this endpoint during 
initial cluster metadata conversion. " +
+                                               String.format("Endpoint: %s, 
NodeId: %s, Recorded: %s, Local: %s",
+                                                             
FBUtilities.getBroadcastAddressAndPort(),
+                                                             nodeId,
+                                                             
dir.hostId(nodeId),
+                                                             localHostId));
+                }
+            }
+            else
+            {
+                logger.info("Local id was already registered, retaining: {}", 
localHostId);
+            }
+            return nodeId;
+        }
+    }
+
+    public String toString()
+    {
+        return "Register{" +
+               ", addresses=" + addresses +
+               ", location=" + location +
+               ", version=" + version +
+               '}';
+    }
+
+    static class Serializer implements 
AsymmetricMetadataSerializer<Transformation, Register>
+    {
+        public void serialize(Transformation t, DataOutputPlus out, Version 
version) throws IOException
+        {
+            assert t instanceof Register;
+            Register register = (Register)t;
+            NodeAddresses.serializer.serialize(register.addresses, out, 
version);
+            Location.serializer.serialize(register.location, out, version);
+            NodeVersion.serializer.serialize(register.version, out, version);
+        }
+
+        public Register deserialize(DataInputPlus in, Version version) throws 
IOException
+        {
+            NodeAddresses addresses = NodeAddresses.serializer.deserialize(in, 
version);
+            Location location = Location.serializer.deserialize(in, version);
+            NodeVersion nodeVersion = NodeVersion.serializer.deserialize(in, 
version);
+            return new Register(addresses, location, nodeVersion);
+        }
+
+        public long serializedSize(Transformation t, Version version)
+        {
+            assert t instanceof Register;
+            Register register = (Register) t;
+            return NodeAddresses.serializer.serializedSize(register.addresses, 
version) +
+                   Location.serializer.serializedSize(register.location, 
version) +
+                   NodeVersion.serializer.serializedSize(register.version, 
version);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java 
b/src/java/org/apache/cassandra/utils/CassandraVersion.java
index 766f0e8173..f4c3dd7388 100644
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@ -50,6 +50,7 @@ public class CassandraVersion implements 
Comparable<CassandraVersion>
 
     private static final Pattern PATTERN = Pattern.compile(VERSION_REGEXP);
 
+    public static final CassandraVersion CASSANDRA_5_0 = new 
CassandraVersion("5.0").familyLowerBound.get();
     public static final CassandraVersion CASSANDRA_4_1 = new 
CassandraVersion("4.1").familyLowerBound.get();
     public static final CassandraVersion CASSANDRA_4_0 = new 
CassandraVersion("4.0").familyLowerBound.get();
     public static final CassandraVersion CASSANDRA_4_0_RC2 = new 
CassandraVersion(4, 0, 0, NO_HOTFIX, new String[] {"rc2"}, null);
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java 
b/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java
new file mode 100644
index 0000000000..1e1984e635
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.btree;
+
+import java.util.Comparator;
+import java.util.Set;
+
+import com.google.common.collect.BiMap;
+
+import static java.util.Comparator.naturalOrder;
+
+public class BTreeBiMap<K, V> extends AbstractBTreeMap<K, V> implements 
BiMap<K, V>
+{
+    private final Object[] inverse;
+    private final KeyComparator<V, K> valueComparator;
+
+    protected static <K, V> BTreeBiMap<K, V> withComparators(Object[] tree, 
Object [] inverse, Comparator<K> comparator, Comparator<V> valueComparator)
+    {
+        return new BTreeBiMap<>(tree, inverse, new 
KeyComparator<>(comparator), new KeyComparator<>(valueComparator));
+    }
+
+    private BTreeBiMap(Object[] tree, Object [] inverse, KeyComparator<K, V> 
comparator, KeyComparator<V, K> valueComparator)
+    {
+        super(tree, comparator);
+        this.valueComparator = valueComparator;
+        this.inverse = inverse;
+    }
+
+    public static <K, V> BTreeBiMap<K, V> empty(Comparator<K> comparator, 
Comparator<V> valueComparator)
+    {
+        return withComparators(BTree.empty(), BTree.empty(), comparator, 
valueComparator);
+    }
+
+    public static <K extends Comparable<K>, V extends Comparable<V>> 
BTreeBiMap<K, V> empty()
+    {
+        return BTreeBiMap.<K, V>empty(naturalOrder(), naturalOrder());
+    }
+
+    @Override
+    public BiMap<V, K> inverse()
+    {
+        return new BTreeBiMap<>(inverse, tree, valueComparator, comparator);
+    }
+
+    @Override
+    public BTreeBiMap<K, V> with(K key, V value)
+    {
+        if (key == null || value == null)
+            throw new NullPointerException();
+        AbstractBTreeMap.Entry<K, V> entry = new AbstractBTreeMap.Entry<>(key, 
value);
+        AbstractBTreeMap.Entry<V, K> inverseEntry = new 
AbstractBTreeMap.Entry<>(value, key);
+        if (BTree.find(tree, comparator, entry) != null)
+            throw new IllegalArgumentException("Key already exists in map: " + 
key);
+        if (BTree.find(inverse, valueComparator, inverseEntry) != null)
+            throw new IllegalArgumentException("Value already exists in map: " 
+ value);
+
+        return new BTreeBiMap<>(BTree.update(tree, new Object[]{ entry }, 
comparator, UpdateFunction.noOp()),
+                                BTree.update(inverse, new Object[] { new 
AbstractBTreeMap.Entry<>(value, key) }, valueComparator, UpdateFunction.noOp()),
+                                comparator,
+                                valueComparator);
+    }
+
+    @Override
+    public BTreeBiMap<K, V> withForce(K key, V value)
+    {
+        // todo: optimise
+        return without(key).with(key, value);
+    }
+
+    public BTreeBiMap<K, V> without(K key)
+    {
+        AbstractBTreeMap.Entry<K, V> entry = new AbstractBTreeMap.Entry<>(key, 
null);
+        AbstractBTreeMap.Entry<K, V> existingEntry = BTree.find(tree, 
comparator, entry);
+        if (existingEntry == null)
+            return this;
+
+        Object[] newTree = BTreeRemoval.remove(tree, comparator, new 
AbstractBTreeMap.Entry<>(key, null));
+        Object[] newInverse = BTreeRemoval.remove(inverse, valueComparator, 
new AbstractBTreeMap.Entry<>(existingEntry.getValue(), null));
+        return new BTreeBiMap<>(newTree, newInverse, comparator, 
valueComparator);
+    }
+
+    public Set<V> values()
+    {
+        return inverse().keySet();
+    }
+}
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java 
b/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java
new file mode 100644
index 0000000000..aefa94a8dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.btree;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import static java.util.Comparator.naturalOrder;
+
+public class BTreeMultimap<K, V> implements Multimap<K, V>
+{
+    private final BTreeMap<K, Collection<V>> map;
+    private final Comparator<K> comparator;
+    private final Comparator<V> valueComparator;
+    private final int size;
+
+    private BTreeMultimap(BTreeMap<K, Collection<V>> map, Comparator<K> 
comparator, Comparator<V> valueComparator, int size)
+    {
+        this.map = map;
+        this.comparator = comparator;
+        this.valueComparator = valueComparator;
+        this.size = size;
+    }
+
+    public static <K extends Comparable<K>, V extends Comparable<V>> 
BTreeMultimap<K, V> empty()
+    {
+        return new BTreeMultimap<K, V>(BTreeMap.empty(), naturalOrder(), 
naturalOrder(), 0);
+    }
+
+    public BTreeMultimap<K, V> with(K key, V value)
+    {
+        if (map.containsKey(key))
+        {
+            BTreeSet<V> oldSet = (BTreeSet<V>) map.get(key);
+            BTreeSet<V> newSet = oldSet.with(value);
+            int newSize = size + newSet.size() - oldSet.size();
+            return new BTreeMultimap<>(map.without(key).with(key, newSet), 
comparator, valueComparator, newSize);
+        }
+        else
+        {
+            BTreeSet<V> newSet = BTreeSet.of(valueComparator, value);
+            return new BTreeMultimap<>(map.with(key, newSet), comparator, 
valueComparator, size + 1);
+        }
+    }
+
+    public BTreeMultimap<K, V> without(K key)
+    {
+        Collection<V> oldSet = map.get(key);
+        if (oldSet == null)
+            return this;
+        int newSize = size - oldSet.size();
+        return new BTreeMultimap<>(map.without(key), comparator, 
valueComparator, newSize);
+    }
+
+    public BTreeMultimap<K, V> without(K key, V value)
+    {
+        BTreeSet<V> values = (BTreeSet<V>) map.get(key);
+        if (values == null)
+            return this;
+        if (!values.contains(value))
+            return this;
+        BTreeSet<V> newValues = BTreeSet.wrap(BTreeRemoval.remove(values.tree, 
valueComparator, value), valueComparator);
+        BTreeMap<K, Collection<V>> newMap = map.without(key);
+        if (newValues.isEmpty())
+            return new BTreeMultimap<>(newMap, comparator, valueComparator, 
size - 1);
+
+        return new BTreeMultimap<>(newMap.with(key, newValues), comparator, 
valueComparator, size - 1);
+    }
+
+    @Override
+    public int size()
+    {
+        return size;
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return map.isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(@Nullable Object o)
+    {
+        if (o == null)
+            return false;
+        return map.containsKey(o);
+    }
+
+    @Override
+    public boolean containsValue(@Nullable Object o)
+    {
+        if (o == null)
+            return false;
+        for (Map.Entry<K, Collection<V>> e : map.entrySet())
+            if (e.getValue().contains(o))
+                return true;
+        return false;
+    }
+
+    @Override
+    public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+    {
+        if (key == null || value == null)
+            throw new NullPointerException();
+        return map.containsKey(key) && map.get(key).contains(value);
+    }
+
+    @Override
+    public Collection<V> get(@Nullable K k)
+    {
+        if (k == null)
+            return null;
+        return map.get(k);
+    }
+
+    @Override
+    public Set<K> keySet()
+    {
+        return map.keySet();
+    }
+
+    @Override
+    public Multiset<K> keys()
+    {
+        ImmutableMultiset.Builder<K> keys = ImmutableMultiset.builder();
+        keys.addAll(map.keySet());
+        return keys.build();
+    }
+
+    @Override
+    public Collection<V> values()
+    {
+        ImmutableList.Builder<V> builder = ImmutableList.builder();
+        for (Map.Entry<K, Collection<V>> entry : map.entrySet())
+            builder.addAll(entry.getValue());
+        return builder.build();
+    }
+
+    @Override
+    public Collection<Map.Entry<K, V>> entries()
+    {
+        Set<Map.Entry<K, V>> entries = new HashSet<>();
+        for (Map.Entry<K, Collection<V>> entry : map.entrySet())
+            for (V v : entry.getValue())
+                entries.add(new AbstractBTreeMap.Entry<>(entry.getKey(), v));
+        return entries;
+    }
+
+    @Override
+    public Map<K, Collection<V>> asMap()
+    {
+        return map;
+    }
+
+    public boolean put(@Nullable K k, @Nullable V v) { throw new 
UnsupportedOperationException();}
+    public boolean remove(@Nullable Object o, @Nullable Object o1) {throw new 
UnsupportedOperationException();}
+    public boolean putAll(@Nullable K k, Iterable<? extends V> iterable) 
{throw new UnsupportedOperationException();}
+    public boolean putAll(Multimap<? extends K, ? extends V> multimap) {throw 
new UnsupportedOperationException();}
+    public Collection<V> replaceValues(@Nullable K k, Iterable<? extends V> 
iterable) {throw new UnsupportedOperationException();}
+    public Collection<V> removeAll(@Nullable Object o) {throw new 
UnsupportedOperationException();}
+    public void clear() { throw new UnsupportedOperationException(); }
+
+    @Override
+    public String toString()
+    {
+        return map.toString();
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (!(o instanceof BTreeMultimap)) return false;
+        BTreeMultimap<?, ?> that = (BTreeMultimap<?, ?>) o;
+        return size == that.size &&
+               Objects.equals(map, that.map) &&
+               Objects.equals(comparator, that.comparator) &&
+               Objects.equals(valueComparator, that.valueComparator);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(map, comparator, valueComparator, size);
+    }
+}
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java 
b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
index b20729d7da..5bb3ac0446 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
@@ -18,7 +18,18 @@
  */
 package org.apache.cassandra.utils.btree;
 
-import java.util.*;
+import java.util.AbstractSet;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.SortedSet;
+import java.util.Spliterator;
+import java.util.Spliterators;
 
 import com.google.common.collect.Ordering;
 
@@ -27,7 +38,7 @@ import org.apache.cassandra.utils.btree.BTree.Dir;
 import static org.apache.cassandra.utils.btree.BTree.findIndex;
 
 
-public class BTreeSet<V> implements NavigableSet<V>, List<V>
+public class BTreeSet<V> extends AbstractSet<V> implements NavigableSet<V>, 
List<V>
 {
     protected final Comparator<? super V> comparator;
     protected final Object[] tree;
@@ -175,12 +186,16 @@ public class BTreeSet<V> implements NavigableSet<V>, 
List<V>
     @Override
     public V first()
     {
+        if (isEmpty())
+            throw new NoSuchElementException();
         return get(0);
     }
 
     @Override
     public V last()
     {
+        if (isEmpty())
+            throw new NoSuchElementException();
         return get(size() - 1);
     }
 
@@ -223,7 +238,6 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
                 return false;
         return true;
     }
-
     public int hashCode()
     {
         // we can't just delegate to Arrays.deepHashCode(),
@@ -234,6 +248,7 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V>
         return result;
     }
 
+
     @Override
     public boolean addAll(Collection<? extends V> c)
     {
@@ -590,24 +605,24 @@ public class BTreeSet<V> implements NavigableSet<V>, 
List<V>
 
         public Builder<V> add(V v)
         {
-            wrapped .add(v);
+            wrapped.add(v);
             return this;
         }
 
         public Builder<V> addAll(Collection<V> iter)
         {
-            wrapped .addAll(iter);
+            wrapped.addAll(iter);
             return this;
         }
 
         public boolean isEmpty()
         {
-            return wrapped .isEmpty();
+            return wrapped.isEmpty();
         }
 
         public BTreeSet<V> build()
         {
-            return new BTreeSet<>(wrapped .build(), wrapped .comparator);
+            return new BTreeSet<>(wrapped.build(), wrapped.comparator);
         }
     }
 
@@ -627,6 +642,22 @@ public class BTreeSet<V> implements NavigableSet<V>, 
List<V>
         return new BTreeSet<>(btree, comparator);
     }
 
+    public BTreeSet<V> with(Collection<V> updateWith)
+    {
+        Object[] with = 
BTreeSet.<V>builder(comparator).addAll(updateWith).build().tree;
+        return new BTreeSet<>(BTree.update(tree, with, comparator, 
UpdateFunction.<V>noOp()), comparator);
+    }
+
+    public BTreeSet<V> with(V updateWith)
+    {
+        return new BTreeSet<>(BTree.update(tree, new Object[] { updateWith }, 
comparator, UpdateFunction.<V>noOp()), comparator);
+    }
+
+    public BTreeSet<V> without(V element)
+    {
+        return new BTreeSet<>(BTreeRemoval.remove(tree, comparator, element), 
comparator);
+    }
+
     public static <V extends Comparable<V>> BTreeSet<V> of(Collection<V> 
sortedValues)
     {
         return new BTreeSet<>(BTree.build(sortedValues), 
Ordering.<V>natural());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to