This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit bde3d39daf1d626de5bbef6409fe2ffe1ff1ec69
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Mar 3 17:40:14 2023 +0000

    [CEP-21] Upgrade support

    Following an upgrade, nodes in an existing cluster will enter a minimal
    modification mode. In this state, the set of allowed cluster metadata
    modifications is constrained to include only the addition, removal and
    replacement of nodes, to allow failed hosts to be replaced during the
    upgrade. In this mode the CMS has no members and each peer maintains its
    own ClusterMetadata independently. This metadata is initialised at startup
    from system tables, and gossip is used to propagate the permitted metadata
    changes.

    When the operator is ready, one node is chosen for promotion to the
    initial CMS; this is done manually via nodetool. At this point, the
    candidate node will propose itself as the initial CMS and attempt to gain
    consensus from the rest of the cluster. If successful, it verifies that
    all peers have an identical view of cluster metadata and initialises the
    distributed log with a snapshot of that metadata. Once this process is
    complete, all future cluster metadata updates are performed via the CMS
    using the global log, and reverting to the previous method of metadata
    management is not supported. Further members can and should be added to
    the CMS via the nodetool command.

    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../db/commitlog/CommitLogDescriptor.java          |   5 +-
 .../apache/cassandra/db/virtual/PeersTable.java    | 187 ++++++++++++++++++
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   1 +
 .../apache/cassandra/hints/HintsDescriptor.java    |   5 +-
 .../cassandra/locator/InetAddressAndPort.java      |  14 +-
 src/java/org/apache/cassandra/net/InboundSink.java |  28 ++-
 .../apache/cassandra/net/ResponseVerbHandler.java  |   2 +
 src/java/org/apache/cassandra/net/Verb.java        |  11 +-
 .../apache/cassandra/service/StorageService.java   |   2 +-
 .../cassandra/tcm/ClusterMetadataService.java      | 121 +++++++++++-
 src/java/org/apache/cassandra/tcm/Commit.java      |  14 +-
 src/java/org/apache/cassandra/tcm/Discovery.java   |   1 +
 src/java/org/apache/cassandra/tcm/Startup.java     | 116 ++++++++++-
 .../tcm/listeners/LegacyStateListener.java         |  91 +++++++++
 .../org/apache/cassandra/tcm/log/LocalLog.java     |  28 ++-
 .../tcm/migration/ClusterMetadataHolder.java       |  66 +++++++
 .../apache/cassandra/tcm/migration/Election.java   | 212 ++++++++++++++++++++-
 .../cassandra/tcm/migration/GossipCMSListener.java |   7 +-
 .../cassandra/tcm/migration/GossipProcessor.java   |  41 ++++
 src/java/org/apache/cassandra/tools/NodeTool.java  |   1 +
 .../apache/cassandra/tools/nodetool/AddToCMS.java  |  39 ++++
 21 files changed, 954 insertions(+), 38 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index ed2af1bd0d..41444fecde 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -62,13 +62,14 @@ public class CommitLogDescriptor // We don't support anything pre-3.0 public static final int VERSION_30 = 6; public static final int VERSION_40 = 7; + public static final int VERSION_50 = 8; /** * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
* Note: make sure to handle {@link #getMessagingVersion()} */ @VisibleForTesting - public static final int current_version = VERSION_40; + public static final int current_version = VERSION_50; final int version; public final long id; @@ -222,6 +223,8 @@ public class CommitLogDescriptor return MessagingService.VERSION_30; case VERSION_40: return MessagingService.VERSION_40; + case VERSION_50: + return MessagingService.VERSION_50; default: throw new IllegalStateException("Unknown commitlog version " + version); } diff --git a/src/java/org/apache/cassandra/db/virtual/PeersTable.java b/src/java/org/apache/cassandra/db/virtual/PeersTable.java new file mode 100644 index 0000000000..157bed2f60 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/PeersTable.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.marshal.InetAddressType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.db.SystemKeyspace.LEGACY_PEERS; +import static org.apache.cassandra.db.SystemKeyspace.PEERS_V2; +import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME; + +public class PeersTable extends AbstractVirtualTable +{ + public static String PEER = "peer"; + public static String PEER_PORT = "peer_port"; + public static String DATA_CENTER = "data_center"; + public static String HOST_ID = "host_id"; + public static String PREFERRED_IP = "preferred_ip"; + public static String PREFERRED_PORT = "preferred_port"; + public static String RACK = "rack"; + public static String RELEASE_VERSION = "release_version"; + public static String NATIVE_ADDRESS = "native_address"; + public static String NATIVE_PORT = "native_port"; + public static String SCHEMA_VERSION = "schema_version"; + public static String TOKENS = "tokens"; 
+ public static String STATE = "state"; + + public PeersTable(String keyspace) + { + super(TableMetadata.builder(keyspace, "peers") + .comment("Peers") + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(InetAddressType.instance)) + .addPartitionKeyColumn(PEER, InetAddressType.instance) + .addClusteringColumn(PEER_PORT, Int32Type.instance) + .addRegularColumn(DATA_CENTER, UTF8Type.instance) + .addRegularColumn(RACK, UTF8Type.instance) + .addRegularColumn(HOST_ID, UUIDType.instance) + .addRegularColumn(PREFERRED_IP, InetAddressType.instance) + .addRegularColumn(PREFERRED_PORT, Int32Type.instance) + .addRegularColumn(NATIVE_ADDRESS, InetAddressType.instance) + .addRegularColumn(NATIVE_PORT, Int32Type.instance) + .addRegularColumn(RELEASE_VERSION, UTF8Type.instance) + .addRegularColumn(SCHEMA_VERSION, UUIDType.instance) + .addRegularColumn(STATE, UTF8Type.instance) + .addRegularColumn(TOKENS, SetType.getInstance(UTF8Type.instance, false)) + .build()); + } + + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + ClusterMetadata metadata = ClusterMetadata.current(); + for (InetAddressAndPort addr : metadata.directory.allAddresses()) + { + NodeId peer = metadata.directory.peerId(addr); + + NodeAddresses addresses = metadata.directory.getNodeAddresses(peer); + result.row(addr.getAddress(), addr.getPort()) + .column(DATA_CENTER, metadata.directory.location(peer).datacenter) + .column(RACK, metadata.directory.location(peer).rack) + .column(HOST_ID, peer.uuid) + .column(PREFERRED_IP, addresses.broadcastAddress.getAddress()) + .column(PREFERRED_PORT, addresses.broadcastAddress.getPort()) + .column(NATIVE_ADDRESS, addresses.nativeAddress.getAddress()) + .column(NATIVE_PORT, addresses.nativeAddress.getPort()) + .column(RELEASE_VERSION, metadata.directory.version(peer).cassandraVersion.toString()) + .column(SCHEMA_VERSION, Schema.instance.getVersion()) //TODO + .column(STATE, metadata.directory.peerState(peer).toString()) + .column(TOKENS, new HashSet<>(metadata.tokenMap.tokens(peer).stream().map((token) -> token.getToken().getTokenValue().toString()).collect(Collectors.toList()))); + } + + return result; + } + + public static void initializeLegacyPeerTables(ClusterMetadata prev, ClusterMetadata next) + { + QueryProcessor.executeInternal(String.format("TRUNCATE %s.%s", SYSTEM_KEYSPACE_NAME, PEERS_V2)); + QueryProcessor.executeInternal(String.format("TRUNCATE %s.%s", SYSTEM_KEYSPACE_NAME, LEGACY_PEERS)); + + for (NodeId nodeId : next.directory.peerIds()) + updateLegacyPeerTable(nodeId, prev, next); + } + + private static String peers_v2_query = "INSERT INTO %s.%s (" + + "peer, peer_port, " + + "preferred_ip, preferred_port, " + + "native_address, native_port, " + + "data_center, rack, " + + "host_id, " + + "release_version, " + + "schema_version," + + "tokens) " + + "VALUES " + + "(?,?,?,?,?,?,?,?,?,?,?,?)"; + + private static String legacy_peers_query = "INSERT INTO %s.%s (" + + "peer, preferred_ip, rpc_address, " + + "data_center, rack, " + + "host_id, " + + "release_version, " + + "schema_version," + + "tokens) " + + "VALUES " + + "(?,?,?,?,?,?,?,?,?)"; + + private static String peers_delete_query = "DELETE FROM %s.%s WHERE peer=? 
and peer_port=?"; + private static String legacy_peers_delete_query = "DELETE FROM %s.%s WHERE peer=?"; + + private static final Logger logger = LoggerFactory.getLogger(PeersTable.class); + public static void updateLegacyPeerTable(NodeId nodeId, ClusterMetadata prev, ClusterMetadata next) + { + if (nodeId.equals(next.directory.peerId(FBUtilities.getBroadcastAddressAndPort()))) + return; + + if (next.directory.peerState(nodeId) == null || next.directory.peerState(nodeId) == NodeState.LEFT) + { + NodeAddresses addresses = prev.directory.getNodeAddresses(nodeId); + logger.debug("Purging {} from system.peers_v2 table", addresses); + QueryProcessor.executeInternal(String.format(peers_delete_query, SYSTEM_KEYSPACE_NAME, PEERS_V2), addresses.broadcastAddress.getAddress(), addresses.broadcastAddress.getPort()); + QueryProcessor.executeInternal(String.format(legacy_peers_delete_query, SYSTEM_KEYSPACE_NAME, LEGACY_PEERS), addresses.broadcastAddress.getAddress()); + } + else + { + NodeAddresses addresses = next.directory.getNodeAddresses(nodeId); + Location location = next.directory.location(nodeId); + + Set<String> tokens = SystemKeyspace.tokensAsSet(next.tokenMap.tokens(nodeId)); + QueryProcessor.executeInternal(String.format(peers_v2_query, SYSTEM_KEYSPACE_NAME, PEERS_V2), + addresses.broadcastAddress.getAddress(), addresses.broadcastAddress.getPort(), + addresses.broadcastAddress.getAddress(), addresses.broadcastAddress.getPort(), + addresses.nativeAddress.getAddress(), addresses.nativeAddress.getPort(), + location.datacenter, location.rack, + nodeId.uuid, + next.directory.version(nodeId).cassandraVersion.toString(), + next.schema.getVersion(), + tokens); + + QueryProcessor.executeInternal(String.format(legacy_peers_query, SYSTEM_KEYSPACE_NAME, LEGACY_PEERS), + addresses.broadcastAddress.getAddress(), addresses.broadcastAddress.getAddress(), addresses.nativeAddress.getAddress(), + location.datacenter, location.rack, + nodeId.uuid, + next.directory.version(nodeId).cassandraVersion.toString(), + next.schema.getVersion(), + tokens); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java index 7ebed45ac2..a49dbea7cb 100644 --- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -50,6 +50,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace .add(new QueriesTable(VIRTUAL_VIEWS)) .add(new LogMessagesTable(VIRTUAL_VIEWS)) .add(new SnapshotsTable(VIRTUAL_VIEWS)) + .add(new PeersTable(VIRTUAL_VIEWS)) .add(new LocalTable(VIRTUAL_VIEWS)) .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS)) .build()); diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java b/src/java/org/apache/cassandra/hints/HintsDescriptor.java index 8e1f782f1d..b915cf6bda 100644 --- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java +++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java @@ -66,7 +66,8 @@ final class HintsDescriptor static final int VERSION_30 = 1; static final int VERSION_40 = 2; - static final int CURRENT_VERSION = VERSION_40; + static final int VERSION_50 = 3; + static final int CURRENT_VERSION = VERSION_50; static final String COMPRESSION = "compression"; static final String ENCRYPTION = "encryption"; @@ -232,6 +233,8 @@ final class HintsDescriptor return MessagingService.VERSION_30; case VERSION_40: return MessagingService.VERSION_40; + case VERSION_50: + 
return MessagingService.VERSION_50; default: throw new AssertionError(); } diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java index 65f646842a..298d7e76cf 100644 --- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java +++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java @@ -37,8 +37,8 @@ import com.google.common.net.HostAndPort; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.FastByteOperations; @@ -221,6 +221,17 @@ public final class InetAddressAndPort extends InetSocketAddress implements Compa return getByNameOverrideDefaults(name, null); } + public static InetAddressAndPort getByNameUnchecked(String name) + { + try + { + return getByNameOverrideDefaults(name, null); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } + } public static List<InetAddressAndPort> getAllByName(String name) throws UnknownHostException { @@ -338,7 +349,6 @@ public final class InetAddressAndPort extends InetSocketAddress implements Compa return Serializer.inetAddressAndPortSerializer.serializedSize(t, SERDE_VERSION); } } - /** * As of version 4.0 the endpoint description includes a port number as an unsigned short * This serializer matches the 3.0 CompactEndpointSerializationHelper, encoding the number of address bytes diff --git a/src/java/org/apache/cassandra/net/InboundSink.java b/src/java/org/apache/cassandra/net/InboundSink.java index 16eb440540..b3c49c89da 100644 --- a/src/java/org/apache/cassandra/net/InboundSink.java +++ b/src/java/org/apache/cassandra/net/InboundSink.java @@ -18,6 +18,7 @@ package org.apache.cassandra.net; import java.io.IOException; +import java.util.EnumSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Predicate; @@ -29,6 +30,9 @@ import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.index.IndexNotAvailableException; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.NotCMSException; import org.apache.cassandra.utils.NoSpamLogger; /** @@ -72,10 +76,28 @@ public class InboundSink implements InboundMessageHandlers.MessageConsumer private final MessagingService messaging; + private final static EnumSet<Verb> allowedDuringStartup = EnumSet.of(Verb.GOSSIP_DIGEST_ACK, Verb.GOSSIP_DIGEST_SYN); + InboundSink(MessagingService messaging) { this.messaging = messaging; - this.sink = message -> message.header.verb.handler().doVerb((Message<Object>) message); + this.sink = message -> { + IVerbHandler handler = message.header.verb.handler(); + if (handler == null) + { + String err = String.format("Handler for verb %s is null", message.header.verb); + noSpamLogger.info(err); + throw new IllegalStateException(err); + } + + if (ClusterMetadata.current().epoch.is(Epoch.UPGRADE_STARTUP) && !allowedDuringStartup.contains(message.header.verb)) + { + 
noSpamLogger.info("Ignoring message from {} with verb="+message.header.verb, message.from()); + return; + } + + handler.doVerb(message); + }; } public void fail(Message.Header header, Throwable failure) @@ -100,7 +122,9 @@ public class InboundSink implements InboundMessageHandlers.MessageConsumer { fail(message.header, t); - if (t instanceof TombstoneOverwhelmingException || t instanceof IndexNotAvailableException) + if (t instanceof NotCMSException) + noSpamLogger.warn(t.getMessage()); + else if (t instanceof TombstoneOverwhelmingException || t instanceof IndexNotAvailableException) noSpamLogger.error(t.getMessage()); else if (t instanceof RuntimeException) throw (RuntimeException) t; diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 27235888ad..3b4a1c69d4 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -42,6 +42,8 @@ class ResponseVerbHandler implements IVerbHandler message.verb() != Verb.TCM_REPLICATION && message.verb() != Verb.TCM_NOTIFY_RSP && message.verb() != Verb.TCM_DISCOVER_RSP && + message.verb() != Verb.TCM_INIT_MIG_RSP && + message.verb() != Verb.TCM_INIT_MIG_RSP && // Gossip stage is single-threaded, so we may end up in a deadlock with after-commit hook // that executes something on the gossip stage as well. !Stage.GOSSIP.executor().inExecutor()) diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java index ab3504bdd9..0cec5f1aa3 100644 --- a/src/java/org/apache/cassandra/net/Verb.java +++ b/src/java/org/apache/cassandra/net/Verb.java @@ -87,7 +87,7 @@ import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse; import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupComplete; import org.apache.cassandra.service.paxos.cleanup.PaxosStartPrepareCleanup; import org.apache.cassandra.service.paxos.cleanup.PaxosFinishPrepareCleanup; -import org.apache.cassandra.tcm.Commit.Result;import org.apache.cassandra.tcm.Discovery;import org.apache.cassandra.tcm.Replay;import org.apache.cassandra.tcm.log.LogState;import org.apache.cassandra.tcm.log.Replication;import org.apache.cassandra.utils.BooleanSerializer; +import org.apache.cassandra.tcm.Commit.Result;import org.apache.cassandra.tcm.Discovery;import org.apache.cassandra.tcm.Replay;import org.apache.cassandra.tcm.log.LogState;import org.apache.cassandra.tcm.log.Replication;import org.apache.cassandra.tcm.migration.ClusterMetadataHolder;import org.apache.cassandra.tcm.migration.Election;import org.apache.cassandra.utils.BooleanSerializer; import org.apache.cassandra.service.EchoVerbHandler; import org.apache.cassandra.service.SnapshotVerbHandler; import org.apache.cassandra.service.paxos.Commit; @@ -211,9 +211,12 @@ public enum Verb TCM_REPLICATION (805, P1, rpcTimeout, INTERNAL_METADATA, () -> Replication.messageSerializer, () -> replicationHandler() ), TCM_NOTIFY_RSP (806, P1, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> ResponseVerbHandler.instance ), TCM_NOTIFY_REQ (807, P1, rpcTimeout, INTERNAL_METADATA, () -> LogState.serializer, () -> logNotifyHandler(), TCM_NOTIFY_RSP ), - TCM_CURRENT_EPOCH_REQ (809, P1, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> currentEpochRequestHandler(), TCM_NOTIFY_RSP ), - TCM_DISCOVER_RSP (815, P1, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, () -> ResponseVerbHandler.instance ), - TCM_DISCOVER_REQ (816, P1, 
rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), + TCM_CURRENT_EPOCH_REQ (808, P1, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> currentEpochRequestHandler(), TCM_NOTIFY_RSP ), + TCM_INIT_MIG_RSP (809, P1, rpcTimeout, INTERNAL_METADATA, () -> ClusterMetadataHolder.serializer, () -> ResponseVerbHandler.instance ), + TCM_INIT_MIG_REQ (810, P1, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.prepareHandler, TCM_INIT_MIG_RSP ), + TCM_ABORT_MIG (811, P1, rpcTimeout, INTERNAL_METADATA, () -> Election.Initiator.serializer, () -> Election.instance.abortHandler, TCM_INIT_MIG_RSP ), + TCM_DISCOVER_RSP (812, P1, rpcTimeout, INTERNAL_METADATA, () -> Discovery.serializer, () -> ResponseVerbHandler.instance ), + TCM_DISCOVER_REQ (813, P1, rpcTimeout, INTERNAL_METADATA, () -> NoPayload.serializer, () -> Discovery.instance.requestHandler, TCM_DISCOVER_RSP ), // generic failure response FAILURE_RSP (99, P0, noTimeout, REQUEST_RESPONSE, () -> RequestFailureReason.serializer, () -> ResponseVerbHandler.instance ), diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index b6090bfd2d..b05511c494 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -216,10 +216,10 @@ import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; -import org.apache.cassandra.tcm.Startup; import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.transformations.Register; +import org.apache.cassandra.tcm.transformations.Startup; import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.net.AsyncOneResponse; import org.apache.cassandra.net.MessagingService; diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 4cf0db438a..be827cc9d6 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -20,6 +20,8 @@ package org.apache.cassandra.tcm; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; @@ -27,6 +29,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,13 +41,20 @@ import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.Replication; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.migration.Election; +import org.apache.cassandra.tcm.migration.GossipProcessor; import org.apache.cassandra.tcm.ownership.PlacementProvider; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; +import org.apache.cassandra.tcm.sequences.AddToCMS; import 
org.apache.cassandra.tcm.transformations.SealPeriod; import org.apache.cassandra.utils.FBUtilities; +import static java.util.stream.Collectors.toSet; import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; import static org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables; +import static org.apache.cassandra.utils.Collectors3.toImmutableSet; public class ClusterMetadataService @@ -111,8 +121,11 @@ public class ClusterMetadataService return state(ClusterMetadata.current()); } - public static State state(ClusterMetadata clusterMetadata) + public static State state(ClusterMetadata metadata) { + if (metadata.epoch.isBefore(Epoch.EMPTY)) + return State.GOSSIP; + // The node is a full member of the CMS if it has started participating in reads for distributed metadata table (which // implies it is a write replica as well). In other words, it's a fully joined member of the replica set responsible for // the distributed metadata table. @@ -131,12 +144,15 @@ public class ClusterMetadataService log = LocalLog.async(initial); Processor localProcessor = wrapProcessor.apply(new PaxosBackedProcessor(log)); + RemoteProcessor remoteProcessor = new RemoteProcessor(log, Discovery.instance::discoveredNodes); + GossipProcessor gossipProcessor = new GossipProcessor(); replicator = new Commit.DefaultReplicator(() -> log.metadata().directory); currentEpochHandler = new CurrentEpochRequestHandler(); replayRequestHandler = new SwitchableHandler<>(new Replay.Handler(), cmsStateSupplier); commitRequestHandler = new SwitchableHandler<>(new Commit.Handler(localProcessor, replicator), cmsStateSupplier); processor = new SwitchableProcessor(localProcessor, - new RemoteProcessor(log, Discovery.instance::discoveredNodes), + remoteProcessor, + gossipProcessor, cmsStateSupplier); replicationHandler = new Replication.ReplicationHandler(log); @@ -218,14 +234,92 @@ public class ClusterMetadataService public void addToCms(List<String> ignoredEndpoints) { + Set<InetAddressAndPort> ignored = ignoredEndpoints.stream().map(InetAddressAndPort::getByNameUnchecked).collect(toSet()); + if (ignored.contains(FBUtilities.getBroadcastAddressAndPort())) + { + String msg = "Can't ignore local host " + FBUtilities.getBroadcastAddressAndPort() + " when doing CMS migration"; + logger.error(msg); + throw new IllegalStateException(msg); + } + ClusterMetadata metadata = metadata(); - if (metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort())) + Set<InetAddressAndPort> existingMembers = metadata.fullCMSMembers(); + if (existingMembers.contains(FBUtilities.getBroadcastAddressAndPort())) { logger.info("Already in the CMS"); throw new IllegalStateException("Already in the CMS"); } - // TODO + if (!metadata.directory.allAddresses().containsAll(ignored)) + { + Set<InetAddressAndPort> allAddresses = Sets.newHashSet(metadata.directory.allAddresses()); + String msg = String.format("Ignored host(s) %s don't exist in the cluster", Sets.difference(ignored, allAddresses)); + logger.error(msg); + throw new IllegalStateException(msg); + } + + for (Map.Entry<NodeId, NodeVersion> entry : metadata.directory.versions.entrySet()) + { + NodeVersion version = entry.getValue(); + InetAddressAndPort ep = metadata.directory.getNodeAddresses(entry.getKey()).broadcastAddress; + if (ignored.contains(ep)) + { + // todo; what do we do if an endpoint has a mismatching gossip-clustermetadata? + // - we could add the node to --ignore and force this CM to it? 
+ // - require operator to bounce/manually fix the CM on that node + // for now just requiring that any ignored host is also down +// if (FailureDetector.instance.isAlive(ep)) +// throw new IllegalStateException("Can't ignore " + ep + " during CMS migration - it is not down"); + logger.info("Endpoint {} running {} is ignored", ep, version); + continue; + } + + if (!version.isUpgraded()) + { + String msg = String.format("All nodes are not yet upgraded - %s is running %s", metadata.directory.endpoint(entry.getKey()), version); + logger.error(msg); + throw new IllegalStateException(msg); + } + } + + if (existingMembers.isEmpty()) + { + logger.info("First CMS node"); + Set<InetAddressAndPort> candidates = metadata + .directory + .allAddresses() + .stream() + .filter(ep -> !FBUtilities.getBroadcastAddressAndPort().equals(ep) && + !ignoredEndpoints.contains(ep)) + .collect(toImmutableSet()); + + Election.instance.nominateSelf(candidates, ignored, metadata::equals); + ClusterMetadataService.instance().sealPeriod(); + } + else + { + logger.info("Adding local node to existing CMS nodes; {}", existingMembers); + AddToCMS.initiate(); + } + } + + public boolean applyFromGossip(ClusterMetadata expected, ClusterMetadata updated) + { + logger.debug("Applying from gossip, current={} new={}", expected, updated); + if (!expected.epoch.isBefore(Epoch.EMPTY)) + throw new IllegalStateException("Can't apply a ClusterMetadata from gossip with epoch " + expected.epoch); + if (state() != State.GOSSIP) + throw new IllegalStateException("Can't apply a ClusterMetadata from gossip when CMSState is not GOSSIP: " + state()); + + return log.unsafeSetCommittedFromGossip(expected, updated); + } + + public void setFromGossip(ClusterMetadata fromGossip) + { + logger.debug("Setting from gossip, new={}", fromGossip); + if (state() != State.GOSSIP) + throw new IllegalStateException("Can't apply a ClusterMetadata from gossip when CMSState is not GOSSIP: " + state()); + log.unsafeSetCommittedFromGossip(fromGossip); } public final Supplier<Entry.Id> entryIdGen = new Entry.DefaultEntryIdGen(); @@ -353,6 +447,12 @@ public class ClusterMetadataService Epoch ourEpoch = ClusterMetadata.current().epoch; if (!theirEpoch.isBefore(Epoch.FIRST) && theirEpoch.isAfter(ourEpoch)) { + if (state() == State.GOSSIP) + { + logger.warn("TODO: can't catchup in gossip mode (their epoch = {})", theirEpoch); //todo: we have seen a message with epoch > EMPTY, we are probably racing with migration, or we missed the finish migration message, handle! 
+ return; + } + replayAndWait(); ourEpoch = ClusterMetadata.current().epoch; if (theirEpoch.isAfter(ourEpoch)) @@ -396,8 +496,7 @@ public class ClusterMetadataService public boolean isMigrating() { - return false; -// return Election.instance.isMigrating(); + return Election.instance.isMigrating(); } /** @@ -426,6 +525,10 @@ public class ClusterMetadataService break; case REMOTE: throw new NotCMSException("Not currently a member of the CMS"); + case GOSSIP: + String msg = "Tried to use a handler when in gossip mode: "+handler.toString(); + logger.error(msg); + throw new IllegalStateException(msg); default: throw new IllegalStateException("Illegal state: " + cmsStateSupplier.get()); } @@ -437,12 +540,14 @@ public class ClusterMetadataService { private final Processor local; private final RemoteProcessor remote; + private final GossipProcessor gossip; private final Supplier<State> cmsStateSupplier; - SwitchableProcessor(Processor local, RemoteProcessor remote, Supplier<State> cmsStateSupplier) + SwitchableProcessor(Processor local, RemoteProcessor remote, GossipProcessor gossip, Supplier<State> cmsStateSupplier) { this.local = local; this.remote = remote; + this.gossip = gossip; this.cmsStateSupplier = cmsStateSupplier; } @@ -456,6 +561,8 @@ public class ClusterMetadataService return local; case REMOTE: return remote; + case GOSSIP: + return gossip; } throw new IllegalStateException("Bad CMS state: " + state); } diff --git a/src/java/org/apache/cassandra/tcm/Commit.java b/src/java/org/apache/cassandra/tcm/Commit.java index 54e6d6ab0c..e4548085ee 100644 --- a/src/java/org/apache/cassandra/tcm/Commit.java +++ b/src/java/org/apache/cassandra/tcm/Commit.java @@ -31,15 +31,12 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.IVerbHandler; -import org.apache.cassandra.net.Message; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.Verb; -import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.Replication; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.net.*; import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.NodeId; -import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.vint.VIntCoding; @@ -323,8 +320,11 @@ public class Commit for (NodeId peerId : directory.peerIds()) { InetAddressAndPort endpoint = directory.endpoint(peerId); + boolean upgraded = directory.version(peerId).isUpgraded(); // Do not replicate to self and to the peer that has requested to commit this message - if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) || (source != null && source.equals(endpoint))) + if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) || + (source != null && source.equals(endpoint)) || + !upgraded) { continue; } diff --git a/src/java/org/apache/cassandra/tcm/Discovery.java b/src/java/org/apache/cassandra/tcm/Discovery.java index 58efbbccc6..9320e9bffb 100644 --- a/src/java/org/apache/cassandra/tcm/Discovery.java +++ b/src/java/org/apache/cassandra/tcm/Discovery.java @@ -123,6 +123,7 @@ public class Discovery public void doVerb(Message<NoPayload> message) { Set<InetAddressAndPort> cms = ClusterMetadata.current().fullCMSMembers(); + 
logger.debug("Responding to discovery request from {}: {}", message.from(), cms); DiscoveredNodes discoveredNodes; if (!cms.isEmpty()) diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 6d5c8d037b..488df2e00f 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -18,22 +18,37 @@ package org.apache.cassandra.tcm; + import java.io.IOException; + import java.util.Collections; + import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; + import java.util.concurrent.TimeUnit; import java.util.function.Function; + import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.db.SystemKeyspace; + import org.apache.cassandra.db.commitlog.CommitLog; + import org.apache.cassandra.gms.EndpointState; + import org.apache.cassandra.gms.Gossiper; + import org.apache.cassandra.gms.NewGossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.schema.DistributedSchema; + import org.apache.cassandra.tcm.compatibility.GossipHelper; import org.apache.cassandra.tcm.log.SystemKeyspaceStorage; + import org.apache.cassandra.tcm.migration.Election; import org.apache.cassandra.tcm.ownership.UniformRangePlacement; import org.apache.cassandra.tcm.transformations.cms.Initialize; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL; + import static org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables; + import static org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates; public class Startup { @@ -63,6 +78,15 @@ package org.apache.cassandra.tcm; initializeAsNonCmsNode(wrapProcessor); initMessaging.run(); break; + case VOTE: + logger.info("Initializing for discovery"); + initializeAsNonCmsNode(wrapProcessor); + initializeForDiscovery(initMessaging); + break; + case UPGRADE: + logger.info("Initializing from gossip"); + initializeFromGossip(wrapProcessor, initMessaging); + break; } } @@ -87,13 +111,91 @@ package org.apache.cassandra.tcm; public static void initializeAsNonCmsNode(Function<ClusterMetadataService.Processor, ClusterMetadataService.Processor> wrapProcessor) { ClusterMetadata initial = new ClusterMetadata(DatabaseDescriptor.getPartitioner()); + initial.schema.initializeKeyspaceInstances(DistributedSchema.empty()); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), initial, wrapProcessor, ClusterMetadataService::state)); + + ClusterMetadataService.instance().initRecentlySealedPeriodsIndex(); ClusterMetadataService.instance().log().replayPersisted(); } + public static void initializeForDiscovery(Runnable initMessaging) + { + initMessaging.run(); + + logger.debug("Discovering other nodes in the system"); + Discovery.DiscoveredNodes candidates = Discovery.instance.discover(); + + if (candidates.kind() == Discovery.DiscoveredNodes.Kind.KNOWN_PEERS) + { + logger.debug("Got candidates: " + candidates); + InetAddressAndPort min = candidates.nodes().stream().min(InetAddressAndPort::compareTo).get(); + + // identify if you need to start the vote + if (min.equals(FBUtilities.getBroadcastAddressAndPort()) || FBUtilities.getBroadcastAddressAndPort().compareTo(min) < 0) + { + 
Election.instance.nominateSelf(candidates.nodes(), + Collections.singleton(FBUtilities.getBroadcastAddressAndPort()), + (cm) -> true); + } + } + + while (!ClusterMetadata.current().epoch.isAfter(Epoch.FIRST)) + { + if (candidates.kind() == Discovery.DiscoveredNodes.Kind.CMS_ONLY) + { + ClusterMetadataService.instance().processor().replayAndWait(); + } + else + { + Election.Initiator initiator = Election.instance.initiator(); + candidates = Discovery.instance.discoverOnce(initiator == null ? null : initiator.initiator); + } + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + assert ClusterMetadata.current().epoch.isAfter(Epoch.FIRST); + Election.instance.migrated(); + } + + /** + * This should only be called during startup. + */ + public static void initializeFromGossip(Function<ClusterMetadataService.Processor, ClusterMetadataService.Processor> wrapProcessor, Runnable initMessaging) + { + ClusterMetadata emptyFromSystemTables = emptyWithSchemaFromSystemTables(); + emptyFromSystemTables.schema.initializeKeyspaceInstances(DistributedSchema.empty()); + ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), + emptyFromSystemTables, + wrapProcessor, + ClusterMetadataService::state)); + initMessaging.run(); + + try + { + CommitLog.instance.recoverSegmentsOnDisk(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + logger.debug("Starting to initialize ClusterMetadata from gossip"); + Map<InetAddressAndPort, EndpointState> epStates = NewGossiper.instance.doShadowRound(); + logger.debug("Got epStates {}", epStates); + ClusterMetadata initial = fromEndpointStates(emptyFromSystemTables.schema, epStates); + logger.debug("Created initial ClusterMetadata {}", initial); + ClusterMetadataService.instance().setFromGossip(initial); + Gossiper.instance.clearUnsafe(); + Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); + GossipHelper.mergeAllNodeStatesToGossip(initial); + // double check that everything was added, can remove once we are confident + ClusterMetadata cmGossip = fromEndpointStates(emptyFromSystemTables.schema, Gossiper.instance.getEndpointStates()); + assert cmGossip.equals(initial) : cmGossip + " != " + initial; + } + /** * Initialization process: */ @@ -112,10 +214,16 @@ package org.apache.cassandra.tcm; boolean hasFirstEpoch = SystemKeyspaceStorage.hasFirstEpoch(); boolean isOnlySeed = DatabaseDescriptor.getSeeds().size() == 1 && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort()); - if (isOnlySeed && !hasFirstEpoch) - return FIRST_CMS; - - return NORMAL; + boolean hasBootedBefore = SystemKeyspace.getLocalHostId() != null; + logger.info("hasFirstEpoch = {}, hasBootedBefore = {}", hasFirstEpoch, hasBootedBefore); + if (!hasFirstEpoch && hasBootedBefore) + return UPGRADE; + else if (hasFirstEpoch) + return NORMAL; + else if (isOnlySeed) + return FIRST_CMS; + else + return VOTE; } } } diff --git a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java new file mode 100644 index 0000000000..ccad52f14a --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.listeners; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.StreamSupport; + +import com.google.common.collect.Sets; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.virtual.PeersTable; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.compatibility.GossipHelper; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; + +import static org.apache.cassandra.tcm.membership.NodeState.LEFT; + +public class LegacyStateListener implements ChangeListener +{ + @Override + public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next) + { + if (!next.directory.lastModified().equals(prev.directory.lastModified()) || + !next.tokenMap.lastModified().equals(prev.tokenMap.lastModified())) + { + Set<NodeId> removed = Sets.difference(prev.directory.peerIds(), next.directory.peerIds()); + Set<NodeId> changed = new HashSet<>(); + for (NodeId node : next.directory.peerIds()) + { + NodeState oldState = prev.directory.peerState(node); + NodeState newState = next.directory.peerState(node); + if (oldState == null || oldState != newState || !prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node))) + changed.add(node); + } + + for (NodeId remove : removed) + { + GossipHelper.removeFromGossip(prev.directory.endpoint(remove)); + PeersTable.updateLegacyPeerTable(remove, prev, next); + } + + for (NodeId change : changed) + { + // next.myNodeId() can be null during replay (before we have registered) + if (next.myNodeId() != null && next.myNodeId().equals(change)) + { + switch (next.directory.peerState(change)) + { + case REGISTERED: + Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); + break; + case JOINED: + // needed if we miss the REGISTERED above; Does nothing if we are already in epStateMap: + Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); + SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED); + StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false) + .filter(cfs -> Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName())) + .forEach(cfs -> cfs.indexManager.executePreJoinTasksBlocking(true)); + break; + } + } + if (next.directory.peerState(change) != LEFT) + GossipHelper.mergeNodeToGossip(change, next); + else + GossipHelper.mergeNodeToGossip(change, next, prev.tokenMap.tokens(change)); + PeersTable.updateLegacyPeerTable(change, prev, next); + } + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 1a05a29460..0478500e97 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -40,19 +40,21 @@ import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.DurationSpec; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.listeners.ChangeListener; import org.apache.cassandra.tcm.listeners.InitializationListener; +import org.apache.cassandra.tcm.listeners.LegacyStateListener; import org.apache.cassandra.tcm.listeners.LogListener; import org.apache.cassandra.tcm.listeners.MetadataSnapshotListener; +import org.apache.cassandra.tcm.listeners.PaxosRepairListener; import org.apache.cassandra.tcm.listeners.PlacementsChangeListener; import org.apache.cassandra.tcm.listeners.SchemaListener; -import org.apache.cassandra.tcm.listeners.PaxosRepairListener; -import org.apache.cassandra.tcm.transformations.ForceSnapshot; import org.apache.cassandra.tcm.transformations.cms.PreInitialize; +import org.apache.cassandra.tcm.transformations.ForceSnapshot; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.Closeable; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -96,7 +98,7 @@ public abstract class LocalLog implements Closeable Transformation transform = PreInitialize.withFirstCMS(addr); append(new Entry(Entry.Id.NONE, FIRST, transform)); waitForHighestConsecutive(); - assert metadata().epoch.is(Epoch.FIRST) : ClusterMetadata.current().epoch + " " + ClusterMetadata.current().fullCMSMembers(); + assert metadata().epoch.is(Epoch.FIRST) : ClusterMetadata.current().epoch + " " + ClusterMetadata.current().placements.get(ReplicationParams.meta()); } public ClusterMetadata metadata() @@ -104,7 +106,22 @@ public abstract class LocalLog implements Closeable return committed.get(); } - @VisibleForTesting + public boolean unsafeSetCommittedFromGossip(ClusterMetadata expected, ClusterMetadata updated) + { + if (!(expected.epoch.isEqualOrBefore(Epoch.UPGRADE_GOSSIP) && updated.epoch.is(Epoch.UPGRADE_GOSSIP))) + throw new IllegalStateException(String.format("Illegal epochs for setting from gossip; expected: %s, updated: %s", + expected.epoch, updated.epoch)); + return committed.compareAndSet(expected, updated); + } + + public void unsafeSetCommittedFromGossip(ClusterMetadata updated) + { + if (!updated.epoch.is(Epoch.UPGRADE_GOSSIP)) + throw new IllegalStateException(String.format("Illegal epoch for setting from gossip; updated: %s", + updated.epoch)); + committed.set(updated); + } + public int pendingBufferSize() { return pending.size(); @@ -546,6 +563,7 @@ public abstract class LocalLog implements Closeable addListener(snapshotListener()); addListener(new InitializationListener()); addListener(new SchemaListener()); + addListener(new LegacyStateListener()); addListener(new PlacementsChangeListener()); addListener(new MetadataSnapshotListener()); addListener(new PaxosRepairListener()); diff --git a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java new file mode 100644 index 0000000000..8599afec23 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java @@ -0,0 
+1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.migration; + +import java.io.IOException; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; +import org.apache.cassandra.tcm.ClusterMetadata; + +public class ClusterMetadataHolder +{ + public static final ClusterMetadataHolder.Serializer serializer = new ClusterMetadataHolder.Serializer(); + + public final Election.Initiator coordinator; + public final ClusterMetadata metadata; + + public ClusterMetadataHolder(Election.Initiator coordinator, ClusterMetadata metadata) + { + this.coordinator = coordinator; + this.metadata = metadata; + } + + private static class Serializer implements IVersionedSerializer<ClusterMetadataHolder> + { + @Override + public void serialize(ClusterMetadataHolder t, DataOutputPlus out, int version) throws IOException + { + Election.Initiator.serializer.serialize(t.coordinator, out, version); + VerboseMetadataSerializer.serialize(ClusterMetadata.serializer, t.metadata, out); + } + + @Override + public ClusterMetadataHolder deserialize(DataInputPlus in, int version) throws IOException + { + Election.Initiator coordinator = Election.Initiator.serializer.deserialize(in, version); + ClusterMetadata metadata = VerboseMetadataSerializer.deserialize(ClusterMetadata.serializer, in); + return new ClusterMetadataHolder(coordinator, metadata); + } + + @Override + public long serializedSize(ClusterMetadataHolder t, int version) + { + return Election.Initiator.serializer.serializedSize(t.coordinator, version) + + VerboseMetadataSerializer.serializedSize(ClusterMetadata.serializer, t.metadata); + } + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java index 0e8efe8a16..c2ffcdf80b 100644 --- a/src/java/org/apache/cassandra/tcm/migration/Election.java +++ b/src/java/org/apache/cassandra/tcm/migration/Election.java @@ -18,28 +18,153 @@ package org.apache.cassandra.tcm.migration; +import java.io.IOException; import java.util.Collection; +import java.util.HashSet; +import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.RequestFailureReason; +import 
org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.Message; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.Startup; +import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.net.MessageDelivery; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.RequestCallbackWithFailure; import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.UUIDSerializer; import org.apache.cassandra.utils.concurrent.Accumulator; import org.apache.cassandra.utils.concurrent.CountDownLatch; public class Election { private static final Logger logger = LoggerFactory.getLogger(Election.class); + private static final Initiator MIGRATED = new Initiator(null, null); + + private final AtomicReference<Initiator> initiator = new AtomicReference<>(); + + public static Election instance = new Election(); + + public final PrepareHandler prepareHandler; + public final AbortHandler abortHandler; + + private final MessageDelivery messaging; + + private Election() + { + this(MessagingService.instance()); + } + + private Election(MessageDelivery messaging) + { + this.messaging = messaging; + this.prepareHandler = new PrepareHandler(); + this.abortHandler = new AbortHandler(); + } + + public void nominateSelf(Set<InetAddressAndPort> candidates, Set<InetAddressAndPort> ignoredEndpoints, Function<ClusterMetadata, Boolean> isMatch) + { + Set<InetAddressAndPort> sendTo = new HashSet<>(candidates); + sendTo.removeAll(ignoredEndpoints); + sendTo.remove(FBUtilities.getBroadcastAddressAndPort()); + + try + { + initiate(sendTo, isMatch); + finish(sendTo); + } + catch (Exception e) + { + abort(sendTo); + throw e; + } + } + + private void initiate(Set<InetAddressAndPort> sendTo, Function<ClusterMetadata, Boolean> isMatch) + { + if (!updateInitiator(null, new Initiator(FBUtilities.getBroadcastAddressAndPort(), UUID.randomUUID()))) + throw new IllegalStateException("Migration already initiated by " + initiator.get()); + + Collection<Pair<InetAddressAndPort, ClusterMetadataHolder>> metadatas = fanoutAndWait(messaging, sendTo, Verb.TCM_INIT_MIG_REQ, initiator.get()); + if (metadatas.size() != sendTo.size()) + { + Set<InetAddressAndPort> responded = metadatas.stream().map(p -> p.left).collect(Collectors.toSet()); + String msg = String.format("Did not get response from %s - not continuing with migration. 
Ignore down hosts with --ignore <host>", Sets.difference(sendTo, responded)); + logger.warn(msg); + throw new IllegalStateException(msg); + } + + Set<InetAddressAndPort> mismatching = metadatas.stream().filter(p -> !isMatch.apply(p.right.metadata)).map(p -> p.left).collect(Collectors.toSet()); + if (!mismatching.isEmpty()) + { + // todo; log the differences between the metadatas + String msg = String.format("Got mismatching cluster metadatas from %s aborting migration", mismatching); + throw new IllegalStateException(msg); + } + } + + private void finish(Set<InetAddressAndPort> sendTo) + { + Initiator currentCoordinator = initiator.get(); + assert currentCoordinator.initiator.equals(FBUtilities.getBroadcastAddressAndPort()); + + Startup.initializeAsFirstCMSNode(); + Register.maybeRegister(); + + updateInitiator(currentCoordinator, MIGRATED); + fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY)); + } + + private void abort(Set<InetAddressAndPort> sendTo) + { + Initiator init = initiator.getAndSet(null); + for (InetAddressAndPort ep : sendTo) + messaging.send(Message.out(Verb.TCM_ABORT_MIG, init), ep); + } + + public Initiator initiator() + { + return initiator.get(); + } + + public void migrated() + { + initiator.set(MIGRATED); + } + + private boolean updateInitiator(Initiator expected, Initiator newCoordinator) + { + Initiator current = initiator.get(); + return Objects.equals(current, expected) && initiator.compareAndSet(current, newCoordinator); + } + + public boolean isMigrating() + { + Initiator coordinator = initiator(); + return coordinator != null && coordinator != MIGRATED; + } public static <REQ, RSP> Collection<Pair<InetAddressAndPort, RSP>> fanoutAndWait(MessageDelivery messaging, Set<InetAddressAndPort> sendTo, Verb verb, REQ payload) { @@ -67,4 +192,89 @@ public class Election cdl.awaitUninterruptibly(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); return responses.snapshot(); } + + public class PrepareHandler implements IVerbHandler<Initiator> + { + @Override + public void doVerb(Message<Initiator> message) throws IOException + { + if (!updateInitiator(null, message.payload)) + throw new IllegalStateException(String.format("Got duplicate initiate migration message from %s, migration is already started by %s", message.from(), initiator())); + + // todo; disallow ANY changes to state managed in ClusterMetadata + messaging.send(message.responseWith(new ClusterMetadataHolder(message.payload, ClusterMetadata.current())), message.from()); + } + } + + public class AbortHandler implements IVerbHandler<Initiator> + { + @Override + public void doVerb(Message<Initiator> message) throws IOException + { + if (!message.from().equals(initiator().initiator) || !updateInitiator(message.payload, null)) + logger.error("Could not clear initiator - initiator is set to {}, abort message received from {}", initiator(), message.payload); + } + } + + public static class Initiator + { + public static final Serializer serializer = new Serializer(); + + public final InetAddressAndPort initiator; + public final UUID initToken; + + public Initiator(InetAddressAndPort initiator, UUID initToken) + { + this.initiator = initiator; + this.initToken = initToken; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof Initiator)) return false; + Initiator other = (Initiator) o; + return Objects.equals(initiator, other.initiator) && Objects.equals(initToken, 
other.initToken); + } + + @Override + public int hashCode() + { + return Objects.hash(initiator, initToken); + } + + @Override + public String toString() + { + return "Initiator{" + + "initiator=" + initiator + + ", initToken=" + initToken + + '}'; + } + + public static class Serializer implements IVersionedSerializer<Initiator> + { + @Override + public void serialize(Initiator t, DataOutputPlus out, int version) throws IOException + { + InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator, out, version); + UUIDSerializer.serializer.serialize(t.initToken, out, version); + } + + @Override + public Initiator deserialize(DataInputPlus in, int version) throws IOException + { + return new Initiator(InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in, version), + UUIDSerializer.serializer.deserialize(in, version)); + } + + @Override + public long serializedSize(Initiator t, int version) + { + return InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator, version) + + UUIDSerializer.serializer.serializedSize(t.initToken, version); + } + } + } } diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java index bf58689860..bbc03f2e21 100644 --- a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java +++ b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java @@ -24,8 +24,9 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.utils.CassandraVersion; @@ -64,8 +65,8 @@ public class GossipCMSListener implements IEndpointStateChangeSubscriber ClusterMetadata newCM = metadata.transformer() .withNodeInformation(nodeId, newNodeVersion, metadata.directory.getNodeAddresses(nodeId)) .buildForGossipMode(); -// if (ClusterMetadataService.instance().applyFromGossip(metadata, newCM)) -// return; + if (ClusterMetadataService.instance().applyFromGossip(metadata, newCM)) + return; metadata = ClusterMetadata.current(); } } diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java new file mode 100644 index 0000000000..71bb0424d8 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.migration; + +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Commit; +import org.apache.cassandra.tcm.log.Entry; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.ClusterMetadata; + +public class GossipProcessor implements ClusterMetadataService.Processor +{ + @Override + public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown) + { + throw new IllegalStateException("Can't commit transformations when running in gossip mode. Enable the ClusterMetadataService with `nodetool addtocms`."); + } + + @Override + public ClusterMetadata replayAndWait() + { + return ClusterMetadata.current(); + } +} diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 35a673c406..0eb592dc60 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -94,6 +94,7 @@ public class NodeTool public int execute(String... args) { List<Class<? extends NodeToolCmdRunnable>> commands = newArrayList( + AddToCMS.class, Assassinate.class, CassHelp.class, CfHistograms.class, diff --git a/src/java/org/apache/cassandra/tools/nodetool/AddToCMS.java b/src/java/org/apache/cassandra/tools/nodetool/AddToCMS.java new file mode 100644 index 0000000000..bf9e5d1cab --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/AddToCMS.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import java.util.ArrayList; +import java.util.List; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; + +@Command(name = "addtocms", description = "Add this node to the cluster metadata service") +public class AddToCMS extends NodeTool.NodeToolCmd +{ + @Option(title = "ignored endpoints", name = {"-i", "--ignore"}, description = "Hosts to ignore due to them being down", required = false) + private List<String> endpoint = new ArrayList<>(); + @Override + protected void execute(NodeProbe probe) + { + probe.getStorageService().addToCms(endpoint); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
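The single-initiator guard in Election depends on Objects.equals being checked before AtomicReference.compareAndSet, so a duplicate PREPARE or an abort carrying an equal-but-distinct Initiator instance is judged by value equality of (endpoint, token) rather than by reference identity. The following standalone sketch is not part of the patch: the class and field names are illustrative, and it uses a Java 16+ record for brevity; it only demonstrates the same compare-then-CAS pattern in isolation.

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

public class InitiatorGuardSketch
{
    // Illustrative stand-in for Election.Initiator: identified by (endpoint, token).
    record Initiator(String endpoint, UUID token) {}

    private final AtomicReference<Initiator> initiator = new AtomicReference<>();

    // Same shape as Election.updateInitiator: succeed only when the current value
    // equals what the caller expects, then swap atomically.
    boolean updateInitiator(Initiator expected, Initiator replacement)
    {
        Initiator current = initiator.get();
        return Objects.equals(current, expected) && initiator.compareAndSet(current, replacement);
    }

    public static void main(String[] args)
    {
        InitiatorGuardSketch guard = new InitiatorGuardSketch();
        Initiator first = new Initiator("127.0.0.1:7000", UUID.randomUUID());

        System.out.println(guard.updateInitiator(null, first));  // true: no migration in flight
        System.out.println(guard.updateInitiator(null, first));  // false: already initiated

        // An abort that presents an equal-but-distinct Initiator still clears the slot,
        // because the guard checks Objects.equals, not reference identity.
        Initiator sameValue = new Initiator(first.endpoint(), first.token());
        System.out.println(guard.updateInitiator(sameValue, null)); // true
    }
}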

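While a node is still running with the GossipProcessor, every attempted commit is rejected with a pointer to `nodetool addtocms`, so cluster metadata can only change through the operations still permitted in gossip mode. A possible operator invocation of the new command, based only on the name and the -i/--ignore option declared in AddToCMS (the address below is purely an example):

    # add this node to the cluster metadata service
    nodetool addtocms

    # same, but skip a host that is known to be down during the pre-migration metadata check
    nodetool addtocms --ignore 192.168.1.15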