This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit bde3d39daf1d626de5bbef6409fe2ffe1ff1ec69
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Mar 3 17:40:14 2023 +0000

    [CEP-21] Upgrade support
    
    Following an upgrade, nodes in an existing cluster will enter a minimal
    modification mode. In this state, the set of allowed cluster metadata
    modifications is constrained to include only the addition, removal and
    replacement of nodes, to allow failed hosts to be replaced during the
    upgrade.
    
    In this mode the CMS has no members and each peer maintains its
    own ClusterMetadata independently. This metadata is initialised at
    startup from the system tables, and gossip is used to propagate the
    permitted metadata changes.
    
    When the operator is ready, one node is manually promoted via nodetool to
    become the initial CMS. The candidate node then proposes itself as the
    initial CMS and attempts to gain consensus from the rest of the cluster.
    If successful, it verifies that all peers have an identical view of
    cluster metadata and initialises the distributed log with a snapshot of
    that metadata.
    
    Once this process is complete, all future cluster metadata updates are
    performed via the CMS using the global log; reverting to the previous
    method of metadata management is not supported. Further members can and
    should be added to the CMS via the nodetool command.
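    
    For orientation, the core of the promotion path added here is sketched
    below. The method names are taken from this patch, but the snippet is a
    simplified outline of the flow rather than a runnable, standalone
    example:
    
        // First CMS node: nominate the local node and seal the current period
        Election.instance.nominateSelf(candidates, ignored, metadata::equals);
        ClusterMetadataService.instance().sealPeriod();
    
        // Nodes joining an already-established CMS
        AddToCMS.initiate();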
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../db/commitlog/CommitLogDescriptor.java          |   5 +-
 .../apache/cassandra/db/virtual/PeersTable.java    | 187 ++++++++++++++++++
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   1 +
 .../apache/cassandra/hints/HintsDescriptor.java    |   5 +-
 .../cassandra/locator/InetAddressAndPort.java      |  14 +-
 src/java/org/apache/cassandra/net/InboundSink.java |  28 ++-
 .../apache/cassandra/net/ResponseVerbHandler.java  |   2 +
 src/java/org/apache/cassandra/net/Verb.java        |  11 +-
 .../apache/cassandra/service/StorageService.java   |   2 +-
 .../cassandra/tcm/ClusterMetadataService.java      | 121 +++++++++++-
 src/java/org/apache/cassandra/tcm/Commit.java      |  14 +-
 src/java/org/apache/cassandra/tcm/Discovery.java   |   1 +
 src/java/org/apache/cassandra/tcm/Startup.java     | 116 ++++++++++-
 .../tcm/listeners/LegacyStateListener.java         |  91 +++++++++
 .../org/apache/cassandra/tcm/log/LocalLog.java     |  28 ++-
 .../tcm/migration/ClusterMetadataHolder.java       |  66 +++++++
 .../apache/cassandra/tcm/migration/Election.java   | 212 ++++++++++++++++++++-
 .../cassandra/tcm/migration/GossipCMSListener.java |   7 +-
 .../cassandra/tcm/migration/GossipProcessor.java   |  41 ++++
 src/java/org/apache/cassandra/tools/NodeTool.java  |   1 +
 .../apache/cassandra/tools/nodetool/AddToCMS.java  |  39 ++++
 21 files changed, 954 insertions(+), 38 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index ed2af1bd0d..41444fecde 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -62,13 +62,14 @@ public class CommitLogDescriptor
     // We don't support anything pre-3.0
     public static final int VERSION_30 = 6;
     public static final int VERSION_40 = 7;
+    public static final int VERSION_50 = 8;
 
     /**
      * Increment this number if there is a changes in the commit log disc 
layout or MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
      */
     @VisibleForTesting
-    public static final int current_version = VERSION_40;
+    public static final int current_version = VERSION_50;
 
     final int version;
     public final long id;
@@ -222,6 +223,8 @@ public class CommitLogDescriptor
                 return MessagingService.VERSION_30;
             case VERSION_40:
                 return MessagingService.VERSION_40;
+            case VERSION_50:
+                return MessagingService.VERSION_50;
             default:
                 throw new IllegalStateException("Unknown commitlog version " + 
version);
         }
diff --git a/src/java/org/apache/cassandra/db/virtual/PeersTable.java 
b/src/java/org/apache/cassandra/db/virtual/PeersTable.java
new file mode 100644
index 0000000000..157bed2f60
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/PeersTable.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.db.SystemKeyspace.LEGACY_PEERS;
+import static org.apache.cassandra.db.SystemKeyspace.PEERS_V2;
+import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
+
+public class PeersTable extends AbstractVirtualTable
+{
+    public static String PEER = "peer";
+    public static String PEER_PORT = "peer_port";
+    public static String DATA_CENTER = "data_center";
+    public static String HOST_ID = "host_id";
+    public static String PREFERRED_IP = "preferred_ip";
+    public static String PREFERRED_PORT = "preferred_port";
+    public static String RACK = "rack";
+    public static String RELEASE_VERSION = "release_version";
+    public static String NATIVE_ADDRESS = "native_address";
+    public static String NATIVE_PORT = "native_port";
+    public static String SCHEMA_VERSION = "schema_version";
+    public static String TOKENS = "tokens";
+    public static String STATE = "state";
+
+    public PeersTable(String keyspace)
+    {
+        super(TableMetadata.builder(keyspace, "peers")
+                           .comment("Peers")
+                           .kind(TableMetadata.Kind.VIRTUAL)
+                           .partitioner(new 
LocalPartitioner(InetAddressType.instance))
+                           .addPartitionKeyColumn(PEER, 
InetAddressType.instance)
+                           .addClusteringColumn(PEER_PORT, Int32Type.instance)
+                           .addRegularColumn(DATA_CENTER, UTF8Type.instance)
+                           .addRegularColumn(RACK, UTF8Type.instance)
+                           .addRegularColumn(HOST_ID, UUIDType.instance)
+                           .addRegularColumn(PREFERRED_IP, 
InetAddressType.instance)
+                           .addRegularColumn(PREFERRED_PORT, 
Int32Type.instance)
+                           .addRegularColumn(NATIVE_ADDRESS, 
InetAddressType.instance)
+                           .addRegularColumn(NATIVE_PORT, Int32Type.instance)
+                           .addRegularColumn(RELEASE_VERSION, 
UTF8Type.instance)
+                           .addRegularColumn(SCHEMA_VERSION, UUIDType.instance)
+                           .addRegularColumn(STATE, UTF8Type.instance)
+                           .addRegularColumn(TOKENS, 
SetType.getInstance(UTF8Type.instance, false))
+                           .build());
+    }
+
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata());
+
+        ClusterMetadata metadata = ClusterMetadata.current();
+        for (InetAddressAndPort addr : metadata.directory.allAddresses())
+        {
+            NodeId peer = metadata.directory.peerId(addr);
+
+            NodeAddresses addresses = 
metadata.directory.getNodeAddresses(peer);
+            result.row(addr.getAddress(), addr.getPort())
+                  .column(DATA_CENTER, 
metadata.directory.location(peer).datacenter)
+                  .column(RACK, metadata.directory.location(peer).rack)
+                  .column(HOST_ID, peer.uuid)
+                  .column(PREFERRED_IP, 
addresses.broadcastAddress.getAddress())
+                  .column(PREFERRED_PORT, addresses.broadcastAddress.getPort())
+                  .column(NATIVE_ADDRESS, addresses.nativeAddress.getAddress())
+                  .column(NATIVE_PORT, addresses.nativeAddress.getPort())
+                  .column(RELEASE_VERSION, 
metadata.directory.version(peer).cassandraVersion.toString())
+                  .column(SCHEMA_VERSION, Schema.instance.getVersion()) //TODO
+                  .column(STATE, metadata.directory.peerState(peer).toString())
+                  .column(TOKENS, new 
HashSet<>(metadata.tokenMap.tokens(peer).stream().map((token) -> 
token.getToken().getTokenValue().toString()).collect(Collectors.toList())));
+        }
+
+        return result;
+    }
+
+    public static void initializeLegacyPeerTables(ClusterMetadata prev, 
ClusterMetadata next)
+    {
+        QueryProcessor.executeInternal(String.format("TRUNCATE %s.%s", 
SYSTEM_KEYSPACE_NAME, PEERS_V2));
+        QueryProcessor.executeInternal(String.format("TRUNCATE %s.%s", 
SYSTEM_KEYSPACE_NAME, LEGACY_PEERS));
+
+        for (NodeId nodeId : next.directory.peerIds())
+            updateLegacyPeerTable(nodeId, prev, next);
+    }
+
+    private static String peers_v2_query = "INSERT INTO %s.%s ("
+                                            + "peer, peer_port, "
+                                            + "preferred_ip, preferred_port, "
+                                            + "native_address, native_port, "
+                                            + "data_center, rack, "
+                                            + "host_id, "
+                                            + "release_version, "
+                                            + "schema_version,"
+                                            + "tokens) " +
+                                            "VALUES " +
+                                            "(?,?,?,?,?,?,?,?,?,?,?,?)";
+
+    private static String legacy_peers_query = "INSERT INTO %s.%s ("
+                                               + "peer, preferred_ip, 
rpc_address, "
+                                               + "data_center, rack, "
+                                               + "host_id, "
+                                               + "release_version, "
+                                               + "schema_version,"
+                                               + "tokens) " +
+                                               "VALUES " +
+                                               "(?,?,?,?,?,?,?,?,?)";
+
+    private static String peers_delete_query = "DELETE FROM %s.%s WHERE peer=? 
and peer_port=?";
+    private static String legacy_peers_delete_query = "DELETE FROM %s.%s WHERE 
peer=?";
+
+    private static final Logger logger = 
LoggerFactory.getLogger(PeersTable.class);
+    public static void updateLegacyPeerTable(NodeId nodeId, ClusterMetadata 
prev, ClusterMetadata next)
+    {
+        if 
(nodeId.equals(next.directory.peerId(FBUtilities.getBroadcastAddressAndPort())))
+            return;
+
+        if (next.directory.peerState(nodeId) == null || 
next.directory.peerState(nodeId) == NodeState.LEFT)
+        {
+            NodeAddresses addresses = prev.directory.getNodeAddresses(nodeId);
+            logger.debug("Purging {} from system.peers_v2 table", addresses);
+            QueryProcessor.executeInternal(String.format(peers_delete_query, 
SYSTEM_KEYSPACE_NAME, PEERS_V2), addresses.broadcastAddress.getAddress(), 
addresses.broadcastAddress.getPort());
+            
QueryProcessor.executeInternal(String.format(legacy_peers_delete_query, 
SYSTEM_KEYSPACE_NAME, LEGACY_PEERS), addresses.broadcastAddress.getAddress());
+        }
+        else
+        {
+            NodeAddresses addresses = next.directory.getNodeAddresses(nodeId);
+            Location location = next.directory.location(nodeId);
+
+            Set<String> tokens = 
SystemKeyspace.tokensAsSet(next.tokenMap.tokens(nodeId));
+            QueryProcessor.executeInternal(String.format(peers_v2_query, 
SYSTEM_KEYSPACE_NAME, PEERS_V2),
+                                           
addresses.broadcastAddress.getAddress(), addresses.broadcastAddress.getPort(),
+                                           
addresses.broadcastAddress.getAddress(), addresses.broadcastAddress.getPort(),
+                                           
addresses.nativeAddress.getAddress(), addresses.nativeAddress.getPort(),
+                                           location.datacenter, location.rack,
+                                           nodeId.uuid,
+                                           
next.directory.version(nodeId).cassandraVersion.toString(),
+                                           next.schema.getVersion(),
+                                           tokens);
+
+            QueryProcessor.executeInternal(String.format(legacy_peers_query, 
SYSTEM_KEYSPACE_NAME, LEGACY_PEERS),
+                                           
addresses.broadcastAddress.getAddress(), 
addresses.broadcastAddress.getAddress(), addresses.nativeAddress.getAddress(),
+                                           location.datacenter, location.rack,
+                                           nodeId.uuid,
+                                           
next.directory.version(nodeId).cassandraVersion.toString(),
+                                           next.schema.getVersion(),
+                                           tokens);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 7ebed45ac2..a49dbea7cb 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -50,6 +50,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
                     .add(new QueriesTable(VIRTUAL_VIEWS))
                     .add(new LogMessagesTable(VIRTUAL_VIEWS))
                     .add(new SnapshotsTable(VIRTUAL_VIEWS))
+                    .add(new PeersTable(VIRTUAL_VIEWS))
                     .add(new LocalTable(VIRTUAL_VIEWS))
                     .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS))
                     .build());
diff --git a/src/java/org/apache/cassandra/hints/HintsDescriptor.java 
b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
index 8e1f782f1d..b915cf6bda 100644
--- a/src/java/org/apache/cassandra/hints/HintsDescriptor.java
+++ b/src/java/org/apache/cassandra/hints/HintsDescriptor.java
@@ -66,7 +66,8 @@ final class HintsDescriptor
 
     static final int VERSION_30 = 1;
     static final int VERSION_40 = 2;
-    static final int CURRENT_VERSION = VERSION_40;
+    static final int VERSION_50 = 3;
+    static final int CURRENT_VERSION = VERSION_50;
 
     static final String COMPRESSION = "compression";
     static final String ENCRYPTION = "encryption";
@@ -232,6 +233,8 @@ final class HintsDescriptor
                 return MessagingService.VERSION_30;
             case VERSION_40:
                 return MessagingService.VERSION_40;
+            case VERSION_50:
+                return MessagingService.VERSION_50;
             default:
                 throw new AssertionError();
         }
diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java 
b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
index 65f646842a..298d7e76cf 100644
--- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
+++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java
@@ -37,8 +37,8 @@ import com.google.common.net.HostAndPort;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FastByteOperations;
@@ -221,6 +221,17 @@ public final class InetAddressAndPort extends 
InetSocketAddress implements Compa
         return getByNameOverrideDefaults(name, null);
     }
 
+    public static InetAddressAndPort getByNameUnchecked(String name)
+    {
+        try
+        {
+            return getByNameOverrideDefaults(name, null);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 
     public static List<InetAddressAndPort> getAllByName(String name) throws 
UnknownHostException
     {
@@ -338,7 +349,6 @@ public final class InetAddressAndPort extends 
InetSocketAddress implements Compa
             return Serializer.inetAddressAndPortSerializer.serializedSize(t, 
SERDE_VERSION);
         }
     }
-
     /**
      * As of version 4.0 the endpoint description includes a port number as an 
unsigned short
      * This serializer matches the 3.0 CompactEndpointSerializationHelper, 
encoding the number of address bytes
diff --git a/src/java/org/apache/cassandra/net/InboundSink.java 
b/src/java/org/apache/cassandra/net/InboundSink.java
index 16eb440540..b3c49c89da 100644
--- a/src/java/org/apache/cassandra/net/InboundSink.java
+++ b/src/java/org/apache/cassandra/net/InboundSink.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
@@ -29,6 +30,9 @@ import 
org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.index.IndexNotAvailableException;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.NotCMSException;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 /**
@@ -72,10 +76,28 @@ public class InboundSink implements 
InboundMessageHandlers.MessageConsumer
 
     private final MessagingService messaging;
 
+    private final static EnumSet<Verb> allowedDuringStartup = 
EnumSet.of(Verb.GOSSIP_DIGEST_ACK, Verb.GOSSIP_DIGEST_SYN);
+
     InboundSink(MessagingService messaging)
     {
         this.messaging = messaging;
-        this.sink = message -> 
message.header.verb.handler().doVerb((Message<Object>) message);
+        this.sink = message -> {
+            IVerbHandler handler = message.header.verb.handler();
+            if (handler == null)
+            {
+                String err = String.format("Handler for verb %s is null", 
message.header.verb);
+                noSpamLogger.info(err);
+                throw new IllegalStateException(err);
+            }
+
+            if (ClusterMetadata.current().epoch.is(Epoch.UPGRADE_STARTUP) && 
!allowedDuringStartup.contains(message.header.verb))
+            {
+                noSpamLogger.info("Ignoring message from {} with 
verb="+message.header.verb, message.from());
+                return;
+            }
+
+            handler.doVerb(message);
+        };
     }
 
     public void fail(Message.Header header, Throwable failure)
@@ -100,7 +122,9 @@ public class InboundSink implements 
InboundMessageHandlers.MessageConsumer
         {
             fail(message.header, t);
 
-            if (t instanceof TombstoneOverwhelmingException || t instanceof 
IndexNotAvailableException)
+            if (t instanceof NotCMSException)
+                noSpamLogger.warn(t.getMessage());
+            else if (t instanceof TombstoneOverwhelmingException || t 
instanceof IndexNotAvailableException)
                 noSpamLogger.error(t.getMessage());
             else if (t instanceof RuntimeException)
                 throw (RuntimeException) t;
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 27235888ad..3b4a1c69d4 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -42,6 +42,8 @@ class ResponseVerbHandler implements IVerbHandler
             message.verb() != Verb.TCM_REPLICATION &&
             message.verb() != Verb.TCM_NOTIFY_RSP &&
             message.verb() != Verb.TCM_DISCOVER_RSP &&
+            message.verb() != Verb.TCM_INIT_MIG_RSP &&
             // Gossip stage is single-threaded, so we may end up in a deadlock 
with after-commit hook
             // that executes something on the gossip stage as well.
             !Stage.GOSSIP.executor().inExecutor())
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index ab3504bdd9..0cec5f1aa3 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -87,7 +87,7 @@ import 
org.apache.cassandra.service.paxos.cleanup.PaxosCleanupResponse;
 import org.apache.cassandra.service.paxos.cleanup.PaxosCleanupComplete;
 import org.apache.cassandra.service.paxos.cleanup.PaxosStartPrepareCleanup;
 import org.apache.cassandra.service.paxos.cleanup.PaxosFinishPrepareCleanup;
-import org.apache.cassandra.tcm.Commit.Result;import 
org.apache.cassandra.tcm.Discovery;import 
org.apache.cassandra.tcm.Replay;import 
org.apache.cassandra.tcm.log.LogState;import 
org.apache.cassandra.tcm.log.Replication;import 
org.apache.cassandra.utils.BooleanSerializer;
+import org.apache.cassandra.tcm.Commit.Result;import 
org.apache.cassandra.tcm.Discovery;import 
org.apache.cassandra.tcm.Replay;import 
org.apache.cassandra.tcm.log.LogState;import 
org.apache.cassandra.tcm.log.Replication;import 
org.apache.cassandra.tcm.migration.ClusterMetadataHolder;import 
org.apache.cassandra.tcm.migration.Election;import 
org.apache.cassandra.utils.BooleanSerializer;
 import org.apache.cassandra.service.EchoVerbHandler;
 import org.apache.cassandra.service.SnapshotVerbHandler;
 import org.apache.cassandra.service.paxos.Commit;
@@ -211,9 +211,12 @@ public enum Verb
     TCM_REPLICATION        (805, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Replication.messageSerializer,                () -> replicationHandler()     
                                ),
     TCM_NOTIFY_RSP         (806, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
ResponseVerbHandler.instance                             ),
     TCM_NOTIFY_REQ         (807, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> LogState.serializer,                          () -> logNotifyHandler(),      
             TCM_NOTIFY_RSP     ),
-    TCM_CURRENT_EPOCH_REQ  (809, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
currentEpochRequestHandler(),         TCM_NOTIFY_RSP     ),
-    TCM_DISCOVER_RSP       (815, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Discovery.serializer,                         () -> 
ResponseVerbHandler.instance                             ),
-    TCM_DISCOVER_REQ       (816, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
Discovery.instance.requestHandler,    TCM_DISCOVER_RSP   ),
+    TCM_CURRENT_EPOCH_REQ  (808, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
currentEpochRequestHandler(),         TCM_NOTIFY_RSP     ),
+    TCM_INIT_MIG_RSP       (809, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> ClusterMetadataHolder.serializer,             () -> 
ResponseVerbHandler.instance                             ),
+    TCM_INIT_MIG_REQ       (810, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Election.Initiator.serializer,                () -> 
Election.instance.prepareHandler,     TCM_INIT_MIG_RSP   ),
+    TCM_ABORT_MIG          (811, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Election.Initiator.serializer,                () -> 
Election.instance.abortHandler,       TCM_INIT_MIG_RSP   ),
+    TCM_DISCOVER_RSP       (812, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> Discovery.serializer,                         () -> 
ResponseVerbHandler.instance                             ),
+    TCM_DISCOVER_REQ       (813, P1, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
Discovery.instance.requestHandler,    TCM_DISCOVER_RSP   ),
 
     // generic failure response
     FAILURE_RSP            (99,  P0, noTimeout,       REQUEST_RESPONSE,  () -> 
RequestFailureReason.serializer,      () -> ResponseVerbHandler.instance        
                     ),
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index b6090bfd2d..b05511c494 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -216,10 +216,10 @@ import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
-import org.apache.cassandra.tcm.Startup;
 import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.transformations.Register;
+import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.net.AsyncOneResponse;
 import org.apache.cassandra.net.MessagingService;
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 4cf0db438a..be827cc9d6 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.tcm;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -27,6 +29,7 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,13 +41,20 @@ import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.Replication;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.migration.Election;
+import org.apache.cassandra.tcm.migration.GossipProcessor;
 import org.apache.cassandra.tcm.ownership.PlacementProvider;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
 import org.apache.cassandra.tcm.transformations.SealPeriod;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static java.util.stream.Collectors.toSet;
 import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
 import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables;
+import static org.apache.cassandra.utils.Collectors3.toImmutableSet;
 
 
 public class ClusterMetadataService
@@ -111,8 +121,11 @@ public class ClusterMetadataService
         return state(ClusterMetadata.current());
     }
 
-    public static State state(ClusterMetadata clusterMetadata)
+    public static State state(ClusterMetadata metadata)
     {
+        if (metadata.epoch.isBefore(Epoch.EMPTY))
+            return State.GOSSIP;
+
         // The node is a full member of the CMS if it has started 
participating in reads for distributed metadata table (which
         // implies it is a write replica as well). In other words, it's a 
fully joined member of the replica set responsible for
         // the distributed metadata table.
@@ -131,12 +144,15 @@ public class ClusterMetadataService
 
         log = LocalLog.async(initial);
         Processor localProcessor = wrapProcessor.apply(new 
PaxosBackedProcessor(log));
+        RemoteProcessor remoteProcessor = new RemoteProcessor(log, 
Discovery.instance::discoveredNodes);
+        GossipProcessor gossipProcessor = new GossipProcessor();
         replicator = new Commit.DefaultReplicator(() -> 
log.metadata().directory);
         currentEpochHandler = new CurrentEpochRequestHandler();
         replayRequestHandler = new SwitchableHandler<>(new Replay.Handler(), 
cmsStateSupplier);
         commitRequestHandler = new SwitchableHandler<>(new 
Commit.Handler(localProcessor, replicator), cmsStateSupplier);
         processor = new SwitchableProcessor(localProcessor,
-                                            new RemoteProcessor(log, 
Discovery.instance::discoveredNodes),
+                                            remoteProcessor,
+                                            gossipProcessor,
                                             cmsStateSupplier);
 
         replicationHandler = new Replication.ReplicationHandler(log);
@@ -218,14 +234,92 @@ public class ClusterMetadataService
 
     public void addToCms(List<String> ignoredEndpoints)
     {
+        Set<InetAddressAndPort> ignored = 
ignoredEndpoints.stream().map(InetAddressAndPort::getByNameUnchecked).collect(toSet());
+        if (ignored.contains(FBUtilities.getBroadcastAddressAndPort()))
+        {
+            String msg = "Can't ignore local host " + 
FBUtilities.getBroadcastAddressAndPort() + " when doing CMS migration";
+            logger.error(msg);
+            throw new IllegalStateException(msg);
+        }
+
         ClusterMetadata metadata = metadata();
-        if (metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort()))
+        Set<InetAddressAndPort> existingMembers = metadata.fullCMSMembers();
+        if (existingMembers.contains(FBUtilities.getBroadcastAddressAndPort()))
         {
             logger.info("Already in the CMS");
             throw new IllegalStateException("Already in the CMS");
         }
 
-        // TODO
+        if (!metadata.directory.allAddresses().containsAll(ignored))
+        {
+            Set<InetAddressAndPort> allAddresses = 
Sets.newHashSet(metadata.directory.allAddresses());
+            String msg = String.format("Ignored host(s) %s don't exist in the 
cluster", Sets.difference(ignored, allAddresses));
+            logger.error(msg);
+            throw new IllegalStateException(msg);
+        }
+
+        for (Map.Entry<NodeId, NodeVersion> entry : 
metadata.directory.versions.entrySet())
+        {
+            NodeVersion version = entry.getValue();
+            InetAddressAndPort ep = 
metadata.directory.getNodeAddresses(entry.getKey()).broadcastAddress;
+            if (ignored.contains(ep))
+            {
+                // todo; what do we do if an endpoint has a mismatching 
gossip-clustermetadata?
+                //       - we could add the node to --ignore and force this CM 
to it?
+                //       - require operator to bounce/manually fix the CM on 
that node
+                //       for now just requiring that any ignored host is also 
down
+//                if (FailureDetector.instance.isAlive(ep))
+//                    throw new IllegalStateException("Can't ignore " + ep + " 
during CMS migration - it is not down");
+                logger.info("Endpoint {} running {} is ignored", ep, version);
+                continue;
+            }
+
+            if (!version.isUpgraded())
+            {
+                String msg = String.format("All nodes are not yet upgraded - 
%s is running %s", metadata.directory.endpoint(entry.getKey()), version);
+                logger.error(msg);
+                throw new IllegalStateException(msg);
+            }
+        }
+
+        if (existingMembers.isEmpty())
+        {
+            logger.info("First CMS node");
+            Set<InetAddressAndPort> candidates = metadata
+                                                 .directory
+                                                 .allAddresses()
+                                                 .stream()
+                                                 .filter(ep -> 
!FBUtilities.getBroadcastAddressAndPort().equals(ep) &&
+                                                               
!ignored.contains(ep))
+                                                 .collect(toImmutableSet());
+
+            Election.instance.nominateSelf(candidates, ignored, 
metadata::equals);
+            ClusterMetadataService.instance().sealPeriod();
+        }
+        else
+        {
+            logger.info("Adding local node to existing CMS nodes; {}", 
existingMembers);
+            AddToCMS.initiate();
+        }
+    }
+
+    public boolean applyFromGossip(ClusterMetadata expected, ClusterMetadata 
updated)
+    {
+        logger.debug("Applying from gossip, current={} new={}", expected, 
updated);
+        if (!expected.epoch.isBefore(Epoch.EMPTY))
+            throw new IllegalStateException("Can't apply a ClusterMetadata 
from gossip with epoch " + expected.epoch);
+        if (state() != State.GOSSIP)
+            throw new IllegalStateException("Can't apply a ClusterMetadata 
from gossip when CMSState is not GOSSIP: " + state());
+
+        return log.unsafeSetCommittedFromGossip(expected, updated);
+    }
+
+    public void setFromGossip(ClusterMetadata fromGossip)
+    {
+        logger.debug("Setting from gossip, new={}", fromGossip);
+        if (state() != State.GOSSIP)
+            throw new IllegalStateException("Can't apply a ClusterMetadata 
from gossip when CMSState is not GOSSIP: " + state());
+        log.unsafeSetCommittedFromGossip(fromGossip);
     }
 
     public final Supplier<Entry.Id> entryIdGen = new Entry.DefaultEntryIdGen();
@@ -353,6 +447,12 @@ public class ClusterMetadataService
         Epoch ourEpoch = ClusterMetadata.current().epoch;
         if (!theirEpoch.isBefore(Epoch.FIRST) && theirEpoch.isAfter(ourEpoch))
         {
+            if (state() == State.GOSSIP)
+            {
+                logger.warn("TODO: can't catchup in gossip mode (their epoch = 
{})", theirEpoch); //todo: we have seen a message with epoch > EMPTY, we are 
probably racing with migration, or we missed the finish migration message, 
handle!
+                return;
+            }
+
             replayAndWait();
             ourEpoch = ClusterMetadata.current().epoch;
             if (theirEpoch.isAfter(ourEpoch))
@@ -396,8 +496,7 @@ public class ClusterMetadataService
 
     public boolean isMigrating()
     {
-        return false;
-//        return Election.instance.isMigrating();
+        return Election.instance.isMigrating();
     }
 
     /**
@@ -426,6 +525,10 @@ public class ClusterMetadataService
                     break;
                 case REMOTE:
                     throw new NotCMSException("Not currently a member of the 
CMS");
+                case GOSSIP:
+                    String msg = "Tried to use a handler when in gossip mode: 
"+handler.toString();
+                    logger.error(msg);
+                    throw new IllegalStateException(msg);
                 default:
                     throw new IllegalStateException("Illegal state: " + 
cmsStateSupplier.get());
             }
@@ -437,12 +540,14 @@ public class ClusterMetadataService
     {
         private final Processor local;
         private final RemoteProcessor remote;
+        private final GossipProcessor gossip;
         private final Supplier<State> cmsStateSupplier;
 
-        SwitchableProcessor(Processor local, RemoteProcessor remote, 
Supplier<State> cmsStateSupplier)
+        SwitchableProcessor(Processor local, RemoteProcessor remote, 
GossipProcessor gossip, Supplier<State> cmsStateSupplier)
         {
             this.local = local;
             this.remote = remote;
+            this.gossip = gossip;
             this.cmsStateSupplier = cmsStateSupplier;
         }
 
@@ -456,6 +561,8 @@ public class ClusterMetadataService
                     return local;
                 case REMOTE:
                     return remote;
+                case GOSSIP:
+                    return gossip;
             }
             throw new IllegalStateException("Bad CMS state: " + state);
         }
diff --git a/src/java/org/apache/cassandra/tcm/Commit.java 
b/src/java/org/apache/cassandra/tcm/Commit.java
index 54e6d6ab0c..e4548085ee 100644
--- a/src/java/org/apache/cassandra/tcm/Commit.java
+++ b/src/java/org/apache/cassandra/tcm/Commit.java
@@ -31,15 +31,12 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.Replication;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.net.*;
 import org.apache.cassandra.tcm.membership.Directory;
 import org.apache.cassandra.tcm.membership.NodeId;
-import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
@@ -323,8 +320,11 @@ public class Commit
             for (NodeId peerId : directory.peerIds())
             {
                 InetAddressAndPort endpoint = directory.endpoint(peerId);
+                boolean upgraded = directory.version(peerId).isUpgraded();
                 // Do not replicate to self and to the peer that has requested 
to commit this message
-                if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) 
|| (source != null && source.equals(endpoint)))
+                if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()) 
||
+                    (source != null && source.equals(endpoint)) ||
+                    !upgraded)
                 {
                     continue;
                 }
diff --git a/src/java/org/apache/cassandra/tcm/Discovery.java 
b/src/java/org/apache/cassandra/tcm/Discovery.java
index 58efbbccc6..9320e9bffb 100644
--- a/src/java/org/apache/cassandra/tcm/Discovery.java
+++ b/src/java/org/apache/cassandra/tcm/Discovery.java
@@ -123,6 +123,7 @@ public class Discovery
         public void doVerb(Message<NoPayload> message)
         {
             Set<InetAddressAndPort> cms = 
ClusterMetadata.current().fullCMSMembers();
+            logger.debug("Responding to discovery request from {}: {}", 
message.from(), cms);
 
             DiscoveredNodes discoveredNodes;
             if (!cms.isEmpty())
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index 6d5c8d037b..488df2e00f 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -18,22 +18,37 @@
 
 package org.apache.cassandra.tcm;
 
+ import java.io.IOException;
+ import java.util.Collections;
+ import java.util.Map;
  import java.util.Set;
  import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.TimeUnit;
  import java.util.function.Function;
 
+ import com.google.common.util.concurrent.Uninterruptibles;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 
  import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.SystemKeyspace;
+ import org.apache.cassandra.db.commitlog.CommitLog;
+ import org.apache.cassandra.gms.EndpointState;
+ import org.apache.cassandra.gms.Gossiper;
+ import org.apache.cassandra.gms.NewGossiper;
  import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.schema.DistributedSchema;
+ import org.apache.cassandra.tcm.compatibility.GossipHelper;
  import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
+ import org.apache.cassandra.tcm.migration.Election;
  import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
  import org.apache.cassandra.tcm.transformations.cms.Initialize;
  import org.apache.cassandra.utils.FBUtilities;
 
  import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
+ import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.emptyWithSchemaFromSystemTables;
+ import static 
org.apache.cassandra.tcm.compatibility.GossipHelper.fromEndpointStates;
 
  public class Startup
  {
@@ -63,6 +78,15 @@ package org.apache.cassandra.tcm;
                  initializeAsNonCmsNode(wrapProcessor);
                  initMessaging.run();
                  break;
+             case VOTE:
+                 logger.info("Initializing for discovery");
+                 initializeAsNonCmsNode(wrapProcessor);
+                 initializeForDiscovery(initMessaging);
+                 break;
+             case UPGRADE:
+                 logger.info("Initializing from gossip");
+                 initializeFromGossip(wrapProcessor, initMessaging);
+                 break;
          }
      }
 
@@ -87,13 +111,91 @@ package org.apache.cassandra.tcm;
      public static void 
initializeAsNonCmsNode(Function<ClusterMetadataService.Processor, 
ClusterMetadataService.Processor> wrapProcessor)
      {
          ClusterMetadata initial = new 
ClusterMetadata(DatabaseDescriptor.getPartitioner());
+         initial.schema.initializeKeyspaceInstances(DistributedSchema.empty());
          ClusterMetadataService.setInstance(new ClusterMetadataService(new 
UniformRangePlacement(),
                                                                        initial,
                                                                        
wrapProcessor,
                                                                        
ClusterMetadataService::state));
+
+         ClusterMetadataService.instance().initRecentlySealedPeriodsIndex();
          ClusterMetadataService.instance().log().replayPersisted();
      }
 
+     public static void initializeForDiscovery(Runnable initMessaging)
+     {
+         initMessaging.run();
+
+         logger.debug("Discovering other nodes in the system");
+         Discovery.DiscoveredNodes candidates = Discovery.instance.discover();
+
+         if (candidates.kind() == Discovery.DiscoveredNodes.Kind.KNOWN_PEERS)
+         {
+             logger.debug("Got candidates: " + candidates);
+             InetAddressAndPort min = 
candidates.nodes().stream().min(InetAddressAndPort::compareTo).get();
+
+             // identify if you need to start the vote
+             if (min.equals(FBUtilities.getBroadcastAddressAndPort()) || 
FBUtilities.getBroadcastAddressAndPort().compareTo(min) < 0)
+             {
+                 Election.instance.nominateSelf(candidates.nodes(),
+                                                
Collections.singleton(FBUtilities.getBroadcastAddressAndPort()),
+                                                (cm) -> true);
+             }
+         }
+
+         while (!ClusterMetadata.current().epoch.isAfter(Epoch.FIRST))
+         {
+             if (candidates.kind() == Discovery.DiscoveredNodes.Kind.CMS_ONLY)
+             {
+                 ClusterMetadataService.instance().processor().replayAndWait();
+             }
+             else
+             {
+                 Election.Initiator initiator = Election.instance.initiator();
+                 candidates = Discovery.instance.discoverOnce(initiator == 
null ? null : initiator.initiator);
+             }
+             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+         }
+
+         assert ClusterMetadata.current().epoch.isAfter(Epoch.FIRST);
+         Election.instance.migrated();
+     }
+
+     /**
+      * This should only be called during startup.
+      */
+     public static void 
initializeFromGossip(Function<ClusterMetadataService.Processor, 
ClusterMetadataService.Processor> wrapProcessor, Runnable initMessaging)
+     {
+         ClusterMetadata emptyFromSystemTables = 
emptyWithSchemaFromSystemTables();
+         
emptyFromSystemTables.schema.initializeKeyspaceInstances(DistributedSchema.empty());
+         ClusterMetadataService.setInstance(new ClusterMetadataService(new 
UniformRangePlacement(),
+                                                                       
emptyFromSystemTables,
+                                                                       
wrapProcessor,
+                                                                       
ClusterMetadataService::state));
+         initMessaging.run();
+
+         try
+         {
+             CommitLog.instance.recoverSegmentsOnDisk();
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException(e);
+         }
+
+         logger.debug("Starting to initialize ClusterMetadata from gossip");
+         Map<InetAddressAndPort, EndpointState> epStates = 
NewGossiper.instance.doShadowRound();
+         logger.debug("Got epStates {}", epStates);
+         ClusterMetadata initial = 
fromEndpointStates(emptyFromSystemTables.schema, epStates);
+         logger.debug("Created initial ClusterMetadata {}", initial);
+         ClusterMetadataService.instance().setFromGossip(initial);
+         Gossiper.instance.clearUnsafe();
+         
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
+         GossipHelper.mergeAllNodeStatesToGossip(initial);
+         // double check that everything was added, can remove once we are 
confident
+         ClusterMetadata cmGossip = 
fromEndpointStates(emptyFromSystemTables.schema, 
Gossiper.instance.getEndpointStates());
+         assert cmGossip.equals(initial) : cmGossip + " != " + initial;
+     }
+
      /**
       * Initialization process:
       */
@@ -112,10 +214,16 @@ package org.apache.cassandra.tcm;
 
              boolean hasFirstEpoch = SystemKeyspaceStorage.hasFirstEpoch();
              boolean isOnlySeed = DatabaseDescriptor.getSeeds().size() == 1 && 
DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddressAndPort());
-             if (isOnlySeed && !hasFirstEpoch)
-                return FIRST_CMS;
-
-             return NORMAL;
+             boolean hasBootedBefore = SystemKeyspace.getLocalHostId() != null;
+             logger.info("hasFirstEpoch = {}, hasBootedBefore = {}", 
hasFirstEpoch, hasBootedBefore);
+             if (!hasFirstEpoch && hasBootedBefore)
+                 return UPGRADE;
+             else if (hasFirstEpoch)
+                 return NORMAL;
+             else if (isOnlySeed)
+                 return FIRST_CMS;
+             else
+                 return VOTE;
          }
      }
  }
diff --git 
a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java 
b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
new file mode 100644
index 0000000000..ccad52f14a
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.listeners;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.StreamSupport;
+
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.virtual.PeersTable;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.compatibility.GossipHelper;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+
+import static org.apache.cassandra.tcm.membership.NodeState.LEFT;
+
+public class LegacyStateListener implements ChangeListener
+{
+    @Override
+    public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next)
+    {
+        if 
(!next.directory.lastModified().equals(prev.directory.lastModified()) ||
+            !next.tokenMap.lastModified().equals(prev.tokenMap.lastModified()))
+        {
+            Set<NodeId> removed = Sets.difference(prev.directory.peerIds(), 
next.directory.peerIds());
+            Set<NodeId> changed = new HashSet<>();
+            for (NodeId node : next.directory.peerIds())
+            {
+                NodeState oldState = prev.directory.peerState(node);
+                NodeState newState = next.directory.peerState(node);
+                if (oldState == null || oldState != newState || 
!prev.tokenMap.tokens(node).equals(next.tokenMap.tokens(node)))
+                    changed.add(node);
+            }
+
+            for (NodeId remove : removed)
+            {
+                GossipHelper.removeFromGossip(prev.directory.endpoint(remove));
+                PeersTable.updateLegacyPeerTable(remove, prev, next);
+            }
+
+            for (NodeId change : changed)
+            {
+                // next.myNodeId() can be null during replay (before we have 
registered)
+                if (next.myNodeId() != null && next.myNodeId().equals(change))
+                {
+                    switch (next.directory.peerState(change))
+                    {
+                        case REGISTERED:
+                            
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
+                            break;
+                        case JOINED:
+                            // needed if we miss the REGISTERED above; Does 
nothing if we are already in epStateMap:
+                            
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
+                            
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+                            
StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false)
+                                         .filter(cfs -> 
Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName()))
+                                         .forEach(cfs -> 
cfs.indexManager.executePreJoinTasksBlocking(true));
+                            break;
+                    }
+                }
+                if (next.directory.peerState(change) != LEFT)
+                    GossipHelper.mergeNodeToGossip(change, next);
+                else
+                    GossipHelper.mergeNodeToGossip(change, next, 
prev.tokenMap.tokens(change));
+                PeersTable.updateLegacyPeerTable(change, prev, next);
+            }
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 1a05a29460..0478500e97 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -40,19 +40,21 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.DurationSpec;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.listeners.ChangeListener;
 import org.apache.cassandra.tcm.listeners.InitializationListener;
+import org.apache.cassandra.tcm.listeners.LegacyStateListener;
 import org.apache.cassandra.tcm.listeners.LogListener;
 import org.apache.cassandra.tcm.listeners.MetadataSnapshotListener;
+import org.apache.cassandra.tcm.listeners.PaxosRepairListener;
 import org.apache.cassandra.tcm.listeners.PlacementsChangeListener;
 import org.apache.cassandra.tcm.listeners.SchemaListener;
-import org.apache.cassandra.tcm.listeners.PaxosRepairListener;
-import org.apache.cassandra.tcm.transformations.ForceSnapshot;
 import org.apache.cassandra.tcm.transformations.cms.PreInitialize;
+import org.apache.cassandra.tcm.transformations.ForceSnapshot;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.Closeable;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -96,7 +98,7 @@ public abstract class LocalLog implements Closeable
         Transformation transform = PreInitialize.withFirstCMS(addr);
         append(new Entry(Entry.Id.NONE, FIRST, transform));
         waitForHighestConsecutive();
-        assert metadata().epoch.is(Epoch.FIRST) : 
ClusterMetadata.current().epoch + " " + 
ClusterMetadata.current().fullCMSMembers();
+        assert metadata().epoch.is(Epoch.FIRST) : 
ClusterMetadata.current().epoch + " " + 
ClusterMetadata.current().placements.get(ReplicationParams.meta());
     }
 
     public ClusterMetadata metadata()
@@ -104,7 +106,22 @@ public abstract class LocalLog implements Closeable
         return committed.get();
     }
 
-    @VisibleForTesting
+    public boolean unsafeSetCommittedFromGossip(ClusterMetadata expected, ClusterMetadata updated)
+    {
+        if (!(expected.epoch.isEqualOrBefore(Epoch.UPGRADE_GOSSIP) && updated.epoch.is(Epoch.UPGRADE_GOSSIP)))
+            throw new IllegalStateException(String.format("Illegal epochs for setting from gossip; expected: %s, updated: %s",
+                                                          expected.epoch, updated.epoch));
+        return committed.compareAndSet(expected, updated);
+    }
+
+    public void unsafeSetCommittedFromGossip(ClusterMetadata updated)
+    {
+        if (!updated.epoch.is(Epoch.UPGRADE_GOSSIP))
+            throw new IllegalStateException(String.format("Illegal epoch for setting from gossip; updated: %s",
+                                                          updated.epoch));
+        committed.set(updated);
+    }
+
     public int pendingBufferSize()
     {
         return pending.size();
@@ -546,6 +563,7 @@ public abstract class LocalLog implements Closeable
         addListener(snapshotListener());
         addListener(new InitializationListener());
         addListener(new SchemaListener());
+        addListener(new LegacyStateListener());
         addListener(new PlacementsChangeListener());
         addListener(new MetadataSnapshotListener());
         addListener(new PaxosRepairListener());
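
For context, the two unsafeSetCommittedFromGossip overloads added above only allow a gossip-derived snapshot to replace the committed metadata while both the expected and the updated metadata sit at (or before) the special UPGRADE_GOSSIP epoch, and the guarded overload performs the swap as a compare-and-set so a concurrent append cannot be silently overwritten. A minimal, self-contained sketch of that guard pattern (Snapshot and the sentinel epoch below are illustrative stand-ins, not the real Cassandra types):

    import java.util.concurrent.atomic.AtomicReference;

    public class GossipCommitGuardSketch
    {
        static final long UPGRADE_GOSSIP = 0L; // illustrative sentinel epoch

        static final class Snapshot
        {
            final long epoch;
            Snapshot(long epoch) { this.epoch = epoch; }
        }

        private final AtomicReference<Snapshot> committed = new AtomicReference<>(new Snapshot(UPGRADE_GOSSIP));

        boolean setFromGossip(Snapshot expected, Snapshot updated)
        {
            // refuse anything that is not a gossip-mode snapshot replacing gossip-mode state
            if (!(expected.epoch <= UPGRADE_GOSSIP && updated.epoch == UPGRADE_GOSSIP))
                throw new IllegalStateException("illegal epochs for setting from gossip");
            // compare-and-set: returns false if another thread already replaced 'expected'
            return committed.compareAndSet(expected, updated);
        }
    }
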
diff --git a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java
new file mode 100644
index 0000000000..8599afec23
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.migration;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+public class ClusterMetadataHolder
+{
+    public static final ClusterMetadataHolder.Serializer serializer = new ClusterMetadataHolder.Serializer();
+
+    public final Election.Initiator coordinator;
+    public final ClusterMetadata metadata;
+
+    public ClusterMetadataHolder(Election.Initiator coordinator, ClusterMetadata metadata)
+    {
+        this.coordinator = coordinator;
+        this.metadata = metadata;
+    }
+
+    private static class Serializer implements IVersionedSerializer<ClusterMetadataHolder>
+    {
+        @Override
+        public void serialize(ClusterMetadataHolder t, DataOutputPlus out, int version) throws IOException
+        {
+            Election.Initiator.serializer.serialize(t.coordinator, out, version);
+            VerboseMetadataSerializer.serialize(ClusterMetadata.serializer, t.metadata, out);
+        }
+
+        @Override
+        public ClusterMetadataHolder deserialize(DataInputPlus in, int version) throws IOException
+        {
+            Election.Initiator coordinator = Election.Initiator.serializer.deserialize(in, version);
+            ClusterMetadata metadata = VerboseMetadataSerializer.deserialize(ClusterMetadata.serializer, in);
+            return new ClusterMetadataHolder(coordinator, metadata);
+        }
+
+        @Override
+        public long serializedSize(ClusterMetadataHolder t, int version)
+        {
+            return Election.Initiator.serializer.serializedSize(t.coordinator, version) +
+                   VerboseMetadataSerializer.serializedSize(ClusterMetadata.serializer, t.metadata);
+        }
+    }
+}
\ No newline at end of file
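
As a hedged illustration of the pattern ClusterMetadataHolder.Serializer follows (this is not the real Cassandra serializer API): a composite payload is written by delegating to the serializers of its parts, coordinator first and metadata second, and the reader must consume them in exactly the same order. A self-contained analogue using plain java.io streams and hypothetical types:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.UUID;

    public class CompositeSerializerSketch
    {
        static final class Holder
        {
            final UUID coordinator;
            final String metadata;
            Holder(UUID coordinator, String metadata) { this.coordinator = coordinator; this.metadata = metadata; }
        }

        // write the two parts in a fixed order: coordinator id, then the metadata payload
        static void serialize(Holder h, DataOutputStream out) throws IOException
        {
            out.writeLong(h.coordinator.getMostSignificantBits());
            out.writeLong(h.coordinator.getLeastSignificantBits());
            out.writeUTF(h.metadata);
        }

        // read back in the same order the writer used
        static Holder deserialize(DataInputStream in) throws IOException
        {
            UUID coordinator = new UUID(in.readLong(), in.readLong());
            return new Holder(coordinator, in.readUTF());
        }
    }
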
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java
index 0e8efe8a16..c2ffcdf80b 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -18,28 +18,153 @@
 
 package org.apache.cassandra.tcm.migration;
 
+import java.io.IOException;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.Message;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.Startup;
+import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.net.MessageDelivery;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.RequestCallbackWithFailure;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDSerializer;
 import org.apache.cassandra.utils.concurrent.Accumulator;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 
 public class Election
 {
     private static final Logger logger = LoggerFactory.getLogger(Election.class);
+    private static final Initiator MIGRATED = new Initiator(null, null);
+
+    private final AtomicReference<Initiator> initiator = new AtomicReference<>();
+
+    public static Election instance = new Election();
+
+    public final PrepareHandler prepareHandler;
+    public final AbortHandler abortHandler;
+
+    private final MessageDelivery messaging;
+
+    private Election()
+    {
+        this(MessagingService.instance());
+    }
+
+    private Election(MessageDelivery messaging)
+    {
+        this.messaging = messaging;
+        this.prepareHandler = new PrepareHandler();
+        this.abortHandler = new AbortHandler();
+    }
+
+    public void nominateSelf(Set<InetAddressAndPort> candidates, Set<InetAddressAndPort> ignoredEndpoints, Function<ClusterMetadata, Boolean> isMatch)
+    {
+        Set<InetAddressAndPort> sendTo = new HashSet<>(candidates);
+        sendTo.removeAll(ignoredEndpoints);
+        sendTo.remove(FBUtilities.getBroadcastAddressAndPort());
+
+        try
+        {
+            initiate(sendTo, isMatch);
+            finish(sendTo);
+        }
+        catch (Exception e)
+        {
+            abort(sendTo);
+            throw e;
+        }
+    }
+
+    private void initiate(Set<InetAddressAndPort> sendTo, Function<ClusterMetadata, Boolean> isMatch)
+    {
+        if (!updateInitiator(null, new Initiator(FBUtilities.getBroadcastAddressAndPort(), UUID.randomUUID())))
+            throw new IllegalStateException("Migration already initiated by " + initiator.get());
+
+        Collection<Pair<InetAddressAndPort, ClusterMetadataHolder>> metadatas = fanoutAndWait(messaging, sendTo, Verb.TCM_INIT_MIG_REQ, initiator.get());
+        if (metadatas.size() != sendTo.size())
+        {
+            Set<InetAddressAndPort> responded = metadatas.stream().map(p -> p.left).collect(Collectors.toSet());
+            String msg = String.format("Did not get response from %s - not continuing with migration. Ignore down hosts with --ignore <host>", Sets.difference(sendTo, responded));
+            logger.warn(msg);
+            throw new IllegalStateException(msg);
+        }
+
+        Set<InetAddressAndPort> mismatching = metadatas.stream().filter(p -> !isMatch.apply(p.right.metadata)).map(p -> p.left).collect(Collectors.toSet());
+        if (!mismatching.isEmpty())
+        {
+            // todo; log the differences between the metadatas
+            String msg = String.format("Got mismatching cluster metadatas from %s, aborting migration", mismatching);
+            throw new IllegalStateException(msg);
+        }
+    }
+
+    private void finish(Set<InetAddressAndPort> sendTo)
+    {
+        Initiator currentCoordinator = initiator.get();
+        assert currentCoordinator.initiator.equals(FBUtilities.getBroadcastAddressAndPort());
+
+        Startup.initializeAsFirstCMSNode();
+        Register.maybeRegister();
+
+        updateInitiator(currentCoordinator, MIGRATED);
+        fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY));
+    }
+
+    private void abort(Set<InetAddressAndPort> sendTo)
+    {
+        Initiator init = initiator.getAndSet(null);
+        for (InetAddressAndPort ep : sendTo)
+            messaging.send(Message.out(Verb.TCM_ABORT_MIG, init), ep);
+    }
+
+    public Initiator initiator()
+    {
+        return initiator.get();
+    }
+
+    public void migrated()
+    {
+        initiator.set(MIGRATED);
+    }
+
+    private boolean updateInitiator(Initiator expected, Initiator newCoordinator)
+    {
+        Initiator current = initiator.get();
+        return Objects.equals(current, expected) && initiator.compareAndSet(current, newCoordinator);
+    }
+
+    public boolean isMigrating()
+    {
+        Initiator coordinator = initiator();
+        return coordinator != null && coordinator != MIGRATED;
+    }
 
     public static <REQ, RSP> Collection<Pair<InetAddressAndPort, RSP>> fanoutAndWait(MessageDelivery messaging, Set<InetAddressAndPort> sendTo, Verb verb, REQ payload)
     {
@@ -67,4 +192,89 @@ public class Election
         cdl.awaitUninterruptibly(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
         return responses.snapshot();
     }
+
+    public class PrepareHandler implements IVerbHandler<Initiator>
+    {
+        @Override
+        public void doVerb(Message<Initiator> message) throws IOException
+        {
+            if (!updateInitiator(null, message.payload))
+                throw new IllegalStateException(String.format("Got duplicate initiate migration message from %s, migration is already started by %s", message.from(), initiator()));
+
+            // todo; disallow ANY changes to state managed in ClusterMetadata
+            messaging.send(message.responseWith(new ClusterMetadataHolder(message.payload, ClusterMetadata.current())), message.from());
+        }
+    }
+
+    public class AbortHandler implements IVerbHandler<Initiator>
+    {
+        @Override
+        public void doVerb(Message<Initiator> message) throws IOException
+        {
+            if (!message.from().equals(initiator().initiator) || !updateInitiator(message.payload, null))
+                logger.error("Could not clear initiator - initiator is set to {}, abort message received from {}", initiator(), message.payload);
+        }
+    }
+
+    public static class Initiator
+    {
+        public static final Serializer serializer = new Serializer();
+
+        public final InetAddressAndPort initiator;
+        public final UUID initToken;
+
+        public Initiator(InetAddressAndPort initiator, UUID initToken)
+        {
+            this.initiator = initiator;
+            this.initToken = initToken;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof Initiator)) return false;
+            Initiator other = (Initiator) o;
+            return Objects.equals(initiator, other.initiator) && Objects.equals(initToken, other.initToken);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(initiator, initToken);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Initiator{" +
+                   "initiator=" + initiator +
+                   ", initToken=" + initToken +
+                   '}';
+        }
+
+        public static class Serializer implements IVersionedSerializer<Initiator>
+        {
+            @Override
+            public void serialize(Initiator t, DataOutputPlus out, int version) throws IOException
+            {
+                InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator, out, version);
+                UUIDSerializer.serializer.serialize(t.initToken, out, version);
+            }
+
+            @Override
+            public Initiator deserialize(DataInputPlus in, int version) throws IOException
+            {
+                return new Initiator(InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in, version),
+                                     UUIDSerializer.serializer.deserialize(in, version));
+            }
+
+            @Override
+            public long serializedSize(Initiator t, int version)
+            {
+                return InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator, version) +
+                       UUIDSerializer.serializer.serializedSize(t.initToken, version);
            }
+            }
+        }
+    }
 }
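
The election itself is a three-phase exchange: nominateSelf() claims the initiator slot, initiate() fans out TCM_INIT_MIG_REQ and requires every peer to respond with matching metadata, finish() promotes this node to the first CMS member and notifies the others, and any failure triggers abort() so peers clear their recorded initiator. A deliberately simplified, self-contained sketch of that control flow (the functional parameters below stand in for the messaging round trips and for Startup.initializeAsFirstCMSNode(); they are not the real signatures):

    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Function;

    public class ElectionFlowSketch<Meta>
    {
        private final AtomicReference<UUID> initiator = new AtomicReference<>();

        public void nominateSelf(Set<String> peers,
                                 Function<String, Meta> fetchMetadata, // stand-in for the TCM_INIT_MIG_REQ round trip
                                 Function<Meta, Boolean> isMatch,
                                 Runnable becomeFirstCms,              // stand-in for Startup.initializeAsFirstCMSNode()
                                 Runnable notifyPeers,                 // stand-in for the TCM_NOTIFY_REQ fan-out
                                 Runnable abortPeers)                  // stand-in for the TCM_ABORT_MIG fan-out
        {
            // phase 1: claim the initiator slot exactly once
            if (!initiator.compareAndSet(null, UUID.randomUUID()))
                throw new IllegalStateException("migration already initiated");
            try
            {
                // phase 2: every peer must answer and its metadata must match ours
                for (String peer : peers)
                    if (!isMatch.apply(fetchMetadata.apply(peer)))
                        throw new IllegalStateException("mismatching metadata from " + peer);
                // phase 3: become the first CMS member and tell the rest of the cluster
                becomeFirstCms.run();
                notifyPeers.run();
            }
            catch (RuntimeException e)
            {
                initiator.set(null);
                abortPeers.run();
                throw e;
            }
        }
    }
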
diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java
index bf58689860..bbc03f2e21 100644
--- a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java
+++ b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java
@@ -24,8 +24,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.utils.CassandraVersion;
@@ -64,8 +65,8 @@ public class GossipCMSListener implements IEndpointStateChangeSubscriber
                 ClusterMetadata newCM = metadata.transformer()
                                                 .withNodeInformation(nodeId, newNodeVersion, metadata.directory.getNodeAddresses(nodeId))
                                                 .buildForGossipMode();
-//                if (ClusterMetadataService.instance().applyFromGossip(metadata, newCM))
-//                    return;
+                if (ClusterMetadataService.instance().applyFromGossip(metadata, newCM))
+                    return;
                 metadata = ClusterMetadata.current();
             }
         }
diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
new file mode 100644
index 0000000000..71bb0424d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.migration;
+
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Commit;
+import org.apache.cassandra.tcm.log.Entry;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+public class GossipProcessor implements ClusterMetadataService.Processor
+{
+    @Override
+    public Commit.Result commit(Entry.Id entryId, Transformation transform, Epoch lastKnown)
+    {
+        throw new IllegalStateException("Can't commit transformations when running in gossip mode. Enable the ClusterMetadataService with `nodetool addtocms`.");
+    }
+
+    @Override
+    public ClusterMetadata replayAndWait()
+    {
+        return ClusterMetadata.current();
+    }
+}
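
Until the election completes, the node's ClusterMetadataService is backed by this GossipProcessor, so any attempt to commit a transformation fails fast while reads still see the locally maintained metadata. A simplified stand-in showing what callers observe in gossip mode (the interface below is illustrative, not the real ClusterMetadataService.Processor signature):

    public class GossipModeProcessorSketch
    {
        interface Processor<Meta>
        {
            Meta commit(Object transformation);
            Meta replayAndWait();
        }

        static <Meta> Processor<Meta> gossipProcessor(Meta current)
        {
            return new Processor<Meta>()
            {
                @Override
                public Meta commit(Object transformation)
                {
                    // writes are rejected until the CMS has been initialised via `nodetool addtocms`
                    throw new IllegalStateException("still in gossip mode; run `nodetool addtocms` first");
                }

                @Override
                public Meta replayAndWait()
                {
                    // nothing to replay: in gossip mode each node owns its metadata locally
                    return current;
                }
            };
        }
    }
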
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 35a673c406..0eb592dc60 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -94,6 +94,7 @@ public class NodeTool
     public int execute(String... args)
     {
         List<Class<? extends NodeToolCmdRunnable>> commands = newArrayList(
+                AddToCMS.class,
                 Assassinate.class,
                 CassHelp.class,
                 CfHistograms.class,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/AddToCMS.java b/src/java/org/apache/cassandra/tools/nodetool/AddToCMS.java
new file mode 100644
index 0000000000..bf9e5d1cab
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/AddToCMS.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+@Command(name = "addtocms", description = "Add this node to the cluster 
metadata service")
+public class AddToCMS extends NodeTool.NodeToolCmd
+{
+    @Option(title = "ignored endpoints", name = {"-i", "--ignore"}, 
description = "Hosts to ignore due to them being down", required = false)
+    private List<String> endpoint = new ArrayList<>();
+    @Override
+    protected void execute(NodeProbe probe)
+    {
+        probe.getStorageService().addToCms(endpoint);
+    }
+}
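
Operationally, this command is what triggers the migration implemented in Election: running `nodetool addtocms` on the chosen node nominates it as the first CMS member, and `-i`/`--ignore` skips hosts that are known to be down, matching the hint in the "Did not get response from %s" error above. The argument handling itself is delegated to StorageService.addToCms().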

