This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 363b34c0fb5e4fd13b851ab10ed6059d3bb35964 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Wed Mar 1 10:53:51 2023 +0000 [CEP-21] Add rudimentary cluster membership to TCM Adds a new Directory component to ClusterMetadata to manage member identity, state, location and addressing. This duplicates some of the functions of TokenMetadata, Topology et al. but with updates performed consistently via the global log. Although it isn't actually used for anything yet, it is a prerequisite for managing data ownership through TCM, which will eventually replace TokenMetadata completely. Co-authored-by: Marcus Eriksson <[email protected]> Co-authored-by: Alex Petrov <[email protected]> Co-authored-by: Sam Tunnicliffe <[email protected]> --- .../apache/cassandra/service/StorageService.java | 3 + .../org/apache/cassandra/tcm/ClusterMetadata.java | 68 ++- .../org/apache/cassandra/tcm/MetadataKeys.java | 1 + .../org/apache/cassandra/tcm/Transformation.java | 2 + .../cassandra/tcm/compatibility/GossipHelper.java | 2 + .../apache/cassandra/tcm/membership/Directory.java | 578 +++++++++++++++++++++ .../apache/cassandra/tcm/membership/Location.java | 83 +++ .../cassandra/tcm/membership/NodeAddresses.java | 125 +++++ .../apache/cassandra/tcm/membership/NodeId.java | 89 ++++ .../NodeState.java} | 35 +- .../cassandra/tcm/membership/NodeVersion.java | 118 +++++ .../cassandra/tcm/transformations/Register.java | 208 ++++++++ .../apache/cassandra/utils/CassandraVersion.java | 1 + .../apache/cassandra/utils/btree/BTreeBiMap.java | 102 ++++ .../cassandra/utils/btree/BTreeMultimap.java | 214 ++++++++ .../org/apache/cassandra/utils/btree/BTreeSet.java | 45 +- 16 files changed, 1639 insertions(+), 35 deletions(-) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ea1903bb0c..75557a4489 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ 
b/src/java/org/apache/cassandra/service/StorageService.java @@ -208,6 +208,8 @@ import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Startup; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.transport.ClientResourceLimits; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.Clock; @@ -989,6 +991,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE throw new RuntimeException(e); } + NodeId self = Register.maybeRegister(); completeInitialization(); } diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 4d9f5a28cf..90d5343d2c 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -38,6 +38,12 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.tcm.extensions.ExtensionKey; import org.apache.cassandra.tcm.extensions.ExtensionValue; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.cms.EntireRange; @@ -58,16 +64,28 @@ public class ClusterMetadata public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions; public final DistributedSchema schema; + public final Directory directory; 
public final EndpointsForRange cmsReplicas; public final ImmutableSet<InetAddressAndPort> cmsMembers; public ClusterMetadata(IPartitioner partitioner) + { + this(partitioner, Directory.EMPTY); + } + + private ClusterMetadata(IPartitioner partitioner, Directory directory) + { + this(partitioner, directory, DistributedSchema.first()); + } + + private ClusterMetadata(IPartitioner partitioner, Directory directory, DistributedSchema schema) { this(Epoch.EMPTY, Period.EMPTY, true, partitioner, - DistributedSchema.first(), + schema, + directory, ImmutableSet.of(), ImmutableMap.of()); } @@ -77,6 +95,7 @@ public class ClusterMetadata boolean lastInPeriod, IPartitioner partitioner, DistributedSchema schema, + Directory directory, Set<InetAddressAndPort> cmsMembers, Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) { @@ -85,6 +104,7 @@ public class ClusterMetadata this.lastInPeriod = lastInPeriod; this.partitioner = partitioner; this.schema = schema; + this.directory = directory; this.cmsMembers = ImmutableSet.copyOf(cmsMembers); this.extensions = ImmutableMap.copyOf(extensions); @@ -122,6 +142,7 @@ public class ClusterMetadata lastInPeriod, partitioner, schema, + directory, cmsMembers, extensions); } @@ -144,6 +165,7 @@ public class ClusterMetadata private final boolean lastInPeriod; private final IPartitioner partitioner; private DistributedSchema schema; + private Directory directory; private final Set<InetAddressAndPort> cmsMembers; private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions; private final Set<MetadataKey> modifiedKeys; @@ -156,6 +178,7 @@ public class ClusterMetadata this.lastInPeriod = lastInPeriod; this.partitioner = metadata.partitioner; this.schema = metadata.schema; + this.directory = metadata.directory; this.cmsMembers = new HashSet<>(metadata.cmsMembers); extensions = new HashMap<>(metadata.extensions); modifiedKeys = new HashSet<>(); @@ -167,6 +190,18 @@ public class ClusterMetadata return this; } + public Transformer 
register(NodeAddresses addresses, Location location, NodeVersion version) + { + directory = directory.with(addresses, location, version); + return this; + } + + public Transformer withNodeState(NodeId id, NodeState state) + { + directory = directory.withNodeState(id, state); + return this; + } + public Transformer withCMSMember(InetAddressAndPort member) { cmsMembers.add(member); @@ -229,11 +264,18 @@ public class ClusterMetadata schema = schema.withLastModified(epoch); } + if (directory != base.directory) + { + modifiedKeys.add(MetadataKeys.NODE_DIRECTORY); + directory = directory.withLastModified(epoch); + } + return new Transformed(new ClusterMetadata(epoch, period, lastInPeriod, partitioner, schema, + directory, cmsMembers, extensions), ImmutableSet.copyOf(modifiedKeys)); @@ -248,6 +290,7 @@ public class ClusterMetadata ", lastInPeriod=" + lastInPeriod + ", partitioner=" + partitioner + ", schema=" + schema + + ", directory=" + schema + ", extensions=" + extensions + ", cmsMembers=" + cmsMembers + ", modifiedKeys=" + modifiedKeys + @@ -283,13 +326,15 @@ public class ClusterMetadata ClusterMetadata that = (ClusterMetadata) o; return epoch.equals(that.epoch) && lastInPeriod == that.lastInPeriod && + schema.equals(that.schema) && + directory.equals(that.directory) && extensions.equals(that.extensions); } @Override public int hashCode() { - return Objects.hash(epoch, lastInPeriod, extensions); + return Objects.hash(epoch, lastInPeriod, schema, directory, extensions); } public static ClusterMetadata current() @@ -309,6 +354,19 @@ public class ClusterMetadata return service.metadata(); } + public NodeId myNodeId() + { + return directory.peerId(FBUtilities.getBroadcastAddressAndPort()); + } + + public NodeState myNodeState() + { + NodeId nodeId = myNodeId(); + if (myNodeId() != null) + return directory.peerState(nodeId); + return null; + } + public static class Serializer implements MetadataSerializer<ClusterMetadata> { @Override @@ -319,6 +377,7 @@ public class 
ClusterMetadata out.writeBoolean(metadata.lastInPeriod); out.writeUTF(metadata.partitioner.getClass().getCanonicalName()); DistributedSchema.serializer.serialize(metadata.schema, out, version); + Directory.serializer.serialize(metadata.directory, out, version); out.writeInt(metadata.extensions.size()); for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : metadata.extensions.entrySet()) { @@ -341,6 +400,7 @@ public class ClusterMetadata boolean lastInPeriod = in.readBoolean(); IPartitioner partitioner = FBUtilities.newPartitioner(in.readUTF()); DistributedSchema schema = DistributedSchema.serializer.deserialize(in, version); + Directory dir = Directory.serializer.deserialize(in, version); int items = in.readInt(); Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new HashMap<>(items); for (int i = 0; i < items; i++) @@ -359,6 +419,7 @@ public class ClusterMetadata lastInPeriod, partitioner, schema, + dir, members, extensions); } @@ -375,7 +436,8 @@ public class ClusterMetadata VIntCoding.computeUnsignedVIntSize(metadata.period) + TypeSizes.BOOL_SIZE + sizeof(metadata.partitioner.getClass().getCanonicalName()) + - DistributedSchema.serializer.serializedSize(metadata.schema, version); + DistributedSchema.serializer.serializedSize(metadata.schema, version) + + Directory.serializer.serializedSize(metadata.directory, version); size += TypeSizes.INT_SIZE; for (InetAddressAndPort member : metadata.cmsMembers) diff --git a/src/java/org/apache/cassandra/tcm/MetadataKeys.java b/src/java/org/apache/cassandra/tcm/MetadataKeys.java index 6db00c2964..b545e03d4c 100644 --- a/src/java/org/apache/cassandra/tcm/MetadataKeys.java +++ b/src/java/org/apache/cassandra/tcm/MetadataKeys.java @@ -27,6 +27,7 @@ public class MetadataKeys public static final String CORE_NS = MetadataKeys.class.getPackage().getName().toLowerCase(Locale.ROOT); public static final MetadataKey SCHEMA = make(CORE_NS, "schema", "dist_schema"); + public static final MetadataKey NODE_DIRECTORY = 
make(CORE_NS, "membership", "node_directory"); public static final ImmutableSet<MetadataKey> CORE_METADATA = ImmutableSet.of(SCHEMA); diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java index 1a9db06611..8f7afe0ebd 100644 --- a/src/java/org/apache/cassandra/tcm/Transformation.java +++ b/src/java/org/apache/cassandra/tcm/Transformation.java @@ -36,6 +36,7 @@ import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.AlterSchema; import org.apache.cassandra.tcm.transformations.CustomTransformation; import org.apache.cassandra.tcm.transformations.ForceSnapshot; +import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.tcm.transformations.SealPeriod; import org.apache.cassandra.tcm.transformations.cms.Initialize; import org.apache.cassandra.tcm.transformations.cms.PreInitialize; @@ -139,6 +140,7 @@ public interface Transformation FORCE_SNAPSHOT(() -> ForceSnapshot.serializer), SEAL_PERIOD(() -> SealPeriod.serializer), SCHEMA_CHANGE(() -> AlterSchema.serializer), + REGISTER(() -> Register.serializer), CUSTOM(() -> CustomTransformation.serializer); private final Supplier<AsymmetricMetadataSerializer<Transformation, ? 
extends Transformation>> serializer; diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 9ff241f72b..ad83e404ae 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -30,6 +30,7 @@ import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Period; +import org.apache.cassandra.tcm.membership.Directory; public class GossipHelper { @@ -46,6 +47,7 @@ public class GossipHelper true, DatabaseDescriptor.getPartitioner(), DistributedSchema.fromSystemTables(SchemaKeyspace.fetchNonSystemKeyspaces()), + Directory.EMPTY, Collections.emptySet(), Collections.emptyMap()); } diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java new file mode 100644 index 0000000000..d04c52dfc1 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.membership; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MetadataValue; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.UUIDSerializer; +import org.apache.cassandra.utils.btree.BTreeBiMap; +import org.apache.cassandra.utils.btree.BTreeMap; +import org.apache.cassandra.utils.btree.BTreeMultimap; + +import static org.apache.cassandra.db.TypeSizes.sizeof; + +public class Directory implements MetadataValue<Directory> +{ + public static final Serializer serializer = new Serializer(); + + public static Directory EMPTY = new Directory(); + + private final int nextId; + private final Epoch lastModified; + private final BTreeBiMap<NodeId, InetAddressAndPort> peers; + private final BTreeMap<NodeId, Location> locations; + public final BTreeMap<NodeId, NodeState> states; + public final BTreeMap<NodeId, NodeVersion> versions; + public final BTreeMap<NodeId, NodeAddresses> addresses; + private final BTreeMap<NodeId, UUID> hostIds; + private final BTreeMultimap<String, InetAddressAndPort> endpointsByDC; + private final BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC; + + public Directory() + { + this(1, + Epoch.EMPTY, + BTreeBiMap.empty(), + BTreeMap.empty(), + BTreeMap.empty(), + BTreeMap.empty(), + 
BTreeMap.empty(), + BTreeMap.empty(), + BTreeMultimap.empty(), + BTreeMap.empty()); + } + + private Directory(int nextId, + Epoch lastModified, + BTreeBiMap<NodeId, InetAddressAndPort> peers, + BTreeMap<NodeId, Location> locations, + BTreeMap<NodeId, NodeState> states, + BTreeMap<NodeId, NodeVersion> versions, + BTreeMap<NodeId, UUID> hostIds, + BTreeMap<NodeId, NodeAddresses> addresses, + BTreeMultimap<String, InetAddressAndPort> endpointsByDC, + BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC) + { + this.nextId = nextId; + this.lastModified = lastModified; + this.peers = peers; + this.locations = locations; + this.states = states; + this.versions = versions; + this.hostIds = hostIds; + this.addresses = addresses; + this.endpointsByDC = endpointsByDC; + this.racksByDC = racksByDC; + } + + @Override + public String toString() + { + return "Directory{" + + "nextId=" + nextId + + ", lastModified=" + lastModified + + ", peers=" + peers + + ", locations=" + locations + + ", states=" + states + + ", versions=" + versions + + ", addresses=" + addresses + + ", hostIds=" + hostIds + + ", endpointsByDC=" + endpointsByDC + + ", racksByDC=" + racksByDC + + '}'; + } + + @Override + public Epoch lastModified() + { + return lastModified; + } + + @Override + public Directory withLastModified(Epoch epoch) + { + return new Directory(nextId, epoch, peers, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC); + } + + public Directory withNonUpgradedNode(NodeAddresses addresses, + Location location, + NodeVersion version, + NodeState state, + UUID hostId) + { + NodeId id = new NodeId(new UUID(0L, nextId)); + return with(addresses, id, hostId, location, version).withNodeState(id, state).withRackAndDC(id); + } + + @VisibleForTesting + public Directory with(NodeAddresses addresses, Location location) + { + return with(addresses, location, NodeVersion.CURRENT); + } + + public Directory with(NodeAddresses addresses, Location location, NodeVersion 
nodeVersion) + { + // this is obviously not the right way to do this + NodeId id = new NodeId(new UUID(0L, nextId)); + return with(addresses, id, id.uuid, location, nodeVersion); + } + + private Directory with(NodeAddresses nodeAddresses, NodeId id, UUID hostId, Location location, NodeVersion nodeVersion) + { + if (peers.containsKey(id)) + return this; + if (peers.containsValue(nodeAddresses.broadcastAddress)) + return this; + if (locations.containsKey(id)) + return this; + + return new Directory(nextId + 1, + lastModified, + peers.without(id).with(id, nodeAddresses.broadcastAddress), + locations.withForce(id, location), + states.withForce(id, NodeState.REGISTERED), + versions.withForce(id, nodeVersion), + hostIds.withForce(id, hostId), + addresses.withForce(id, nodeAddresses), + endpointsByDC, + racksByDC); + } + + public Directory withNodeState(NodeId id, NodeState state) + { + return new Directory(nextId, lastModified, peers, locations, states.withForce(id, state), versions, hostIds, addresses, endpointsByDC, racksByDC); + } + + public Directory withNodeVersion(NodeId id, NodeVersion version) + { + if (Objects.equals(versions.get(id), version)) + return this; + return new Directory(nextId, lastModified, peers, locations, states, versions.withForce(id, version), hostIds, addresses, endpointsByDC, racksByDC); + } + + public Directory withNodeAddresses(NodeId id, NodeAddresses nodeAddresses) + { + if (Objects.equals(addresses.get(id), nodeAddresses)) + return this; + return new Directory(nextId, lastModified, peers, locations, states, versions, hostIds, addresses.withForce(id, nodeAddresses), endpointsByDC, racksByDC); + } + + public Directory withRackAndDC(NodeId id) + { + InetAddressAndPort endpoint = peers.get(id); + Location location = locations.get(id); + + BTreeMultimap<String, InetAddressAndPort> rackEP = (BTreeMultimap<String, InetAddressAndPort>) racksByDC.get(location.datacenter); + if (rackEP == null) + rackEP = BTreeMultimap.empty(); + rackEP = 
rackEP.with(location.rack, endpoint); + + return new Directory(nextId, lastModified, peers, locations, states, versions, hostIds, addresses, + endpointsByDC.with(location.datacenter, endpoint), + racksByDC.withForce(location.datacenter, rackEP)); + } + + public Directory withoutRackAndDC(NodeId id) + { + InetAddressAndPort endpoint = peers.get(id); + Location location = locations.get(id); + BTreeMultimap<String, InetAddressAndPort> rackEP = (BTreeMultimap<String, InetAddressAndPort>) racksByDC.get(location.datacenter); + rackEP = rackEP.without(location.rack, endpoint); + return new Directory(nextId, lastModified, peers, locations, states, versions, hostIds, addresses, + endpointsByDC.without(location.datacenter, endpoint), + racksByDC.withForce(location.datacenter, rackEP)); + } + + public Directory without(NodeId id) + { + InetAddressAndPort endpoint = peers.get(id); + Location location = locations.get(id); + BTreeMultimap<String, InetAddressAndPort> rackEP = (BTreeMultimap<String, InetAddressAndPort>) racksByDC.get(location.datacenter); + rackEP = rackEP.without(location.rack, endpoint); + + return new Directory(nextId, + lastModified, + peers.without(id), + locations.without(id), + states.without(id), + versions.without(id), + hostIds.without(id), + addresses.without(id), + endpointsByDC.without(location.datacenter, endpoint), + racksByDC.withForce(location.datacenter, rackEP)); + } + + public NodeId peerId(InetAddressAndPort endpoint) + { + return peers.inverse().get(endpoint); + } + + public boolean isRegistered(InetAddressAndPort endpoint) + { + return peers.inverse().containsKey(endpoint); + } + + public InetAddressAndPort endpoint(NodeId id) + { + return peers.get(id); + } + + public boolean isEmpty() + { + return peers.isEmpty(); + } + + public ImmutableList<InetAddressAndPort> allAddresses() + { + return ImmutableList.copyOf(peers.values()); + } + + public ImmutableSet<NodeId> peerIds() + { + return ImmutableSet.copyOf(peers.keySet()); + } + + public 
NodeAddresses getNodeAddresses(NodeId id) + { + return addresses.get(id); + } + + private Node getNode(NodeId id) + { + return new Node(id, addresses.get(id), locations.get(id), states.get(id), versions.get(id), hostIds.get(id)); + } + + public Location location(NodeId id) + { + return locations.get(id); + } + + public Set<InetAddressAndPort> datacenterEndpoints(String datacenter) + { + return (Set<InetAddressAndPort>) endpointsByDC.get(datacenter); + } + + public Multimap<String, InetAddressAndPort> datacenterRacks(String datacenter) + { + return racksByDC.get(datacenter); + } + + public NodeState peerState(NodeId peer) + { + return states.get(peer); + } + + public NodeVersion version(NodeId peer) + { + return versions.get(peer); + } + + public UUID hostId(NodeId peer) + { + return hostIds.getOrDefault(peer, peer.uuid); + } + + public Map<String, Multimap<String, InetAddressAndPort>> allDatacenterRacks() + { + return racksByDC; + } + + public Set<String> knownDatacenters() + { + return locations.values().stream().map(l -> l.datacenter).collect(Collectors.toSet()); + } + + public Multimap<String, InetAddressAndPort> allDatacenterEndpoints() + { + return endpointsByDC; + } + + public NodeState peerState(InetAddressAndPort peer) + { + return states.get(peers.inverse().get(peer)); + } + + public String toDebugString() + { + return peers.keySet() + .stream() + .sorted() + .map(this::getNode) + .map(Node::toString) + .collect(Collectors.joining("\n")); + } + + private static class Node + { + public static final Serializer serializer = new Serializer(); + + public final NodeId id; + public final NodeAddresses addresses; + public final Location location; + public final NodeState state; + public final NodeVersion version; + public final UUID hostId; + + public Node(NodeId id, NodeAddresses addresses, Location location, NodeState state, NodeVersion version, UUID hostId) + { + this.id = Preconditions.checkNotNull(id, "Node ID must not be null"); + this.addresses = 
Preconditions.checkNotNull(addresses, "Node addresses must not be null"); + this.location = Preconditions.checkNotNull(location, "Node location must not be null"); + this.state = Preconditions.checkNotNull(state, "Node state must not be null"); + this.version = Preconditions.checkNotNull(version, "Node version must not be null"); + this.hostId = hostId; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Node node = (Node) o; + return id.equals(node.id) + && addresses.equals(node.addresses) + && location.equals(node.location) + && state == node.state + && version.equals(node.version); + } + + + @Override + public int hashCode() + { + return Objects.hash(id, addresses, location, state, version); + } + + @Override + public String toString() + { + return "Node{" + + "id=" + id + + ", addresses=" + addresses + + ", location=" + location + + ", state=" + state + + ", version=" + version + + '}'; + } + + public static class Serializer implements MetadataSerializer<Node> + { + public void serialize(Node node, DataOutputPlus out, Version version) throws IOException + { + NodeId.serializer.serialize(node.id, out, version); + NodeAddresses.serializer.serialize(node.addresses, out, version); + out.writeUTF(node.location.datacenter); + out.writeUTF(node.location.rack); + out.writeInt(node.state.ordinal()); + NodeVersion.serializer.serialize(node.version, out, version); + if (node.hostId == null) + out.writeBoolean(false); + else + { + out.writeBoolean(true); + UUIDSerializer.serializer.serialize(node.hostId, out, MessagingService.VERSION_50); + } + + } + + public Node deserialize(DataInputPlus in, Version version) throws IOException + { + NodeId id = NodeId.serializer.deserialize(in, version); + NodeAddresses addresses = NodeAddresses.serializer.deserialize(in, version); + Location location = new Location(in.readUTF(), in.readUTF()); + NodeState state = 
NodeState.values()[in.readInt()]; + NodeVersion nodeVersion = NodeVersion.serializer.deserialize(in, version); + boolean hasHostId = in.readBoolean(); + UUID hostId = hasHostId ? UUIDSerializer.serializer.deserialize(in, MessagingService.VERSION_50) : null; + return new Node(id, addresses, location, state, nodeVersion, hostId); + } + + public long serializedSize(Node node, Version version) + { + long size = 0; + size += NodeId.serializer.serializedSize(node.id, version); + size += NodeAddresses.serializer.serializedSize(node.addresses, version); + size += sizeof(node.location.datacenter); + size += sizeof(node.location.rack); + size += TypeSizes.INT_SIZE; + size += NodeVersion.serializer.serializedSize(node.version, version); + size += TypeSizes.BOOL_SIZE; + if (node.hostId != null) + size += UUIDSerializer.serializer.serializedSize(node.hostId, MessagingService.VERSION_50); + return size; + } + } + } + + public static class Serializer implements MetadataSerializer<Directory> + { + public void serialize(Directory t, DataOutputPlus out, Version version) throws IOException + { + out.writeInt(t.states.size()); + for (NodeId nodeId : t.states.keySet()) + Node.serializer.serialize(t.getNode(nodeId), out, version); + + Set<String> dcs = t.racksByDC.keySet(); + out.writeInt(dcs.size()); + for (String dc : dcs) + { + out.writeUTF(dc); + Map<String, Collection<InetAddressAndPort>> racks = t.racksByDC.get(dc).asMap(); + out.writeInt(racks.size()); + for (String rack : racks.keySet()) + { + out.writeUTF(rack); + Collection<InetAddressAndPort> endpoints = racks.get(rack); + out.writeInt(endpoints.size()); + for (InetAddressAndPort endpoint : endpoints) + { + InetAddressAndPort.MetadataSerializer.serializer.serialize(endpoint, out, version); + } + } + } + Epoch.serializer.serialize(t.lastModified, out, version); + } + + public Directory deserialize(DataInputPlus in, Version version) throws IOException + { + int count = in.readInt(); + Directory newDir = new Directory(); + for 
(int i = 0; i < count; i++) + { + Node n = Node.serializer.deserialize(in, version); + // todo: bulk operations + newDir = newDir.with(n.addresses, n.id, n.hostId, n.location, n.version) + .withNodeState(n.id, n.state); + } + + int dcCount = in.readInt(); + BTreeMultimap<String, InetAddressAndPort> dcEndpoints = BTreeMultimap.empty(); + BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC = BTreeMap.empty(); + for (int i=0; i<dcCount; i++) + { + String dc = in.readUTF(); + int rackCount = in.readInt(); + BTreeMultimap<String, InetAddressAndPort> rackEndpoints = BTreeMultimap.empty(); + for (int j=0; j<rackCount; j++) + { + String rack = in.readUTF(); + int epCount = in.readInt(); + for (int k=0; k<epCount; k++) + { + InetAddressAndPort endpoint = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version); + rackEndpoints = rackEndpoints.with(rack, endpoint); + dcEndpoints = dcEndpoints.with(dc, endpoint); + } + racksByDC = racksByDC.withForce(dc, rackEndpoints); + } + } + + Epoch lastModified = Epoch.serializer.deserialize(in, version); + return new Directory(newDir.nextId, + lastModified, + newDir.peers, + newDir.locations, + newDir.states, + newDir.versions, + newDir.hostIds, + newDir.addresses, + dcEndpoints, + racksByDC); + } + + public long serializedSize(Directory t, Version version) + { + int size = sizeof(t.states.size()); + for (NodeId nodeId : t.states.keySet()) + size += Node.serializer.serializedSize(t.getNode(nodeId), version); + + size += sizeof(t.racksByDC.size()); + for (Map.Entry<String, Multimap<String, InetAddressAndPort>> entry : t.racksByDC.entrySet()) + { + size += sizeof(entry.getKey()); + Map<String, Collection<InetAddressAndPort>> racks = entry.getValue().asMap(); + size += sizeof(racks.size()); + for (Map.Entry<String, Collection<InetAddressAndPort>> e : racks.entrySet()) + { + size += sizeof(e.getKey()); + Collection<InetAddressAndPort> endpoints = e.getValue(); + size += sizeof(endpoints.size()); + for 
(InetAddressAndPort endpoint : endpoints) + { + size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(endpoint, version); + } + } + } + size += Epoch.serializer.serializedSize(t.lastModified, version); + return size; + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof Directory)) return false; + Directory directory = (Directory) o; + + return Objects.equals(lastModified, directory.lastModified) && + isEquivalent(directory); + } + + @Override + public int hashCode() + { + return Objects.hash(nextId, lastModified, peers, locations, states, endpointsByDC, racksByDC, versions, addresses); + } + + /** + * returns true if this directory is functionally equivalent to the given one + * + * does not check equality of lastModified + */ + @VisibleForTesting + public boolean isEquivalent(Directory directory) + { + return nextId == directory.nextId && + Objects.equals(peers, directory.peers) && + Objects.equals(locations, directory.locations) && + Objects.equals(states, directory.states) && + Objects.equals(endpointsByDC, directory.endpointsByDC) && + Objects.equals(racksByDC, directory.racksByDC) && + Objects.equals(versions, directory.versions) && + Objects.equals(addresses, directory.addresses); + } +} diff --git a/src/java/org/apache/cassandra/tcm/membership/Location.java b/src/java/org/apache/cassandra/tcm/membership/Location.java new file mode 100644 index 0000000000..7e38db79cb --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/membership/Location.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.membership; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; + +public class Location +{ + public static final Serializer serializer = new Serializer(); + + public final String datacenter; + public final String rack; + + public Location(String datacenter, String rack) + { + this.datacenter = datacenter; + this.rack = rack; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Location location = (Location) o; + return Objects.equals(datacenter, location.datacenter) && Objects.equals(rack, location.rack); + } + + @Override + public int hashCode() + { + return Objects.hash(datacenter, rack); + } + + @Override + public String toString() + { + return datacenter + '/' + rack; + } + + public static class Serializer implements MetadataSerializer<Location> + { + public void serialize(Location t, DataOutputPlus out, Version version) throws IOException + { + out.writeUTF(t.datacenter); + out.writeUTF(t.rack); + } + + public Location deserialize(DataInputPlus in, Version version) throws IOException + { + return new Location(in.readUTF(), in.readUTF()); + } + + public long serializedSize(Location t, Version version) + { + return 
TypeSizes.sizeof(t.datacenter) + + TypeSizes.sizeof(t.rack); + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeAddresses.java b/src/java/org/apache/cassandra/tcm/membership/NodeAddresses.java new file mode 100644 index 0000000000..dcd7ee99f2 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/membership/NodeAddresses.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.membership; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.FBUtilities; + +public class NodeAddresses +{ + public static final Serializer serializer = new Serializer(); + + public final InetAddressAndPort broadcastAddress; + public final InetAddressAndPort localAddress; + public final InetAddressAndPort nativeAddress; + + /** + * + * @param broadcastAddress this comes from config if broadcast_address is set or it falls through to getLocalAddress + * which either grabs the local host address or listen_address from config if set + * todo; config broadcast_address can be changed by snitch (EC2MultiRegionSnitch) during runtime, handle that + * @param localAddress this is the local host if listen_address is not set in config + * @param nativeAddress address for clients to communicate with this node + */ + public NodeAddresses(InetAddressAndPort broadcastAddress, InetAddressAndPort localAddress, InetAddressAndPort nativeAddress) + { + this.broadcastAddress = broadcastAddress; + this.localAddress = localAddress; + this.nativeAddress = nativeAddress; + } + + public NodeAddresses(InetAddressAndPort address) + { + this(address, address, address); + } + + @Override + public String toString() + { + return "NodeAddresses{" + + "broadcastAddress=" + broadcastAddress + + ", localAddress=" + localAddress + + ", nativeAddress=" + nativeAddress + + '}'; + } + + public boolean conflictsWith(NodeAddresses other) + { + return broadcastAddress.equals(other.broadcastAddress) || + localAddress.equals(other.localAddress) || + nativeAddress.equals(other.nativeAddress); + } + + @Override + public boolean equals(Object o) + { 
+ if (this == o) return true; + if (!(o instanceof NodeAddresses)) return false; + NodeAddresses that = (NodeAddresses) o; + return Objects.equals(broadcastAddress, that.broadcastAddress) && Objects.equals(localAddress, that.localAddress) && Objects.equals(nativeAddress, that.nativeAddress); + } + + @Override + public int hashCode() + { + return Objects.hash(broadcastAddress, localAddress, nativeAddress); + } + + public static NodeAddresses current() + { + return new NodeAddresses(FBUtilities.getBroadcastAddressAndPort(), + FBUtilities.getLocalAddressAndPort(), + FBUtilities.getBroadcastNativeAddressAndPort()); + } + + public static class Serializer implements MetadataSerializer<NodeAddresses> + { + @Override + public void serialize(NodeAddresses t, DataOutputPlus out, Version version) throws IOException + { + InetAddressAndPort.MetadataSerializer.serializer.serialize(t.broadcastAddress, out, version); + InetAddressAndPort.MetadataSerializer.serializer.serialize(t.localAddress, out, version); + InetAddressAndPort.MetadataSerializer.serializer.serialize(t.nativeAddress, out, version); + } + + @Override + public NodeAddresses deserialize(DataInputPlus in, Version version) throws IOException + { + InetAddressAndPort broadcastAddress = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version); + InetAddressAndPort localAddress = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version); + InetAddressAndPort rpcAddress = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version); + return new NodeAddresses(broadcastAddress, localAddress, rpcAddress); + } + + @Override + public long serializedSize(NodeAddresses t, Version version) + { + return InetAddressAndPort.MetadataSerializer.serializer.serializedSize(t.broadcastAddress, version) + + InetAddressAndPort.MetadataSerializer.serializer.serializedSize(t.localAddress, version) + + InetAddressAndPort.MetadataSerializer.serializer.serializedSize(t.nativeAddress, version); + } + 
} +} diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeId.java b/src/java/org/apache/cassandra/tcm/membership/NodeId.java new file mode 100644 index 0000000000..862dacbc03 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/membership/NodeId.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.membership; + +import java.io.IOException; +import java.util.Objects; +import java.util.UUID; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; + +public class NodeId implements Comparable<NodeId> +{ + public static final Serializer serializer = new Serializer(); + + public final UUID uuid; + + public NodeId(UUID id) + { + this.uuid = id; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeId nodeId = (NodeId) o; + return Objects.equals(uuid, nodeId.uuid); + } + + @Override + public int hashCode() + { + return Objects.hash(uuid); + } + + @Override + public String toString() + { + return "NodeId{" + + "id=" + uuid + + '}'; + } + + public int compareTo(NodeId o) + { + return uuid.compareTo(o.uuid); + } + + public static class Serializer implements MetadataSerializer<NodeId> + { + public void serialize(NodeId n, DataOutputPlus out, Version version) throws IOException + { + out.writeLong(n.uuid.getMostSignificantBits()); + out.writeLong(n.uuid.getLeastSignificantBits()); + } + + public NodeId deserialize(DataInputPlus in, Version version) throws IOException + { + return new NodeId(new UUID(in.readLong(), in.readLong())); + } + + public long serializedSize(NodeId t, Version version) + { + return TypeSizes.LONG_SIZE + TypeSizes.LONG_SIZE; + } + } + +} diff --git a/src/java/org/apache/cassandra/tcm/MetadataKeys.java b/src/java/org/apache/cassandra/tcm/membership/NodeState.java similarity index 50% copy from src/java/org/apache/cassandra/tcm/MetadataKeys.java copy to src/java/org/apache/cassandra/tcm/membership/NodeState.java index 6db00c2964..78074145e8 100644 --- 
a/src/java/org/apache/cassandra/tcm/MetadataKeys.java +++ b/src/java/org/apache/cassandra/tcm/membership/NodeState.java @@ -16,30 +16,15 @@ * limitations under the License. */ -package org.apache.cassandra.tcm; - -import java.util.Locale; - -import com.google.common.collect.ImmutableSet; - -public class MetadataKeys +package org.apache.cassandra.tcm.membership; +public enum NodeState { - public static final String CORE_NS = MetadataKeys.class.getPackage().getName().toLowerCase(Locale.ROOT); - - public static final MetadataKey SCHEMA = make(CORE_NS, "schema", "dist_schema"); - - public static final ImmutableSet<MetadataKey> CORE_METADATA = ImmutableSet.of(SCHEMA); - - public static MetadataKey make(String...parts) - { - assert parts != null && parts.length >= 1; - StringBuilder b = new StringBuilder(parts[0]); - for (int i = 1; i < parts.length; i++) - { - b.append('.'); - b.append(parts[i]); - } - return new MetadataKey(b.toString()); - } - + REGISTERED, + BOOTSTRAPPING, + JOINED, + LEAVING, + LEFT, + MOVING; + + // TODO: probably we can make these states even more nuanced, and track which step each node is on to have a simpler representation of transition states } diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java new file mode 100644 index 0000000000..4234d66bce --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.membership; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.CassandraVersion; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.db.TypeSizes.sizeof; +import static org.apache.cassandra.db.TypeSizes.sizeofUnsignedVInt; + +public class NodeVersion implements Comparable<NodeVersion> +{ + public static final Serializer serializer = new Serializer(); + public static final NodeVersion CURRENT = new NodeVersion(new CassandraVersion(FBUtilities.getReleaseVersionString()), Version.V0); + + public final CassandraVersion cassandraVersion; + public final Version serializationVersion; + + public NodeVersion(CassandraVersion cassandraVersion, Version serializationVersion) + { + this.cassandraVersion = cassandraVersion; + this.serializationVersion = serializationVersion; + } + + public boolean isUpgraded() + { + return serializationVersion.asInt() >= Version.V0.asInt(); + } + + @Override + public String toString() + { + return "NodeVersion{" + + "cassandraVersion=" + cassandraVersion + + ", serializationVersion=" + serializationVersion + + '}'; + } + + @Override + public int compareTo(NodeVersion o) + { + // only comparing cassandraVersion here - if we bump serializationVersion we need to release a new cassandra version + 
return cassandraVersion.compareTo(o.cassandraVersion); + } + + public static NodeVersion fromCassandraVersion(CassandraVersion cv) + { + if (cv == null) + return CURRENT; + Version version = Version.OLD; + if (cv.compareTo(CassandraVersion.CASSANDRA_5_0) >= 0) + version = Version.V0; + return new NodeVersion(cv, version); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof NodeVersion)) return false; + NodeVersion that = (NodeVersion) o; + return Objects.equals(cassandraVersion, that.cassandraVersion) && serializationVersion == that.serializationVersion; + } + + @Override + public int hashCode() + { + return Objects.hash(cassandraVersion, serializationVersion); + } + + public static class Serializer implements MetadataSerializer<NodeVersion> + { + @Override + public void serialize(NodeVersion t, DataOutputPlus out, Version version) throws IOException + { + out.writeUTF(t.cassandraVersion.toString()); + out.writeUnsignedVInt32(t.serializationVersion.asInt()); + } + + @Override + public NodeVersion deserialize(DataInputPlus in, Version version) throws IOException + { + CassandraVersion cassandraVersion = new CassandraVersion(in.readUTF()); + Version serializationVersion = Version.fromInt(in.readUnsignedVInt32()); + return new NodeVersion(cassandraVersion, serializationVersion); + } + + @Override + public long serializedSize(NodeVersion t, Version version) + { + return sizeof(t.cassandraVersion.toString()) + + sizeofUnsignedVInt(t.serializationVersion.asInt()); + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/transformations/Register.java b/src/java/org/apache/cassandra/tcm/transformations/Register.java new file mode 100644 index 0000000000..fe1a5154c1 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/transformations/Register.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.utils.FBUtilities; + +public class Register implements Transformation +{ + private static final Logger logger = LoggerFactory.getLogger(Register.class); + public static final Serializer 
serializer = new Serializer(); + + private final NodeAddresses addresses; + private final Location location; + private final NodeVersion version; + + public Register(NodeAddresses addresses, Location location, NodeVersion version) + { + this.location = location; + this.version = version; + this.addresses = addresses; + } + + @Override + public Kind kind() + { + return Kind.REGISTER; + } + + @Override + public Result execute(ClusterMetadata prev) + { + for (Map.Entry<NodeId, NodeAddresses> entry : prev.directory.addresses.entrySet()) + { + NodeAddresses existingAddresses = entry.getValue(); + if (addresses.conflictsWith(existingAddresses)) + return new Rejected(String.format("New addresses %s conflicts with existing node %s with addresses %s", addresses, entry.getKey(), existingAddresses)); + } + + ClusterMetadata.Transformer next = prev.transformer() + .register(addresses, location, version); + return success(next); + } + + public static NodeId maybeRegister() + { + return register(false); + } + + @VisibleForTesting + public static NodeId forceRegister() + { + return register(true); + } + + @VisibleForTesting + public static NodeId register(NodeAddresses nodeAddresses) + { + return register(nodeAddresses, NodeVersion.CURRENT); + } + + @VisibleForTesting + public static NodeId register(NodeAddresses nodeAddresses, NodeVersion nodeVersion) + { + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + Location location = new Location(snitch.getLocalDatacenter(), snitch.getLocalRack()); + + NodeId nodeId = ClusterMetadata.current().directory.peerId(nodeAddresses.broadcastAddress); + if (nodeId == null) + { + nodeId = ClusterMetadataService.instance() + .commit(new Register(nodeAddresses, location, nodeVersion), + (metadata) -> !metadata.directory.isRegistered(nodeAddresses.broadcastAddress), + (metadata) -> metadata.directory.peerId(nodeAddresses.broadcastAddress), + (metadata, reason) -> { + throw new IllegalStateException("Can't register node: " + reason); 
+ }); + } + + logger.info("Registering with endpoint {}", nodeAddresses.broadcastAddress); + return nodeId; + } + + private static NodeId register(boolean force) + { + // Try to recover node ID from the system keyspace + UUID localHostId = SystemKeyspace.getLocalHostId(); + if (force || localHostId == null) + { + NodeId nodeId = register(NodeAddresses.current()); + localHostId = nodeId.uuid; + SystemKeyspace.setLocalHostId(localHostId); + logger.info("New node ID obtained {}, (Note: This should happen exactly once per node)", nodeId.uuid); + return nodeId; + } + else + { + Directory dir = ClusterMetadata.current().directory; + NodeId nodeId = dir.peerId(FBUtilities.getBroadcastAddressAndPort()); + NodeVersion dirVersion = dir.version(nodeId); + + // If this is a node in the process of upgrading, update the host id in the system.local table + // TODO: when constructing the initial cluster metadata for upgrade, we include a mapping from + // NodeId to the old HostId. We will need to use this lookup to map between the two for + // hint delivery immediately following an upgrade. + if (dirVersion == null || !dirVersion.isUpgraded()) + { + if (dir.hostId(nodeId).equals(localHostId)) + { + SystemKeyspace.setLocalHostId(nodeId.uuid); + logger.info("Updated local HostId from pre-upgrade version {} to the one which was pre-registered " + + "during initial cluster metadata conversion {}", localHostId, nodeId.uuid); + } + else + { + throw new RuntimeException("HostId read from local system table does not match the one recorded " + + "for this endpoint during initial cluster metadata conversion. 
" + + String.format("Endpoint: %s, NodeId: %s, Recorded: %s, Local: %s", + FBUtilities.getBroadcastAddressAndPort(), + nodeId, + dir.hostId(nodeId), + localHostId)); + } + } + else + { + logger.info("Local id was already registered, retaining: {}", localHostId); + } + return nodeId; + } + } + + public String toString() + { + return "Register{" + + ", addresses=" + addresses + + ", location=" + location + + ", version=" + version + + '}'; + } + + static class Serializer implements AsymmetricMetadataSerializer<Transformation, Register> + { + public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException + { + assert t instanceof Register; + Register register = (Register)t; + NodeAddresses.serializer.serialize(register.addresses, out, version); + Location.serializer.serialize(register.location, out, version); + NodeVersion.serializer.serialize(register.version, out, version); + } + + public Register deserialize(DataInputPlus in, Version version) throws IOException + { + NodeAddresses addresses = NodeAddresses.serializer.deserialize(in, version); + Location location = Location.serializer.deserialize(in, version); + NodeVersion nodeVersion = NodeVersion.serializer.deserialize(in, version); + return new Register(addresses, location, nodeVersion); + } + + public long serializedSize(Transformation t, Version version) + { + assert t instanceof Register; + Register register = (Register) t; + return NodeAddresses.serializer.serializedSize(register.addresses, version) + + Location.serializer.serializedSize(register.location, version) + + NodeVersion.serializer.serializedSize(register.version, version); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java index 766f0e8173..f4c3dd7388 100644 --- a/src/java/org/apache/cassandra/utils/CassandraVersion.java +++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java @@ -50,6 +50,7 
@@ public class CassandraVersion implements Comparable<CassandraVersion> private static final Pattern PATTERN = Pattern.compile(VERSION_REGEXP); + public static final CassandraVersion CASSANDRA_5_0 = new CassandraVersion("5.0").familyLowerBound.get(); public static final CassandraVersion CASSANDRA_4_1 = new CassandraVersion("4.1").familyLowerBound.get(); public static final CassandraVersion CASSANDRA_4_0 = new CassandraVersion("4.0").familyLowerBound.get(); public static final CassandraVersion CASSANDRA_4_0_RC2 = new CassandraVersion(4, 0, 0, NO_HOTFIX, new String[] {"rc2"}, null); diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java b/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java new file mode 100644 index 0000000000..1e1984e635 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils.btree; + +import java.util.Comparator; +import java.util.Set; + +import com.google.common.collect.BiMap; + +import static java.util.Comparator.naturalOrder; + +public class BTreeBiMap<K, V> extends AbstractBTreeMap<K, V> implements BiMap<K, V> +{ + private final Object[] inverse; + private final KeyComparator<V, K> valueComparator; + + protected static <K, V> BTreeBiMap<K, V> withComparators(Object[] tree, Object [] inverse, Comparator<K> comparator, Comparator<V> valueComparator) + { + return new BTreeBiMap<>(tree, inverse, new KeyComparator<>(comparator), new KeyComparator<>(valueComparator)); + } + + private BTreeBiMap(Object[] tree, Object [] inverse, KeyComparator<K, V> comparator, KeyComparator<V, K> valueComparator) + { + super(tree, comparator); + this.valueComparator = valueComparator; + this.inverse = inverse; + } + + public static <K, V> BTreeBiMap<K, V> empty(Comparator<K> comparator, Comparator<V> valueComparator) + { + return withComparators(BTree.empty(), BTree.empty(), comparator, valueComparator); + } + + public static <K extends Comparable<K>, V extends Comparable<V>> BTreeBiMap<K, V> empty() + { + return BTreeBiMap.<K, V>empty(naturalOrder(), naturalOrder()); + } + + @Override + public BiMap<V, K> inverse() + { + return new BTreeBiMap<>(inverse, tree, valueComparator, comparator); + } + + @Override + public BTreeBiMap<K, V> with(K key, V value) + { + if (key == null || value == null) + throw new NullPointerException(); + AbstractBTreeMap.Entry<K, V> entry = new AbstractBTreeMap.Entry<>(key, value); + AbstractBTreeMap.Entry<V, K> inverseEntry = new AbstractBTreeMap.Entry<>(value, key); + if (BTree.find(tree, comparator, entry) != null) + throw new IllegalArgumentException("Key already exists in map: " + key); + if (BTree.find(inverse, valueComparator, inverseEntry) != null) + throw new IllegalArgumentException("Value already exists in map: " + value); + + return new BTreeBiMap<>(BTree.update(tree, new 
Object[]{ entry }, comparator, UpdateFunction.noOp()), + BTree.update(inverse, new Object[] { new AbstractBTreeMap.Entry<>(value, key) }, valueComparator, UpdateFunction.noOp()), + comparator, + valueComparator); + } + + @Override + public BTreeBiMap<K, V> withForce(K key, V value) + { + // todo: optimise + return without(key).with(key, value); + } + + public BTreeBiMap<K, V> without(K key) + { + AbstractBTreeMap.Entry<K, V> entry = new AbstractBTreeMap.Entry<>(key, null); + AbstractBTreeMap.Entry<K, V> existingEntry = BTree.find(tree, comparator, entry); + if (existingEntry == null) + return this; + + Object[] newTree = BTreeRemoval.remove(tree, comparator, new AbstractBTreeMap.Entry<>(key, null)); + Object[] newInverse = BTreeRemoval.remove(inverse, valueComparator, new AbstractBTreeMap.Entry<>(existingEntry.getValue(), null)); + return new BTreeBiMap<>(newTree, newInverse, comparator, valueComparator); + } + + public Set<V> values() + { + return inverse().keySet(); + } +} diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java b/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java new file mode 100644 index 0000000000..aefa94a8dc --- /dev/null +++ b/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.btree; + +import java.util.Collection; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import javax.annotation.Nullable; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultiset; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import static java.util.Comparator.naturalOrder; + +public class BTreeMultimap<K, V> implements Multimap<K, V> +{ + private final BTreeMap<K, Collection<V>> map; + private final Comparator<K> comparator; + private final Comparator<V> valueComparator; + private final int size; + + private BTreeMultimap(BTreeMap<K, Collection<V>> map, Comparator<K> comparator, Comparator<V> valueComparator, int size) + { + this.map = map; + this.comparator = comparator; + this.valueComparator = valueComparator; + this.size = size; + } + + public static <K extends Comparable<K>, V extends Comparable<V>> BTreeMultimap<K, V> empty() + { + return new BTreeMultimap<K, V>(BTreeMap.empty(), naturalOrder(), naturalOrder(), 0); + } + + public BTreeMultimap<K, V> with(K key, V value) + { + if (map.containsKey(key)) + { + BTreeSet<V> oldSet = (BTreeSet<V>) map.get(key); + BTreeSet<V> newSet = oldSet.with(value); + int newSize = size + newSet.size() - oldSet.size(); + return new BTreeMultimap<>(map.without(key).with(key, newSet), comparator, valueComparator, newSize); + } + else + { + BTreeSet<V> newSet = BTreeSet.of(valueComparator, value); + return new BTreeMultimap<>(map.with(key, newSet), comparator, valueComparator, size + 1); + } + } + + public BTreeMultimap<K, V> without(K key) + { + Collection<V> oldSet = map.get(key); + if (oldSet == null) + return this; + int newSize = size - oldSet.size(); + return new BTreeMultimap<>(map.without(key), 
comparator, valueComparator, newSize); + } + + public BTreeMultimap<K, V> without(K key, V value) + { + BTreeSet<V> values = (BTreeSet<V>) map.get(key); + if (values == null) + return this; + if (!values.contains(value)) + return this; + BTreeSet<V> newValues = BTreeSet.wrap(BTreeRemoval.remove(values.tree, valueComparator, value), valueComparator); + BTreeMap<K, Collection<V>> newMap = map.without(key); + if (newValues.isEmpty()) + return new BTreeMultimap<>(newMap, comparator, valueComparator, size - 1); + + return new BTreeMultimap<>(newMap.with(key, newValues), comparator, valueComparator, size - 1); + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean isEmpty() + { + return map.isEmpty(); + } + + @Override + public boolean containsKey(@Nullable Object o) + { + if (o == null) + return false; + return map.containsKey(o); + } + + @Override + public boolean containsValue(@Nullable Object o) + { + if (o == null) + return false; + for (Map.Entry<K, Collection<V>> e : map.entrySet()) + if (e.getValue().contains(o)) + return true; + return false; + } + + @Override + public boolean containsEntry(@Nullable Object key, @Nullable Object value) + { + if (key == null || value == null) + throw new NullPointerException(); + return map.containsKey(key) && map.get(key).contains(value); + } + + @Override + public Collection<V> get(@Nullable K k) + { + if (k == null) + return null; + return map.get(k); + } + + @Override + public Set<K> keySet() + { + return map.keySet(); + } + + @Override + public Multiset<K> keys() + { + ImmutableMultiset.Builder<K> keys = ImmutableMultiset.builder(); + keys.addAll(map.keySet()); + return keys.build(); + } + + @Override + public Collection<V> values() + { + ImmutableList.Builder<V> builder = ImmutableList.builder(); + for (Map.Entry<K, Collection<V>> entry : map.entrySet()) + builder.addAll(entry.getValue()); + return builder.build(); + } + + @Override + public Collection<Map.Entry<K, V>> entries() + { + 
Set<Map.Entry<K, V>> entries = new HashSet<>(); + for (Map.Entry<K, Collection<V>> entry : map.entrySet()) + for (V v : entry.getValue()) + entries.add(new AbstractBTreeMap.Entry<>(entry.getKey(), v)); + return entries; + } + + @Override + public Map<K, Collection<V>> asMap() + { + return map; + } + + public boolean put(@Nullable K k, @Nullable V v) { throw new UnsupportedOperationException();} + public boolean remove(@Nullable Object o, @Nullable Object o1) {throw new UnsupportedOperationException();} + public boolean putAll(@Nullable K k, Iterable<? extends V> iterable) {throw new UnsupportedOperationException();} + public boolean putAll(Multimap<? extends K, ? extends V> multimap) {throw new UnsupportedOperationException();} + public Collection<V> replaceValues(@Nullable K k, Iterable<? extends V> iterable) {throw new UnsupportedOperationException();} + public Collection<V> removeAll(@Nullable Object o) {throw new UnsupportedOperationException();} + public void clear() { throw new UnsupportedOperationException(); } + + @Override + public String toString() + { + return map.toString(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof BTreeMultimap)) return false; + BTreeMultimap<?, ?> that = (BTreeMultimap<?, ?>) o; + return size == that.size && + Objects.equals(map, that.map) && + Objects.equals(comparator, that.comparator) && + Objects.equals(valueComparator, that.valueComparator); + } + + @Override + public int hashCode() + { + return Objects.hash(map, comparator, valueComparator, size); + } +} diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java index b20729d7da..5bb3ac0446 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java +++ b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java @@ -18,7 +18,18 @@ */ package org.apache.cassandra.utils.btree; -import java.util.*; +import java.util.AbstractSet; +import 
java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.ListIterator; +import java.util.NavigableSet; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.SortedSet; +import java.util.Spliterator; +import java.util.Spliterators; import com.google.common.collect.Ordering; @@ -27,7 +38,7 @@ import org.apache.cassandra.utils.btree.BTree.Dir; import static org.apache.cassandra.utils.btree.BTree.findIndex; -public class BTreeSet<V> implements NavigableSet<V>, List<V> +public class BTreeSet<V> extends AbstractSet<V> implements NavigableSet<V>, List<V> { protected final Comparator<? super V> comparator; protected final Object[] tree; @@ -175,12 +186,16 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V> @Override public V first() { + if (isEmpty()) + throw new NoSuchElementException(); return get(0); } @Override public V last() { + if (isEmpty()) + throw new NoSuchElementException(); return get(size() - 1); } @@ -223,7 +238,6 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V> return false; return true; } - public int hashCode() { // we can't just delegate to Arrays.deepHashCode(), @@ -234,6 +248,7 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V> return result; } + @Override public boolean addAll(Collection<? 
extends V> c) { @@ -590,24 +605,24 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V> public Builder<V> add(V v) { - wrapped .add(v); + wrapped.add(v); return this; } public Builder<V> addAll(Collection<V> iter) { - wrapped .addAll(iter); + wrapped.addAll(iter); return this; } public boolean isEmpty() { - return wrapped .isEmpty(); + return wrapped.isEmpty(); } public BTreeSet<V> build() { - return new BTreeSet<>(wrapped .build(), wrapped .comparator); + return new BTreeSet<>(wrapped.build(), wrapped.comparator); } } @@ -627,6 +642,22 @@ public class BTreeSet<V> implements NavigableSet<V>, List<V> return new BTreeSet<>(btree, comparator); } + public BTreeSet<V> with(Collection<V> updateWith) + { + Object[] with = BTreeSet.<V>builder(comparator).addAll(updateWith).build().tree; + return new BTreeSet<>(BTree.update(tree, with, comparator, UpdateFunction.<V>noOp()), comparator); + } + + public BTreeSet<V> with(V updateWith) + { + return new BTreeSet<>(BTree.update(tree, new Object[] { updateWith }, comparator, UpdateFunction.<V>noOp()), comparator); + } + + public BTreeSet<V> without(V element) + { + return new BTreeSet<>(BTreeRemoval.remove(tree, comparator, element), comparator); + } + public static <V extends Comparable<V>> BTreeSet<V> of(Collection<V> sortedValues) { return new BTreeSet<>(BTree.build(sortedValues), Ordering.<V>natural()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
