This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit c232bcb57a664a0a4ca379f03dea6d54023738a2
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 2 14:00:55 2023 +0000

    [CEP-21] Start to remove and deprecate gossip functionality
    
    WIP commit (i.e. does not compile) beginning the process of removing
    gossip as the source of truth regarding membership, ownership, topology
    and data placement. This task will be split over mutiple commits.
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../statements/schema/AlterKeyspaceStatement.java  |  24 +-
 .../apache/cassandra/db/filter/ColumnFilter.java   |  98 +--
 .../org/apache/cassandra/gms/ApplicationState.java |  38 +-
 .../org/apache/cassandra/gms/EndpointState.java    |  27 +-
 .../org/apache/cassandra/gms/FailureDetector.java  |  12 +-
 .../org/apache/cassandra/gms/GossipDigestAck.java  |   2 +-
 .../cassandra/gms/GossipDigestAckVerbHandler.java  |  11 +-
 .../cassandra/gms/GossipDigestSynVerbHandler.java  |   5 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    | 728 +++++----------------
 .../org/apache/cassandra/gms/GossiperEvent.java    |   2 +-
 src/java/org/apache/cassandra/gms/NewGossiper.java | 143 ++++
 .../org/apache/cassandra/gms/VersionedValue.java   |  96 ++-
 .../schema/SystemDistributedKeyspace.java          |  45 +-
 .../cassandra/tcm/compatibility/GossipHelper.java  | 303 +++++++++
 .../apache/cassandra/tracing/TraceKeyspace.java    |  14 +-
 15 files changed, 774 insertions(+), 774 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
index 9efccd5464..250b18ffed 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.Keyspaces.KeyspacesDiff;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.transport.Event.SchemaChange;
 import org.apache.cassandra.transport.Event.SchemaChange.Change;
 import org.apache.cassandra.utils.FBUtilities;
@@ -91,7 +92,7 @@ public final class AlterKeyspaceStatement extends 
AlterSchemaStatement
 
         newKeyspace.params.validate(keyspaceName, state);
         newKeyspace.replicationStrategy.validate();
-
+        validateNoRangeMovements();
         validateTransientReplication(keyspace.replicationStrategy, 
newKeyspace.replicationStrategy);
 
         // Because we used to not properly validate unrecognized options, we 
only log a warning if we find one.
@@ -140,14 +141,19 @@ public final class AlterKeyspaceStatement extends 
AlterSchemaStatement
         if (allow_alter_rf_during_range_movement)
             return;
 
-        Stream<InetAddressAndPort> unreachableNotAdministrativelyInactive =
-            Gossiper.instance.getUnreachableMembers().stream().filter(endpoint 
-> !FBUtilities.getBroadcastAddressAndPort().equals(endpoint) &&
-                                                                               
   !Gossiper.instance.isAdministrativelyInactiveState(endpoint));
-        Stream<InetAddressAndPort> endpoints = 
Stream.concat(Gossiper.instance.getLiveMembers().stream(),
-                                                             
unreachableNotAdministrativelyInactive);
-        List<InetAddressAndPort> notNormalEndpoints = 
endpoints.filter(endpoint -> 
!FBUtilities.getBroadcastAddressAndPort().equals(endpoint) &&
-                                                                               
    !Gossiper.instance.getEndpointStateForEndpoint(endpoint).isNormalState())
-                                                               
.collect(Collectors.toList());
+        ClusterMetadata metadata = ClusterMetadata.current();
+        NodeId nodeId = 
metadata.directory.peerId(FBUtilities.getBroadcastAddressAndPort());
+        Set<InetAddressAndPort> notNormalEndpoints = 
metadata.directory.states.entrySet().stream().filter(e -> 
!e.getKey().equals(nodeId)).filter(e -> {
+            switch (e.getValue())
+            {
+                case BOOTSTRAPPING:
+                case LEAVING:
+                case MOVING:
+                    return true;
+                default:
+                    return false;
+            }
+        }).map(e -> 
metadata.directory.endpoint(e.getKey())).collect(Collectors.toSet());
         if (!notNormalEndpoints.isEmpty())
         {
             throw new ConfigurationException("Cannot alter RF while some 
endpoints are not in normal state (no range movements): " + notNormalEndpoints);
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java 
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 48ba7388c7..4c2774a51b 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -31,13 +31,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.CassandraVersion;
 
 /**
  * Represents which (non-PK) columns (and optionally which sub-part of a 
column for complex columns) are selected
@@ -182,67 +180,6 @@ public abstract class ColumnFilter
         abstract RegularAndStaticColumns getFetchedColumns(TableMetadata 
metadata, RegularAndStaticColumns queried);
     }
 
-    /**
-     * Returns {@code true} if there are pre-4.0-rc2 nodes in the cluster, 
{@code false} otherwise.
-     *
-     * <p>ColumnFilters from 4.0 releases before RC2 wrongly assumed that 
fetching all regular columns and not
-     * the static columns was enough. That was not the case for queries that 
needed to return rows for empty partitions.
-     * See CASSANDRA-16686 for more details.</p>
-     */
-    private static boolean isUpgradingFromVersionLowerThan40RC2()
-    {
-        if 
(Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0_RC2))
-        {
-            logger.trace("ColumnFilter conversion has been applied so that 
static columns will not be fetched because there are pre 4.0-rc2 nodes in the 
cluster");
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Returns {@code true} if there are pre-4.0 nodes in the cluster, {@code 
false} otherwise.
-     *
-     * <p>If there pre-4.0 nodes in the cluster all static columns should be 
fetched along with all regular columns.
-     * This is due to the fact that this nodes have a different understanding 
of the fetchAll serialization flag.
-     * Pre-4.0 the fetchAll flag meant that all the columns regular AND STATIC 
should be fetched whereas for 4.0
-     * nodes it meant that only the regular columns and the queried static 
columns should be fetched.</p>
-     */
-    private static boolean isUpgradingFromVersionLowerThan40()
-    {
-        if 
(Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0))
-        {
-            logger.trace("ColumnFilter conversion has been applied so that all 
static columns will be fetched because there are pre 4.0 nodes in the cluster");
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Returns {@code true} if there are pre-3.4 nodes in the cluster, {@code 
false} otherwise.
-     *
-     * When fetchAll is enabled on pre CASSANDRA-10657 (3.4-), queried columns 
are not considered at all, and it
-     * is assumed that all columns are queried. CASSANDRA-10657 (3.4+) brings 
back skipping values of columns
-     * which are not in queried set when fetchAll is enabled. That makes 
exactly the same filter being
-     * interpreted in a different way on 3.4- and 3.4+.
-     *
-     * Moreover, there is no way to convert the filter with fetchAll and 
queried != null so that it is
-     * interpreted the same way on 3.4- because that Cassandra version does 
not support such filtering.
-     *
-     * In order to avoid inconsistencies in data read by 3.4- and 3.4+ we need 
to avoid creation of incompatible
-     * filters when the cluster contains 3.4- nodes. We need to do that by 
using a wildcard query.
-     *
-     * see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415
-     */
-    private static boolean isUpgradingFromVersionLowerThan34()
-    {
-        if 
(Gossiper.instance.isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_3_4))
-        {
-            logger.trace("ColumnFilter conversion has been applied so that all 
columns will be queried because there are pre 3.4 nodes in the cluster");
-            return true;
-        }
-        return false;
-    }
-
     /**
      * A filter that includes all columns for the provided table.
      */
@@ -271,22 +208,9 @@ public abstract class ColumnFilter
                                          RegularAndStaticColumns queried,
                                          boolean 
returnStaticContentOnPartitionWithNoRows)
     {
-        // pre CASSANDRA-10657 (3.4-), when fetchAll is enabled, queried 
columns are not considered at all, and it
-        // is assumed that all columns are queried.
-        if (isUpgradingFromVersionLowerThan34())
-        {
-            return new 
WildCardColumnFilter(metadata.regularAndStaticColumns());
-        }
-
-        // pre CASSANDRA-12768 (4.0-) all static columns should be fetched 
along with all regular columns.
-        if (isUpgradingFromVersionLowerThan40())
-        {
-            return 
SelectionColumnFilter.newInstance(FetchingStrategy.ALL_COLUMNS, metadata, 
queried, null);
-        }
-
         // pre CASSANDRA-16686 (4.0-RC2-) static columns were not fetched 
unless queried which led to some wrong
         // results for some queries
-        if (!returnStaticContentOnPartitionWithNoRows || 
isUpgradingFromVersionLowerThan40RC2())
+        if (!returnStaticContentOnPartitionWithNoRows)
         {
             return 
SelectionColumnFilter.newInstance(FetchingStrategy.ALL_REGULARS_AND_QUERIED_STATICS_COLUMNS,
 metadata, queried, null);
         }
@@ -571,20 +495,14 @@ public abstract class ColumnFilter
                 // filters when the cluster contains 3.4- nodes. We do that by 
forcibly setting queried to null.
                 //
                 // see CASSANDRA-10657, CASSANDRA-15833, CASSANDRA-16415
-                if (queried == null || isUpgradingFromVersionLowerThan34())
+                if (queried == null)
                 {
                     return new 
WildCardColumnFilter(metadata.regularAndStaticColumns());
                 }
 
-                // pre CASSANDRA-12768 (4.0-) all static columns should be 
fetched along with all regular columns.
-                if (isUpgradingFromVersionLowerThan40())
-                {
-                    return 
SelectionColumnFilter.newInstance(FetchingStrategy.ALL_COLUMNS, metadata, 
queried, s);
-                }
-
                 // pre CASSANDRA-16686 (4.0-RC2-) static columns where not 
fetched unless queried witch lead to some wrong results
                 // for some queries
-                if (!returnStaticContentOnPartitionWithNoRows || 
isUpgradingFromVersionLowerThan40RC2())
+                if (!returnStaticContentOnPartitionWithNoRows)
                 {
                     return 
SelectionColumnFilter.newInstance(FetchingStrategy.ALL_REGULARS_AND_QUERIED_STATICS_COLUMNS,
 metadata, queried, s);
                 }
@@ -1041,20 +959,14 @@ public abstract class ColumnFilter
             {
                 // pre CASSANDRA-10657 (3.4-), when fetchAll is enabled, 
queried columns are not considered at all, and it
                 // is assumed that all columns are queried.
-                if (!hasQueried || isUpgradingFromVersionLowerThan34())
+                if (!hasQueried)
                 {
                     return new WildCardColumnFilter(fetched);
                 }
 
-                // pre CASSANDRA-12768 (4.0-) all static columns should be 
fetched along with all regular columns.
-                if (isUpgradingFromVersionLowerThan40())
-                {
-                    return new 
SelectionColumnFilter(FetchingStrategy.ALL_COLUMNS, queried, fetched, 
subSelections);
-                }
-
                 // pre CASSANDRA-16686 (4.0-RC2-) static columns where not 
fetched unless queried witch lead to some wrong results
                 // for some queries
-                if (!isFetchAllStatics || 
isUpgradingFromVersionLowerThan40RC2())
+                if (!isFetchAllStatics)
                 {
                     return new 
SelectionColumnFilter(FetchingStrategy.ALL_REGULARS_AND_QUERIED_STATICS_COLUMNS,
 queried, fetched, subSelections);
                 }
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java 
b/src/java/org/apache/cassandra/gms/ApplicationState.java
index c45d3c2602..4e07eacff2 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -30,22 +30,22 @@ public enum ApplicationState
     // never remove a state here, ordering matters.
     @Deprecated STATUS, //Deprecated and unsued in 4.0, stop publishing in 
5.0, reclaim in 6.0
     LOAD,
-    SCHEMA,
-    DC,
-    RACK,
-    RELEASE_VERSION,
-    REMOVAL_COORDINATOR,
-    @Deprecated INTERNAL_IP, //Deprecated and unused in 4.0, stop publishing 
in 5.0, reclaim in 6.0
-    @Deprecated RPC_ADDRESS, // ^ Same
+    @Deprecated SCHEMA(false),
+    @Deprecated DC(true),
+    @Deprecated RACK(true),
+    @Deprecated RELEASE_VERSION(true),
+    @Deprecated REMOVAL_COORDINATOR,
+    @Deprecated INTERNAL_IP(true), //Deprecated and unused in 4.0, stop 
publishing in 5.0, reclaim in 6.0
+    @Deprecated RPC_ADDRESS(true), // ^ Same
     X_11_PADDING, // padding specifically for 1.1
     SEVERITY,
-    NET_VERSION,
-    HOST_ID,
-    TOKENS,
+    NET_VERSION(true),
+    @Deprecated HOST_ID(true),
+    @Deprecated TOKENS(true),
     RPC_READY,
     // pad to allow adding new states to existing cluster
-    INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with up to two 
ports
-    NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
+    @Deprecated INTERNAL_ADDRESS_AND_PORT, //Replacement for INTERNAL_IP with 
up to two ports
+    @Deprecated NATIVE_ADDRESS_AND_PORT, //Replacement for RPC_ADDRESS
     STATUS_WITH_PORT, //Replacement for STATUS
     /**
      * The set of sstable versions on this node. This will usually be only the 
"current" sstable format (the one with
@@ -67,5 +67,17 @@ public enum ApplicationState
     X7,
     X8,
     X9,
-    X10,
+    X10;
+
+    public final boolean derivedFromState;
+
+    ApplicationState()
+    {
+        this(false);
+    }
+
+    ApplicationState(boolean derivedFromState )
+    {
+        this.derivedFromState = derivedFromState;
+    }
 }
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java 
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 0886bde156..dd912dcc20 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.NullableSerializer;
 
@@ -69,10 +70,11 @@ public class EndpointState
         this(new HeartBeatState(other.hbState), new 
EnumMap<>(other.applicationState.get()));
     }
 
-    EndpointState(HeartBeatState initialHbState, Map<ApplicationState, 
VersionedValue> states)
+    @VisibleForTesting
+    public EndpointState(HeartBeatState initialHbState, Map<ApplicationState, 
VersionedValue> states)
     {
         hbState = initialHbState;
-        applicationState = new AtomicReference<Map<ApplicationState, 
VersionedValue>>(new EnumMap<>(states));
+        applicationState = new AtomicReference<>(new EnumMap<>(states));
         updateTimestamp = nanoTime();
         isAlive = true;
     }
@@ -83,6 +85,17 @@ public class EndpointState
         return hbState;
     }
 
+    public EndpointState nonDerivable()
+    {
+        Map<ApplicationState, VersionedValue> state = new 
EnumMap<ApplicationState, VersionedValue>(ApplicationState.class);
+        for (Map.Entry<ApplicationState, VersionedValue> e : 
applicationState.get().entrySet())
+        {
+            if (!e.getKey().derivedFromState)
+                state.put(e.getKey(), e.getValue());
+        }
+        return new EndpointState(hbState, state);
+    }
+
     void setHeartBeatState(HeartBeatState newHbState)
     {
         updateTimestamp();
@@ -232,11 +245,6 @@ public class EndpointState
         return rpcState != null && Boolean.parseBoolean(rpcState.value);
     }
 
-    public boolean isNormalState()
-    {
-        return getStatus().equals(VersionedValue.STATUS_NORMAL);
-    }
-
     public String getStatus()
     {
         VersionedValue status = 
getApplicationState(ApplicationState.STATUS_WITH_PORT);
@@ -257,10 +265,7 @@ public class EndpointState
     @Nullable
     public UUID getSchemaVersion()
     {
-        VersionedValue applicationState = 
getApplicationState(ApplicationState.SCHEMA);
-        return applicationState != null
-               ? UUID.fromString(applicationState.value)
-               : null;
+        return ClusterMetadata.current().schema.getVersion();
     }
 
     @Nullable
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java 
b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 5177154df9..51d73cc067 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import javax.management.openmbean.*;
 
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
@@ -243,10 +246,13 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
                 continue;
             sb.append("  
").append(state.getKey()).append(":").append(state.getValue().version).append(":").append(state.getValue().value).append("\n");
         }
-        VersionedValue tokens = 
endpointState.getApplicationState(ApplicationState.TOKENS);
+        ClusterMetadata metadata = ClusterMetadata.current();
+        NodeId nodeId = 
metadata.directory.peerId(FBUtilities.getBroadcastAddressAndPort());
+        List<Token> tokens = metadata.tokenMap.tokens(nodeId);
         if (tokens != null)
         {
-            sb.append("  
TOKENS:").append(tokens.version).append(":<hidden>\n");
+            // todo, used to only append tokens.version
+            sb.append("  
TOKENS:").append(metadata.epoch.toString()).append("\n");
         }
         else
         {
@@ -294,7 +300,7 @@ public class FailureDetector implements IFailureDetector, 
FailureDetectorMBean
         // it's worth being defensive here so minor bugs don't cause 
disproportionate
         // badness.  (See CASSANDRA-1463 for an example).
         if (epState == null)
-            logger.error("Unknown endpoint: " + ep, new 
IllegalArgumentException(""));
+            logger.error("Unknown endpoint: " + ep, new 
IllegalArgumentException("Unknown endpoint: " + ep));
         return epState != null && epState.isAlive();
     }
 
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAck.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
index 26494eaba9..07feab29d7 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAck.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAck.java
@@ -41,7 +41,7 @@ public class GossipDigestAck
     final List<GossipDigest> gDigestList;
     final Map<InetAddressAndPort, EndpointState> epStateMap;
 
-    GossipDigestAck(List<GossipDigest> gDigestList, Map<InetAddressAndPort, 
EndpointState> epStateMap)
+    public GossipDigestAck(List<GossipDigest> gDigestList, 
Map<InetAddressAndPort, EndpointState> epStateMap)
     {
         this.gDigestList = gDigestList;
         this.epStateMap = epStateMap;
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 5fbe7ce0e3..084d41ed89 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -42,7 +42,7 @@ public class GossipDigestAckVerbHandler extends 
GossipVerbHandler<GossipDigestAc
         InetAddressAndPort from = message.from();
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestAckMessage from {}", from);
-        if (!Gossiper.instance.isEnabled() && 
!Gossiper.instance.isInShadowRound())
+        if (!Gossiper.instance.isEnabled() && 
!NewGossiper.instance.isInShadowRound())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Ignoring GossipDigestAckMessage because gossip 
is disabled");
@@ -53,17 +53,14 @@ public class GossipDigestAckVerbHandler extends 
GossipVerbHandler<GossipDigestAc
         List<GossipDigest> gDigestList = 
gDigestAckMessage.getGossipDigestList();
         Map<InetAddressAndPort, EndpointState> epStateMap = 
gDigestAckMessage.getEndpointStateMap();
         logger.trace("Received ack with {} digests and {} states", 
gDigestList.size(), epStateMap.size());
-
-        if (Gossiper.instance.isInShadowRound())
+        if (NewGossiper.instance.isInShadowRound())
         {
             if (logger.isDebugEnabled())
                 logger.debug("Received an ack from {}, which may trigger exit 
from shadow round", from);
 
-            // if the ack is completely empty, then we can infer that the 
respondent is also in a shadow round
-            Gossiper.instance.maybeFinishShadowRound(from, 
gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
-            return; // don't bother doing anything else, we have what we came 
for
+            NewGossiper.instance.onAck(epStateMap);
+            return;
         }
-
         if (epStateMap.size() > 0)
         {
             // Ignore any GossipDigestAck messages that we handle before a 
regular GossipDigestSyn has been send.
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java 
b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index abaa39b14c..c2713863cc 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -44,7 +44,7 @@ public class GossipDigestSynVerbHandler extends 
GossipVerbHandler<GossipDigestSy
         InetAddressAndPort from = message.from();
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestSynMessage from {}", from);
-        if (!Gossiper.instance.isEnabled() && 
!Gossiper.instance.isInShadowRound())
+        if (!Gossiper.instance.isEnabled() && 
!NewGossiper.instance.isInShadowRound())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Ignoring GossipDigestSynMessage because gossip 
is disabled");
@@ -66,13 +66,12 @@ public class GossipDigestSynVerbHandler extends 
GossipVerbHandler<GossipDigestSy
         }
 
         List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
-
         // if the syn comes from a peer performing a shadow round and this 
node is
         // also currently in a shadow round, send back a minimal ack. This 
node must
         // be in the sender's seed list and doing this allows the sender to
         // differentiate between seeds from which it is partitioned and those 
which
         // are in their shadow round
-        if (!Gossiper.instance.isEnabled() && 
Gossiper.instance.isInShadowRound())
+        if (!Gossiper.instance.isEnabled() && 
NewGossiper.instance.isInShadowRound())
         {
             // a genuine syn (as opposed to one from a node currently
             // doing a shadow round) will always contain > 0 digests
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 0f13f6ae48..86ccee939e 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -18,8 +18,22 @@
 package org.apache.cassandra.gms;
 
 import java.net.UnknownHostException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -30,47 +44,46 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BooleanSupplier;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
-
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.concurrent.FutureTask;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.NoPayload;
-import org.apache.cassandra.net.Verb;
-import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.ExecutorUtils;
-import org.apache.cassandra.utils.ExpiringMemoizingSupplier;
-import org.apache.cassandra.utils.MBeanWrapper;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.FutureTask;
+import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.RecomputingSupplier;
-import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 import org.apache.cassandra.utils.concurrent.NotScheduledFuture;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.DISABLE_GOSSIP_ENDPOINT_REMOVAL;
@@ -79,13 +92,15 @@ import static 
org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_S
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.SHUTDOWN_ANNOUNCE_DELAY_IN_MS;
 import static org.apache.cassandra.config.DatabaseDescriptor.getClusterName;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getPartitionerName;
+import static org.apache.cassandra.gms.Gossiper.GossipedWith.CMS;
+import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED;
+import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS;
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.Verb.ECHO_REQ;
 import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
-import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
-import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS;
+import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
 /**
  * This module is responsible for Gossiping information for the local 
endpoint. This abstraction
@@ -120,12 +135,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     static
     {
         SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
-        SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
-        
SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
     }
-    private static final List<String> ADMINISTRATIVELY_INACTIVE_STATES = 
Arrays.asList(VersionedValue.HIBERNATE,
-                                                                               
        VersionedValue.REMOVED_TOKEN,
-                                                                               
        VersionedValue.STATUS_LEFT);
+
     private volatile ScheduledFuture<?> scheduledGossipTask;
     private static final ReentrantLock taskLock = new ReentrantLock();
     public final static int intervalInMillis = 1000;
@@ -179,15 +190,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     private volatile long lastProcessedMessageAt = currentTimeMillis();
 
-    /**
-     * This property is initially set to {@code true} which means that we have 
no information about the other nodes.
-     * Once all nodes are on at least this node version, it becomes {@code 
false}, which means that we are not
-     * upgrading from the previous version (major, minor).
-     *
-     * This property and anything that checks it should be removed in 5.0
-     */
-    private volatile boolean upgradeInProgressPossible = true;
-
     public void clearUnsafe()
     {
         unreachableEndpoints.clear();
@@ -199,64 +201,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         seedsInShadowRound.clear();
     }
 
-    // returns true when the node does not know the existence of other nodes.
-    private static boolean isLoneNode(Map<InetAddressAndPort, EndpointState> 
epStates)
-    {
-        return epStates.isEmpty() || 
epStates.keySet().equals(Collections.singleton(FBUtilities.getBroadcastAddressAndPort()));
-    }
-
-    private static final ExpiringMemoizingSupplier.Memoized<CassandraVersion> 
NO_UPGRADE_IN_PROGRESS = new ExpiringMemoizingSupplier.Memoized<>(null);
-    private static final 
ExpiringMemoizingSupplier.NotMemoized<CassandraVersion> CURRENT_NODE_VERSION = 
new ExpiringMemoizingSupplier.NotMemoized<>(SystemKeyspace.CURRENT_VERSION);
-    final Supplier<ExpiringMemoizingSupplier.ReturnValue<CassandraVersion>> 
upgradeFromVersionSupplier = () ->
-    {
-        // Once there are no prior version nodes we don't need to keep 
rechecking
-        if (!upgradeInProgressPossible)
-            return NO_UPGRADE_IN_PROGRESS;
-
-        CassandraVersion minVersion = SystemKeyspace.CURRENT_VERSION;
-
-        // Skip the round if the gossiper has not started yet
-        // Otherwise, upgradeInProgressPossible can be set to false wrongly.
-        // If we don't know any epstate we don't know anything about the 
cluster.
-        // If we only know about ourselves, we can assume that version is 
CURRENT_VERSION
-        if (!isEnabled() || isLoneNode(endpointStateMap))
-            return CURRENT_NODE_VERSION;
-
-        // Check the release version of all the peers it heard of. Not 
necessary the peer that it has/had contacted with.
-        boolean allHostsHaveKnownVersion = true;
-        for (InetAddressAndPort host : endpointStateMap.keySet())
-        {
-            if (justRemovedEndpoints.containsKey(host))
-                continue;
-
-            CassandraVersion version = getReleaseVersion(host);
-
-            //Raced with changes to gossip state, wait until next iteration
-            if (version == null)
-                allHostsHaveKnownVersion = false;
-            else if (version.compareTo(minVersion) < 0)
-                minVersion = version;
-        }
-
-        if (minVersion.compareTo(SystemKeyspace.CURRENT_VERSION) < 0)
-            return new ExpiringMemoizingSupplier.Memoized<>(minVersion);
-
-        if (!allHostsHaveKnownVersion)
-            return new ExpiringMemoizingSupplier.NotMemoized<>(minVersion);
-
-        upgradeInProgressPossible = false;
-        return NO_UPGRADE_IN_PROGRESS;
-    };
-
-    private final Supplier<CassandraVersion> upgradeFromVersionMemoized = 
ExpiringMemoizingSupplier.memoizeWithExpiration(upgradeFromVersionSupplier, 1, 
TimeUnit.MINUTES);
-
-    @VisibleForTesting
-    public void expireUpgradeFromVersion()
-    {
-        upgradeInProgressPossible = true;
-        ((ExpiringMemoizingSupplier<CassandraVersion>) 
upgradeFromVersionMemoized).expire();
-    }
-
     private static final boolean disableThreadValidation = 
Boolean.getBoolean(Props.DISABLE_THREAD_VALIDATION);
     private static volatile boolean disableEndpointRemoval = 
DISABLE_GOSSIP_ENDPOINT_REMOVAL.getBoolean();
 
@@ -292,6 +236,11 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
+    public Map<InetAddressAndPort, EndpointState> getEndpointStates()
+    {
+        return endpointStateMap;
+    }
+
     private class GossipTask implements Runnable
     {
         public void run()
@@ -318,7 +267,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                                                                            
gDigests);
                     Message<GossipDigestSyn> message = 
Message.out(GOSSIP_DIGEST_SYN, digestSynMessage);
                     /* Gossip to some random live member */
-                    boolean gossipedToSeed = doGossipToLiveMember(message);
+                    EnumSet<GossipedWith> gossipedWith = 
doGossipToLiveMember(message);
 
                     /* Gossip to some unreachable member with some probability 
to check if he is back up */
                     maybeGossipToUnreachableMember(message);
@@ -339,8 +288,11 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                        gossipedToSeed check.
 
                        See CASSANDRA-150 for more exposition. */
-                    if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
-                        maybeGossipToSeed(message);
+                    if (!gossipedWith.contains(SEED) || liveEndpoints.size() < 
seeds.size())
+                        gossipedWith.addAll(maybeGossipToSeed(message));
+
+                    if (!gossipedWith.contains(CMS))
+                        maybeGossipToCMS(message);
 
                     doStatusCheck();
                 }
@@ -376,17 +328,17 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         subscribers.add(new IEndpointStateChangeSubscriber()
         {
             public void onJoin(InetAddressAndPort endpoint, EndpointState 
state)
-           {
+            {
                 maybeRecompute(state);
             }
 
             public void onAlive(InetAddressAndPort endpoint, EndpointState 
state)
-           {
+            {
                 maybeRecompute(state);
             }
 
             private void maybeRecompute(EndpointState state)
-           {
+            {
                 if 
(state.getApplicationState(ApplicationState.RELEASE_VERSION) != null)
                     minVersionSupplier.recompute();
             }
@@ -404,36 +356,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         this.lastProcessedMessageAt = timeInMillis;
     }
 
-    public boolean seenAnySeed()
-    {
-        for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
endpointStateMap.entrySet())
-        {
-            if (seeds.contains(entry.getKey()))
-                return true;
-            try
-            {
-                VersionedValue internalIp = 
entry.getValue().getApplicationState(ApplicationState.INTERNAL_IP);
-                VersionedValue internalIpAndPort = 
entry.getValue().getApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT);
-                InetAddressAndPort endpoint = null;
-                if (internalIpAndPort != null)
-                {
-                    endpoint = 
InetAddressAndPort.getByName(internalIpAndPort.value);
-                }
-                else if (internalIp != null)
-                {
-                    endpoint = InetAddressAndPort.getByName(internalIp.value);
-                }
-                if (endpoint != null && seeds.contains(endpoint))
-                    return true;
-            }
-            catch (UnknownHostException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        return false;
-    }
-
     /**
      * Register for interesting state changes.
      *
@@ -487,12 +409,19 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     public Set<InetAddressAndPort> getUnreachableTokenOwners()
     {
         Set<InetAddressAndPort> tokenOwners = new HashSet<>();
+        ClusterMetadata metadata = ClusterMetadata.current();
         for (InetAddressAndPort endpoint : unreachableEndpoints.keySet())
         {
-            if (StorageService.instance.getTokenMetadata().isMember(endpoint))
-                tokenOwners.add(endpoint);
+            NodeId nodeId = metadata.directory.peerId(endpoint);
+            NodeState state = metadata.directory.peerState(nodeId);
+            switch (state)
+            {
+                case JOINED:
+                case MOVING:
+                case LEAVING:
+                    tokenOwners.add(endpoint);
+            }
         }
-
         return tokenOwners;
     }
 
@@ -538,7 +467,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             runnable.run();
             return;
         }
-
         FutureTask<?> task = new FutureTask<>(runnable);
         Stage.GOSSIP.execute(task);
         try
@@ -706,32 +634,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         GossiperDiagnostics.quarantinedEndpoint(this, endpoint, 
quarantineExpiration);
     }
 
-    /**
-     * Quarantine endpoint specifically for replacement purposes.
-     * @param endpoint
-     */
-    public void replacementQuarantine(InetAddressAndPort endpoint)
-    {
-        // remember, quarantineEndpoint will effectively already add 
QUARANTINE_DELAY, so this is 2x
-        quarantineEndpoint(endpoint, currentTimeMillis() + QUARANTINE_DELAY);
-        GossiperDiagnostics.replacementQuarantine(this, endpoint);
-    }
-
-    /**
-     * Remove the Endpoint and evict immediately, to avoid gossiping about 
this node.
-     * This should only be called when a token is taken over by a new IP 
address.
-     *
-     * @param endpoint The endpoint that has been replaced
-     */
-    public void replacedEndpoint(InetAddressAndPort endpoint)
-    {
-        checkProperThreadForStateMutation();
-        removeEndpoint(endpoint);
-        evictFromMembership(endpoint);
-        replacementQuarantine(endpoint);
-        GossiperDiagnostics.replacedEndpoint(this, endpoint);
-    }
-
     /**
      * The gossip digest is built based on randomization
      * rather than just looping through the collection of live endpoints.
@@ -770,38 +672,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    /**
-     * This method will begin removing an existing endpoint from the cluster 
by spoofing its state
-     * This should never be called unless this coordinator has had 
'removenode' invoked
-     *
-     * @param endpoint    - the endpoint being removed
-     * @param hostId      - the ID of the host being removed
-     * @param localHostId - my own host ID for replication coordination
-     */
-    public void advertiseRemoving(InetAddressAndPort endpoint, UUID hostId, 
UUID localHostId)
-    {
-        EndpointState epState = endpointStateMap.get(endpoint);
-        // remember this node's generation
-        int generation = epState.getHeartBeatState().getGeneration();
-        logger.info("Removing host: {}", hostId);
-        logger.info("Sleeping for {}ms to ensure {} does not change", 
StorageService.RING_DELAY_MILLIS, endpoint);
-        
Uninterruptibles.sleepUninterruptibly(StorageService.RING_DELAY_MILLIS, 
TimeUnit.MILLISECONDS);
-        // make sure it did not change
-        epState = endpointStateMap.get(endpoint);
-        if (epState.getHeartBeatState().getGeneration() != generation)
-            throw new RuntimeException("Endpoint " + endpoint + " generation 
changed while trying to remove it");
-        // update the other node's generation to mimic it as if it had changed 
it itself
-        logger.info("Advertising removal for {}", endpoint);
-        epState.updateTimestamp(); // make sure we don't evict it too soon
-        epState.getHeartBeatState().forceNewerGenerationUnsafe();
-        Map<ApplicationState, VersionedValue> states = new 
EnumMap<>(ApplicationState.class);
-        states.put(ApplicationState.STATUS_WITH_PORT, 
StorageService.instance.valueFactory.removingNonlocal(hostId));
-        states.put(ApplicationState.STATUS, 
StorageService.instance.valueFactory.removingNonlocal(hostId));
-        states.put(ApplicationState.REMOVAL_COORDINATOR, 
StorageService.instance.valueFactory.removalCoordinator(localHostId));
-        epState.addApplicationStates(states);
-        endpointStateMap.put(endpoint, epState);
-    }
-
     /**
      * Handles switching the endpoint's state from REMOVING_TOKEN to 
REMOVED_TOKEN
      * This should only be called after advertiseRemoving
@@ -821,6 +691,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         addExpireTimeForEndpoint(endpoint, expireTime);
         endpointStateMap.put(endpoint, epState);
         // ensure at least one gossip round occurs before returning
+        // todo; do we need this?
         Uninterruptibles.sleepUninterruptibly(intervalInMillis * 2, 
TimeUnit.MILLISECONDS);
     }
 
@@ -868,9 +739,11 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             }
 
             Collection<Token> tokens = null;
+            ClusterMetadata metadata = ClusterMetadata.current();
             try
             {
-                tokens = 
StorageService.instance.getTokenMetadata().getTokens(endpoint);
+                NodeId nodeId = metadata.directory.peerId(endpoint);
+                tokens = metadata.tokenMap.tokens(nodeId);
             }
             catch (Throwable th)
             {
@@ -879,7 +752,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             if (tokens == null || tokens.isEmpty())
             {
                 logger.warn("Trying to assassinate an endpoint {} that does 
not have any tokens assigned. This should not have happened, trying to continue 
with a random token.", address);
-                tokens = 
Collections.singletonList(StorageService.instance.getTokenMetadata().partitioner.getRandomToken());
+                tokens = 
Collections.singletonList(metadata.tokenMap.partitioner().getRandomToken());
             }
 
             long expireTime = computeExpireTime();
@@ -908,13 +781,13 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * @param epSet   a set of endpoint from which a random endpoint is chosen.
      * @return true if the chosen endpoint is also a seed.
      */
-    private boolean sendGossip(Message<GossipDigestSyn> message, 
Set<InetAddressAndPort> epSet)
+    private EnumSet<GossipedWith> sendGossip(Message<GossipDigestSyn> message, 
Set<InetAddressAndPort> epSet)
     {
         List<InetAddressAndPort> endpoints = ImmutableList.copyOf(epSet);
 
         int size = endpoints.size();
         if (size < 1)
-            return false;
+            return EnumSet.noneOf(GossipedWith.class);
         /* Generate a random number from 0 -> size */
         int index = (size == 1) ? 0 : random.nextInt(size);
         InetAddressAndPort to = endpoints.get(index);
@@ -923,21 +796,31 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         if (firstSynSendAt == 0)
             firstSynSendAt = nanoTime();
         MessagingService.instance().send(message, to);
+        EnumSet<GossipedWith> gossipedWith = 
EnumSet.noneOf(GossipedWith.class);
 
-        boolean isSeed = seeds.contains(to);
+        if (seeds.contains(to))
+            gossipedWith.add(SEED);
+        if (ClusterMetadata.current().cmsMembers().contains(to))
+            gossipedWith.add(CMS);
         GossiperDiagnostics.sendGossipDigestSyn(this, to);
-        return isSeed;
+        return gossipedWith;
     }
 
     /* Sends a Gossip message to a live member and returns true if the 
recipient was a seed */
-    private boolean doGossipToLiveMember(Message<GossipDigestSyn> message)
+    private EnumSet<GossipedWith> 
doGossipToLiveMember(Message<GossipDigestSyn> message)
     {
         int size = liveEndpoints.size();
         if (size == 0)
-            return false;
+            return EnumSet.noneOf(GossipedWith.class);
         return sendGossip(message, liveEndpoints);
     }
 
+    enum GossipedWith
+    {
+        SEED,
+        CMS
+    }
+
     /* Sends a Gossip message to an unreachable member */
     private void maybeGossipToUnreachableMember(Message<GossipDigestSyn> 
message)
     {
@@ -957,19 +840,20 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     /* Possibly gossip to a seed for facilitating partition healing */
-    private void maybeGossipToSeed(Message<GossipDigestSyn> prod)
+    private EnumSet<GossipedWith> maybeGossipToSeed(Message<GossipDigestSyn> 
prod)
     {
         int size = seeds.size();
+        EnumSet<GossipedWith> gossipedWith = 
EnumSet.noneOf(GossipedWith.class);
         if (size > 0)
         {
             if (size == 1 && seeds.contains(getBroadcastAddressAndPort()))
             {
-                return;
+                return gossipedWith;
             }
 
             if (liveEndpoints.size() == 0)
             {
-                sendGossip(prod, seeds);
+                gossipedWith = sendGossip(prod, seeds);
             }
             else
             {
@@ -977,76 +861,35 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 double probability = seeds.size() / (double) 
(liveEndpoints.size() + unreachableEndpoints.size());
                 double randDbl = random.nextDouble();
                 if (randDbl <= probability)
-                    sendGossip(prod, seeds);
+                    gossipedWith = sendGossip(prod, seeds);
             }
         }
+        return gossipedWith;
     }
 
-    public boolean isGossipOnlyMember(InetAddressAndPort endpoint)
+    private void maybeGossipToCMS(Message<GossipDigestSyn> message)
     {
-        EndpointState epState = endpointStateMap.get(endpoint);
-        if (epState == null)
+        Set<InetAddressAndPort> cms = ClusterMetadata.current().cmsMembers();
+        if (cms.contains(getBroadcastAddressAndPort()))
+            return;
+
+        double probability = cms.size() / (double) (liveEndpoints.size() + 
unreachableEndpoints.size());
+        double randDbl = random.nextDouble();
+        if (randDbl <= probability)
         {
-            return false;
+            logger.trace("Sending GossipDigestSyn to the CMS {}", cms);
+            sendGossip(message, cms);
         }
-        return !isDeadState(epState) && 
!StorageService.instance.getTokenMetadata().isMember(endpoint);
     }
 
-    /**
-     * Check if this node can safely be started and join the ring.
-     * If the node is bootstrapping, examines gossip state for any previous 
status to decide whether
-     * it's safe to allow this node to start and bootstrap. If not 
bootstrapping, compares the host ID
-     * that the node itself has (obtained by reading from system.local or 
generated if not present)
-     * with the host ID obtained from gossip for the endpoint address (if 
any). This latter case
-     * prevents a non-bootstrapping, new node from being started with the same 
address of a
-     * previously started, but currently down predecessor.
-     *
-     * @param endpoint - the endpoint to check
-     * @param localHostUUID - the host id to check
-     * @param isBootstrapping - whether the node intends to bootstrap when 
joining
-     * @param epStates - endpoint states in the cluster
-     * @return true if it is safe to start the node, false otherwise
-     */
-    public boolean isSafeForStartup(InetAddressAndPort endpoint, UUID 
localHostUUID, boolean isBootstrapping,
-                                    Map<InetAddressAndPort, EndpointState> 
epStates)
+    public boolean isGossipOnlyMember(InetAddressAndPort endpoint)
     {
-        EndpointState epState = epStates.get(endpoint);
-        // if there's no previous state, we're good
+        EndpointState epState = endpointStateMap.get(endpoint);
         if (epState == null)
-            return true;
-
-        String status = getGossipStatus(epState);
-
-        if (status.equals(VersionedValue.HIBERNATE)
-            && !SystemKeyspace.bootstrapComplete())
         {
-            logger.warn("A node with the same IP in hibernate status was 
detected. Was a replacement already attempted?");
             return false;
         }
-
-        //the node was previously removed from the cluster
-        if (isDeadState(epState))
-            return true;
-
-        if (isBootstrapping)
-        {
-            // these states are not allowed to join the cluster as it would 
not be safe
-            final List<String> unsafeStatuses = new ArrayList<String>()
-            {{
-                add("");                           // failed bootstrap but we 
did start gossiping
-                add(VersionedValue.STATUS_NORMAL); // node is legit in the 
cluster or it was stopped with kill -9
-                add(VersionedValue.SHUTDOWN);      // node was shutdown
-            }};
-            return !unsafeStatuses.contains(status);
-        }
-        else
-        {
-            // if the previous UUID matches what we currently have (i.e. what 
was read from
-            // system.local at startup), then we're good to start up. 
Otherwise, something
-            // is amiss and we need to replace the previous node
-            VersionedValue previous = 
epState.getApplicationState(ApplicationState.HOST_ID);
-            return UUID.fromString(previous.value).equals(localHostUUID);
-        }
+        return !isDeadState(epState) && 
!ClusterMetadata.current().directory.allAddresses().contains(endpoint);
     }
 
     @VisibleForTesting
@@ -1072,6 +915,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             }
         }
 
+        ClusterMetadata metadata = ClusterMetadata.current();
         Set<InetAddressAndPort> eps = endpointStateMap.keySet();
         for (InetAddressAndPort endpoint : eps)
         {
@@ -1097,7 +941,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                             // to make sure that the previous read data was 
correct
                             logger.info("Race condition marking {} as a 
FatClient; ignoring", endpoint);
                             return;
-                        }                        
+                        }
                         removeEndpoint(endpoint); // will put it in 
justRemovedEndpoints to respect quarantine delay
                         evictFromMembership(endpoint); // can get rid of the 
state immediately
                     });
@@ -1106,7 +950,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 // check for dead state removal
                 long expireTime = getExpireTimeForEndpoint(endpoint);
                 if (!epState.isAlive() && (now > expireTime)
-                    && 
(!StorageService.instance.getTokenMetadata().isMember(endpoint)))
+                    && (!metadata.directory.allAddresses().contains(endpoint)))
                 {
                     if (logger.isDebugEnabled())
                     {
@@ -1195,16 +1039,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return lastProcessedMessageAt;
     }
 
-    public UUID getHostId(InetAddressAndPort endpoint)
-    {
-        return getHostId(endpoint, endpointStateMap);
-    }
-
-    public UUID getHostId(InetAddressAndPort endpoint, Map<InetAddressAndPort, 
EndpointState> epStates)
-    {
-        return 
UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
-    }
-
     /**
      * The value for the provided application state for the provided endpoint 
as currently known by this Gossip instance.
      *
@@ -1271,17 +1105,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return reqdEndpointState;
     }
 
-    /**
-     * determine which endpoint started up earlier
-     */
-    public int compareEndpointStartup(InetAddressAndPort addr1, 
InetAddressAndPort addr2)
-    {
-        EndpointState ep1 = getEndpointStateForEndpoint(addr1);
-        EndpointState ep2 = getEndpointStateForEndpoint(addr2);
-        assert ep1 != null && ep2 != null;
-        return ep1.getHeartBeatState().getGeneration() - 
ep2.getHeartBeatState().getGeneration();
-    }
-
     public void notifyFailureDetector(Map<InetAddressAndPort, EndpointState> 
remoteEpStateMap)
     {
         for (Entry<InetAddressAndPort, EndpointState> entry : 
remoteEpStateMap.entrySet())
@@ -1389,7 +1212,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
-     * Used by {@link #markDead(InetAddressAndPort, EndpointState)} and {@link 
#addSavedEndpoint(InetAddressAndPort)}
+     * Used by {@link #markDead(InetAddressAndPort, EndpointState)}
      * to register a endpoint as dead.  This method is "silent" to avoid 
triggering listeners, diagnostics, or logs
      * on startup via addSavedEndpoint.
      */
@@ -1415,6 +1238,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         EndpointState localEpState = endpointStateMap.get(ep);
         if (!isDeadState(epState))
         {
+            // confusing log message, epState status might still be 'shutdown' 
- keeping if anyone is using it for automation
+            // we're not actually marking it as up until we get the echo 
request response in markAlive below
             if (localEpState != null)
                 logger.info("Node {} has restarted, now UP", ep);
             else
@@ -1472,23 +1297,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return SILENT_SHUTDOWN_STATES.contains(status);
     }
 
-    public boolean isAdministrativelyInactiveState(EndpointState epState)
-    {
-        String status = getGossipStatus(epState);
-        if (status.isEmpty())
-            return false;
-
-        return ADMINISTRATIVELY_INACTIVE_STATES.contains(status);
-    }
-
-    public boolean isAdministrativelyInactiveState(InetAddressAndPort endpoint)
-    {
-        EndpointState epState = getEndpointStateForEndpoint(endpoint);
-        if (epState == null)
-            return true; // if the end point cannot be found, treat as inactive
-        return isAdministrativelyInactiveState(epState);
-    }
-
     public static String getGossipStatus(EndpointState epState)
     {
         if (epState == null)
@@ -1582,11 +1390,10 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     public void applyStateLocally(Map<InetAddressAndPort, EndpointState> 
epStateMap)
     {
         checkProperThreadForStateMutation();
-        boolean hasMajorVersion3Nodes = hasMajorVersion3Nodes();
         for (Entry<InetAddressAndPort, EndpointState> entry : 
order(epStateMap))
         {
             InetAddressAndPort ep = entry.getKey();
-            if (ep.equals(getBroadcastAddressAndPort()) && !isInShadowRound())
+            if (ep.equals(getBroadcastAddressAndPort()))
                 continue;
             if (justRemovedEndpoints.containsKey(ep))
             {
@@ -1597,8 +1404,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
             EndpointState localEpStatePtr = endpointStateMap.get(ep);
             EndpointState remoteState = entry.getValue();
-            if (!hasMajorVersion3Nodes)
-                remoteState.removeMajorVersion3LegacyApplicationStates();
+            remoteState.removeMajorVersion3LegacyApplicationStates();
 
             /*
                 If state does not exist just add it. If it does then add it if 
the remote generation is greater.
@@ -1633,7 +1439,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                     if (remoteMaxVersion > localMaxVersion)
                     {
                         // apply states, but do not notify since there is no 
major change
-                        applyNewStates(ep, localEpStatePtr, remoteState, 
hasMajorVersion3Nodes);
+                        applyNewStates(ep, localEpStatePtr, 
remoteState.nonDerivable());
                     }
                     else if (logger.isTraceEnabled())
                             logger.trace("Ignoring remote version {} <= {} for 
{}", remoteMaxVersion, localMaxVersion, ep);
@@ -1656,7 +1462,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    private void applyNewStates(InetAddressAndPort addr, EndpointState 
localState, EndpointState remoteState, boolean hasMajorVersion3Nodes)
+    private void applyNewStates(InetAddressAndPort addr, EndpointState 
localState, EndpointState remoteState)
     {
         // don't assert here, since if the node restarts the version will go 
back to zero
         int oldVersion = localState.getHeartBeatState().getHeartBeatVersion();
@@ -1668,7 +1474,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         Set<Entry<ApplicationState, VersionedValue>> remoteStates = 
remoteState.states();
         assert remoteState.getHeartBeatState().getGeneration() == 
localState.getHeartBeatState().getGeneration();
 
-
         Set<Entry<ApplicationState, VersionedValue>> updatedStates = 
remoteStates.stream().filter(entry -> {
             // filter out the states that are already up to date (has the same 
or higher version)
             VersionedValue local = 
localState.getApplicationState(entry.getKey());
@@ -1683,10 +1488,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             }
         }
         localState.addApplicationStates(updatedStates);
-
-        // get rid of legacy fields once the cluster is not in mixed mode
-        if (!hasMajorVersion3Nodes)
-            localState.removeMajorVersion3LegacyApplicationStates();
+        localState.removeMajorVersion3LegacyApplicationStates();
 
         // need to run STATUS or STATUS_WITH_PORT first to handle BOOT_REPLACE 
correctly (else won't be a member, so TOKENS won't be processed)
         for (Entry<ApplicationState, VersionedValue> updatedEntry : 
updatedStates)
@@ -1800,13 +1602,13 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 if (null != hostId)
                 {
                     state.addApplicationState(ApplicationState.HOST_ID,
-                                                 
StorageService.instance.valueFactory.hostId(hostId));
+                                              
StorageService.instance.valueFactory.hostId(hostId));
                 }
                 Set<Token> tokens = SystemKeyspace.loadTokens().get(endpoint);
                 if (null != tokens && !tokens.isEmpty())
                 {
                     state.addApplicationState(ApplicationState.TOKENS,
-                                                 
StorageService.instance.valueFactory.tokens(tokens));
+                                              
StorageService.instance.valueFactory.tokens(tokens));
                 }
             }
             map.put(endpoint, state);
@@ -1908,97 +1710,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                                                               
TimeUnit.MILLISECONDS);
     }
 
-    public synchronized Map<InetAddressAndPort, EndpointState> doShadowRound()
-    {
-        return doShadowRound(Collections.EMPTY_SET);
-    }
-
-    /**
-     * Do a single 'shadow' round of gossip by retrieving endpoint states that 
will be stored exclusively in the
-     * map return value, instead of endpointStateMap.
-     *
-     * Used when preparing to join the ring:
-     * <ul>
-     *     <li>when replacing a node, to get and assume its tokens</li>
-     *     <li>when joining, to check that the local host id matches any 
previous id for the endpoint address</li>
-     * </ul>
-     *
-     * Method is synchronized, as we use an in-progress flag to indicate that 
shadow round must be cleared
-     * again by calling {@link 
Gossiper#maybeFinishShadowRound(InetAddressAndPort, boolean, Map)}. This will 
update
-     * {@link Gossiper#endpointShadowStateMap} with received values, in order 
to return an immutable copy to the
-     * caller of {@link Gossiper#doShadowRound()}. Therefor only a single 
shadow round execution is permitted at
-     * the same time.
-     *
-     * @param peers Additional peers to try gossiping with.
-     * @return endpoint states gathered during shadow round or empty map
-     */
-    public synchronized Map<InetAddressAndPort, EndpointState> 
doShadowRound(Set<InetAddressAndPort> peers)
-    {
-        buildSeedsList();
-        // it may be that the local address is the only entry in the seed + 
peers
-        // list in which case, attempting a shadow round is pointless
-        if (seeds.isEmpty() && peers.isEmpty())
-            return endpointShadowStateMap;
-
-        boolean isSeed = 
DatabaseDescriptor.getSeeds().contains(getBroadcastAddressAndPort());
-        // We double RING_DELAY if we're not a seed to increase chance of 
successful startup during a full cluster bounce,
-        // giving the seeds a chance to startup before we fail the shadow round
-        int shadowRoundDelay = isSeed ? StorageService.RING_DELAY_MILLIS : 
StorageService.RING_DELAY_MILLIS * 2;
-        seedsInShadowRound.clear();
-        endpointShadowStateMap.clear();
-        // send a completely empty syn
-        List<GossipDigest> gDigests = new ArrayList<>();
-        GossipDigestSyn digestSynMessage = new 
GossipDigestSyn(getClusterName(), getPartitionerName(), gDigests);
-        Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, 
digestSynMessage);
-
-        inShadowRound = true;
-        boolean includePeers = false;
-        int slept = 0;
-        try
-        {
-            while (true)
-            {
-                if (slept % 5000 == 0)
-                { // CASSANDRA-8072, retry at the beginning and every 5 seconds
-                    logger.trace("Sending shadow round GOSSIP DIGEST SYN to 
seeds {}", seeds);
-
-                    for (InetAddressAndPort seed : seeds)
-                        MessagingService.instance().send(message, seed);
-
-                    // Send to any peers we already know about, but only if a 
seed didn't respond.
-                    if (includePeers)
-                    {
-                        logger.trace("Sending shadow round GOSSIP DIGEST SYN 
to known peers {}", peers);
-                        for (InetAddressAndPort peer : peers)
-                            MessagingService.instance().send(message, peer);
-                    }
-                    includePeers = true;
-                }
-
-                Thread.sleep(1000);
-                if (!inShadowRound)
-                    break;
-
-                slept += 1000;
-                if (slept > shadowRoundDelay)
-                {
-                    // if we got here no peers could be gossiped to. If we're 
a seed that's OK, but otherwise we stop. See CASSANDRA-13851
-                    if (!isSeed)
-                        throw new RuntimeException("Unable to gossip with any 
peers");
-
-                    inShadowRound = false;
-                    break;
-                }
-            }
-        }
-        catch (InterruptedException e)
-        {
-            throw new UncheckedInterruptedException(e);
-        }
-
-        return ImmutableMap.copyOf(endpointShadowStateMap);
-    }
-
     @VisibleForTesting
     void buildSeedsList()
     {
@@ -2086,45 +1797,14 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         epstate.getHeartBeatState().forceNewerGenerationUnsafe();
     }
 
-
-    /**
-     * Add an endpoint we knew about previously, but whose state is unknown
-     */
-    public void addSavedEndpoint(InetAddressAndPort ep)
-    {
-        checkProperThreadForStateMutation();
-        if (ep.equals(getBroadcastAddressAndPort()))
-        {
-            logger.debug("Attempt to add self as saved endpoint");
-            return;
-        }
-
-        //preserve any previously known, in-memory data about the endpoint 
(such as DC, RACK, and so on)
-        EndpointState epState = endpointStateMap.get(ep);
-        if (epState != null)
-        {
-            logger.debug("not replacing a previous epState for {}, but reusing 
it: {}", ep, epState);
-            epState.setHeartBeatState(HeartBeatState.empty());
-        }
-        else
-        {
-            epState = new EndpointState(HeartBeatState.empty());
-            logger.info("Adding {} as there was no previous epState; new state 
is {}", ep, epState);
-        }
-
-        epState.markDead();
-        endpointStateMap.put(ep, epState);
-        silentlyMarkDead(ep, epState);
-        if (logger.isTraceEnabled())
-            logger.trace("Adding saved endpoint {} {}", ep, 
epState.getHeartBeatState().getGeneration());
-    }
-
     private void addLocalApplicationStateInternal(ApplicationState state, 
VersionedValue value)
     {
         assert taskLock.isHeldByCurrentThread();
         InetAddressAndPort epAddr = getBroadcastAddressAndPort();
         EndpointState epState = endpointStateMap.get(epAddr);
-        assert epState != null : "Can't find endpoint state for " + epAddr;
+        // todo; this can be null during startup log replay when bootstrapping
+        //  - we would have no state for ourselves
+        if (epState == null) return;
         // Fire "before change" notifications:
         doBeforeChangeNotifications(epAddr, epState, state, value);
         // Notifications may have taken some time, so preventively raise the 
version
@@ -2183,78 +1863,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && 
(!scheduledGossipTask.isCancelled());
     }
 
-    public boolean sufficientForStartupSafetyCheck(Map<InetAddressAndPort, 
EndpointState> epStateMap)
-    {
-        // it is possible for a previously queued ack to be sent to us when we 
come back up in shadow
-        EndpointState localState = 
epStateMap.get(FBUtilities.getBroadcastAddressAndPort());
-        // return false if response doesn't contain state necessary for safety 
check
-        return localState == null || isDeadState(localState) || 
localState.containsApplicationState(ApplicationState.HOST_ID);
-    }
-
-    protected void maybeFinishShadowRound(InetAddressAndPort respondent, 
boolean isInShadowRound, Map<InetAddressAndPort, EndpointState> epStateMap)
-    {
-        if (inShadowRound)
-        {
-            if (!isInShadowRound)
-            {
-                if (!sufficientForStartupSafetyCheck(epStateMap))
-                {
-                    logger.debug("Not exiting shadow round because received 
ACK with insufficient states {} -> {}",
-                                 FBUtilities.getBroadcastAddressAndPort(), 
epStateMap.get(FBUtilities.getBroadcastAddressAndPort()));
-                    return;
-                }
-
-                if (!seeds.contains(respondent))
-                    logger.warn("Received an ack from {}, who isn't a seed. 
Ensure your seed list includes a live node. Exiting shadow round",
-                                respondent);
-                logger.debug("Received a regular ack from {}, can now exit 
shadow round", respondent);
-                // respondent sent back a full ack, so we can exit our shadow 
round
-                endpointShadowStateMap.putAll(epStateMap);
-                inShadowRound = false;
-                seedsInShadowRound.clear();
-            }
-            else
-            {
-                // respondent indicates it too is in a shadow round, if all 
seeds
-                // are in this state then we can exit our shadow round. 
Otherwise,
-                // we keep retrying the SR until one responds with a full ACK 
or
-                // we learn that all seeds are in SR.
-                logger.debug("Received an ack from {} indicating it is also in 
shadow round", respondent);
-                seedsInShadowRound.add(respondent);
-                if (seedsInShadowRound.containsAll(seeds))
-                {
-                    logger.debug("All seeds are in a shadow round, clearing 
this node to exit its own");
-                    inShadowRound = false;
-                    seedsInShadowRound.clear();
-                }
-            }
-        }
-    }
-
-    public boolean isInShadowRound()
-    {
-        return inShadowRound;
-    }
-
-    /**
-     * Creates a new dead {@link EndpointState} that is {@link 
EndpointState#isEmptyWithoutStatus() empty}.  This is used during
-     * host replacement for edge cases where the seed notified that the 
endpoint was empty, so need to add such state
-     * into gossip explicitly (as empty endpoints are not gossiped outside of 
the shadow round).
-     *
-     * see CASSANDRA-16213
-     */
-    public void initializeUnreachableNodeUnsafe(InetAddressAndPort addr)
-    {
-        EndpointState state = new EndpointState(HeartBeatState.empty());
-        state.markDead();
-        EndpointState oldState = endpointStateMap.putIfAbsent(addr, state);
-        if (null != oldState)
-        {
-            throw new RuntimeException("Attempted to initialize endpoint state 
for unreachable node, " +
-                                       "but found existing endpoint state for 
it.");
-        }
-    }
-
     @VisibleForTesting
     public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int 
generationNbr)
     {
@@ -2346,6 +1954,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return state != null ? state.getSchemaVersion() : null;
     }
 
+    // TODO: (TM/alexp): we do not need to wait for gossip to settle anymore, 
since main keys are now coming from TM
     public static void waitToSettle()
     {
         int forceAfter = GOSSIPER_SKIP_WAITING_TO_SETTLE.getInt();
@@ -2369,7 +1978,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             totalPolls++;
             if (currentSize == epSize)
             {
-                logger.debug("Gossip looks settled.");
+                logger.debug("Gossip looks settled. {}", 
Gossiper.instance.endpointStateMap);
                 numOkay++;
             }
             else
@@ -2398,6 +2007,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * @param unit TimeUnit of maxWait
      * @return true if agreement was reached, false if not
      */
+    // TODO: (TM/alexp): we do not need to wait for schema convergence for the 
purpose of view building;
+    // we will rely on different mechanisms for propagating mutations correctly
     public boolean waitForSchemaAgreement(long maxWait, TimeUnit unit, 
BooleanSupplier abortCondition)
     {
         int waited = 0;
@@ -2419,39 +2030,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
     }
 
-    /**
-     * Returns {@code false} only if the information about the version of each 
node in the cluster is available and
-     * ALL the nodes are on 4.0+ (regardless of the patch version).
-     */
-    public boolean hasMajorVersion3Nodes()
-    {
-        return isUpgradingFromVersionLowerThan(CassandraVersion.CASSANDRA_4_0) 
|| // this is quite obvious
-               // however if we discovered only nodes at current version so 
far (in particular only this node),
-               // but still there are nodes with unknown version, we also want 
to report that the cluster may have nodes at 3.x
-               upgradeInProgressPossible && 
!isUpgradingFromVersionLowerThan(SystemKeyspace.CURRENT_VERSION.familyLowerBound.get());
-    }
-
-    /**
-     * Returns {@code true} if there are nodes on version lower than the 
provided version
-     */
-    public boolean isUpgradingFromVersionLowerThan(CassandraVersion 
referenceVersion)
-    {
-        return isUpgradingFromVersionLowerThanC17653(referenceVersion).left;
-    }
-
-    /* TODO: Aux method for debug purposes on fixing C17653. To be removed*/
-    @VisibleForTesting
-    public Pair<Boolean, CassandraVersion> 
isUpgradingFromVersionLowerThanC17653(CassandraVersion referenceVersion)
-    {
-        CassandraVersion v = upgradeFromVersionMemoized.get();
-        if (CassandraVersion.NULL_VERSION.equals(v) && scheduledGossipTask == 
null)
-            return Pair.create(false, v);
-
-        boolean res = v != null && v.compareTo(referenceVersion) < 0;
-
-        return Pair.create(res, v);
-    }
-
     private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes)
     {
         UUID expectedVersion = null;
@@ -2563,27 +2141,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         firstSynSendAt = 1;
     }
 
-    public Collection<InetAddressAndPort> unsafeClearRemoteState()
-    {
-        List<InetAddressAndPort> removed = new ArrayList<>();
-        for (InetAddressAndPort ep : endpointStateMap.keySet())
-        {
-            if (ep.equals(getBroadcastAddressAndPort()))
-                continue;
-
-            for (IEndpointStateChangeSubscriber subscriber : subscribers)
-                subscriber.onRemove(ep);
-
-            removed.add(ep);
-        }
-        
this.endpointStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
-        
this.endpointShadowStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
-        
this.expireTimeEndpointMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
-        
this.justRemovedEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
-        
this.unreachableEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
-        return removed;
-    }
-
     public void unsafeGossipWith(InetAddressAndPort ep)
     {
         /* Update the local heartbeat counter. */
@@ -2623,4 +2180,49 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         Message<GossipDigestAck2> message = 
Message.out(Verb.GOSSIP_DIGEST_ACK2, digestAck2Message);
         MessagingService.instance().send(message, ep);
     }
+
+    public void unsafeUpdateEpStates(InetAddressAndPort endpoint, 
EndpointState epstate)
+    {
+        taskLock.lock();
+        try
+        {
+            checkProperThreadForStateMutation();
+            assert !endpoint.equals(getBroadcastAddressAndPort()) || 
epstate.getHeartBeatState().getGeneration() > 0 :
+                   "We should not update epstates with generation = 0 for the 
local host";
+            EndpointState old = endpointStateMap.get(endpoint);
+            if (old == null)
+                endpointStateMap.put(endpoint, epstate);
+            else
+                old.addApplicationStates(epstate.states());
+
+            if (!getBroadcastAddressAndPort().equals(endpoint))
+            {
+                // don't consider it a major state change if the generation is 
0 - this means we have only added it locally for a remote node
+                if (epstate.getHeartBeatState().getGeneration() > 0 &&
+                    (old == null || old.getHeartBeatState().getGeneration() < 
epstate.getHeartBeatState().getGeneration()))
+                    handleMajorStateChange(endpoint, epstate);
+            }
+        }
+        finally
+        {
+            taskLock.unlock();
+        }
+    }
+
+    public void triggerRoundWithCMS()
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        Set<InetAddressAndPort> cms = metadata.cmsMembers();
+        if (!cms.contains(getBroadcastAddressAndPort()))
+        {
+            logger.debug("Triggering gossip round with CMS {}", 
metadata.epoch);
+            final List<GossipDigest> gDigests = new ArrayList<>();
+            Gossiper.instance.makeRandomGossipDigest(gDigests);
+            GossipDigestSyn digestSynMessage = new 
GossipDigestSyn(getClusterName(),
+                                                                   
getPartitionerName(),
+                                                                   gDigests);
+            Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, 
digestSynMessage);
+            sendGossip(message, cms);
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/gms/GossiperEvent.java 
b/src/java/org/apache/cassandra/gms/GossiperEvent.java
index 71fee7c991..44d4ad6247 100644
--- a/src/java/org/apache/cassandra/gms/GossiperEvent.java
+++ b/src/java/org/apache/cassandra/gms/GossiperEvent.java
@@ -77,7 +77,7 @@ public final class GossiperEvent extends DiagnosticEvent
         this.localState = localState;
 
         this.endpointStateMap = gossiper.getEndpointStateMap();
-        this.inShadowRound = gossiper.isInShadowRound();
+        this.inShadowRound = false; // todo; gossiper.isInShadowRound();
         this.justRemovedEndpoints = gossiper.getJustRemovedEndpoints();
         this.lastProcessedMessageAt = gossiper.getLastProcessedMessageAt();
         this.liveEndpoints = gossiper.getLiveMembers();
diff --git a/src/java/org/apache/cassandra/gms/NewGossiper.java 
b/src/java/org/apache/cassandra/gms/NewGossiper.java
new file mode 100644
index 0000000000..9fb889b286
--- /dev/null
+++ b/src/java/org/apache/cassandra/gms/NewGossiper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.concurrent.Accumulator;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Promise;
+
+import static org.apache.cassandra.config.DatabaseDescriptor.getClusterName;
+import static 
org.apache.cassandra.config.DatabaseDescriptor.getPartitionerName;
+import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
+import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
+
+public class NewGossiper
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(NewGossiper.class);
+    public static final NewGossiper instance = new NewGossiper();
+
+    private volatile ShadowRoundHandler handler;
+
+    public Map<InetAddressAndPort, EndpointState> doShadowRound()
+    {
+        Set<InetAddressAndPort> peers = new 
HashSet<>(SystemKeyspace.loadHostIds().keySet());
+        if (peers.isEmpty())
+            peers.addAll(DatabaseDescriptor.getSeeds());
+
+        ShadowRoundHandler shadowRoundHandler = new ShadowRoundHandler(peers);
+        handler = shadowRoundHandler;
+
+        try
+        {
+            return shadowRoundHandler.doShadowRound().get(30, 
TimeUnit.SECONDS);
+        }
+        catch (InterruptedException | ExecutionException | TimeoutException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    public boolean isInShadowRound()
+    {
+        ShadowRoundHandler srh = handler;
+        return srh != null && !srh.isDone();
+    }
+
+    void onAck( Map<InetAddressAndPort, EndpointState> epStateMap)
+    {
+        ShadowRoundHandler srh = handler;
+        if (srh != null && !srh.isDone())
+            srh.onAck(epStateMap);
+    }
+
+    public static class ShadowRoundHandler
+    {
+        private volatile boolean isDone = false;
+        private final Set<InetAddressAndPort> peers;
+        private final Accumulator<Map<InetAddressAndPort, EndpointState>> 
responses;
+        private final int requiredResponses;
+        private final Promise<Map<InetAddressAndPort, EndpointState>> promise 
= new AsyncPromise<>();
+
+        public ShadowRoundHandler(Set<InetAddressAndPort> peers)
+        {
+            this.peers = peers;
+            requiredResponses = Math.max(peers.size() / 10, 1);
+            responses = new Accumulator<>(requiredResponses);
+        }
+
+        public boolean isDone()
+        {
+            return isDone;
+        }
+
+        public Promise<Map<InetAddressAndPort, EndpointState>> doShadowRound()
+        {
+            // send a completely empty syn
+            GossipDigestSyn digestSynMessage = new 
GossipDigestSyn(getClusterName(), getPartitionerName(), new ArrayList<>());
+            Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, 
digestSynMessage);
+
+            logger.info("Sending shadow round GOSSIP DIGEST SYN to known peers 
{}", peers);
+            for (InetAddressAndPort peer : peers)
+            {
+                if (!peer.equals(getBroadcastAddressAndPort()))
+                    MessagingService.instance().send(message, peer);
+            }
+            return promise;
+        }
+
+        public void onAck(Map<InetAddressAndPort, EndpointState> epStateMap)
+        {
+            if (!isDone)
+            {
+                if (!epStateMap.isEmpty())
+                    responses.add(epStateMap);
+
+                logger.debug("Received {} responses. {} required.", 
responses.size(), requiredResponses);
+                if (responses.size() >= requiredResponses)
+                {
+                    isDone = true;
+                    promise.setSuccess(merge(responses.snapshot()));
+                }
+            }
+        }
+
+        private Map<InetAddressAndPort, EndpointState> 
merge(Collection<Map<InetAddressAndPort, EndpointState>> snapshot)
+        {
+            return snapshot.iterator().next(); // todo; actually merge/verify 
the responses
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java 
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index d76f3011f4..f6c02258de 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -22,6 +22,7 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.ISO_8859_1;
@@ -69,20 +70,31 @@ public class VersionedValue implements 
Comparable<VersionedValue>
     public final static String DELIMITER_STR = new String(new char[]{ 
DELIMITER });
 
     // values for ApplicationState.STATUS
+    @Deprecated
     public final static String STATUS_BOOTSTRAPPING = "BOOT";
+    @Deprecated
     public final static String STATUS_BOOTSTRAPPING_REPLACE = "BOOT_REPLACE";
+    @Deprecated
     public final static String STATUS_NORMAL = "NORMAL";
+    @Deprecated
     public final static String STATUS_LEAVING = "LEAVING";
+    @Deprecated
     public final static String STATUS_LEFT = "LEFT";
+    @Deprecated
     public final static String STATUS_MOVING = "MOVING";
 
+    @Deprecated
     public final static String REMOVING_TOKEN = "removing";
+    @Deprecated
     public final static String REMOVED_TOKEN = "removed";
 
+    @Deprecated
     public final static String HIBERNATE = "hibernate";
+    @Deprecated
     public final static String SHUTDOWN = "shutdown";
 
     // values for ApplicationState.REMOVAL_COORDINATOR
+    @Deprecated
     public final static String REMOVAL_COORDINATOR = "REMOVER";
 
     public static Set<String> BOOTSTRAPPING_STATUS = 
ImmutableSet.of(STATUS_BOOTSTRAPPING, STATUS_BOOTSTRAPPING_REPLACE);
@@ -137,10 +149,17 @@ public class VersionedValue implements 
Comparable<VersionedValue>
     public static class VersionedValueFactory
     {
         final IPartitioner partitioner;
+        private final Supplier<Integer> versionSupplier;
 
         public VersionedValueFactory(IPartitioner partitioner)
+        {
+            this(partitioner, VersionGenerator::getNextVersion);
+        }
+
+        public VersionedValueFactory(IPartitioner partitioner, 
Supplier<Integer> versionSupplier)
         {
             this.partitioner = partitioner;
+            this.versionSupplier = versionSupplier;
         }
         
         public VersionedValue cloneWithHigherVersion(VersionedValue value)
@@ -151,24 +170,29 @@ public class VersionedValue implements 
Comparable<VersionedValue>
         @Deprecated
         public VersionedValue bootReplacing(InetAddress oldNode)
         {
-            return new 
VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, 
oldNode.getHostAddress()));
+            return new 
VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, 
oldNode.getHostAddress()), versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue bootReplacingWithPort(InetAddressAndPort oldNode)
         {
-            return new 
VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, 
oldNode.getHostAddressAndPort()));
+            return new 
VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, 
oldNode.getHostAddressAndPort()), versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue bootstrapping(Collection<Token> tokens)
         {
             return new 
VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,
-                                                    makeTokenString(tokens)));
+                                                    makeTokenString(tokens)),
+                                      versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue normal(Collection<Token> tokens)
         {
             return new 
VersionedValue(versionString(VersionedValue.STATUS_NORMAL,
-                                                    makeTokenString(tokens)));
+                                                    makeTokenString(tokens)),
+                                      versionSupplier.get());
         }
 
         private String makeTokenString(Collection<Token> tokens)
@@ -178,7 +202,8 @@ public class VersionedValue implements 
Comparable<VersionedValue>
 
         public VersionedValue load(double load)
         {
-            return new VersionedValue(String.valueOf(load));
+            return new VersionedValue(String.valueOf(load),
+                                      versionSupplier.get());
         }
 
         public VersionedValue diskUsage(String state)
@@ -188,22 +213,27 @@ public class VersionedValue implements 
Comparable<VersionedValue>
 
         public VersionedValue schema(UUID newVersion)
         {
-            return new VersionedValue(newVersion.toString());
+            return new VersionedValue(newVersion.toString(), 
versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue leaving(Collection<Token> tokens)
         {
             return new 
VersionedValue(versionString(VersionedValue.STATUS_LEAVING,
-                                                    makeTokenString(tokens)));
+                                                    makeTokenString(tokens)),
+                                      versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue left(Collection<Token> tokens, long expireTime)
         {
             return new VersionedValue(versionString(VersionedValue.STATUS_LEFT,
                                                     makeTokenString(tokens),
-                                                    
Long.toString(expireTime)));
+                                                    Long.toString(expireTime)),
+                                      versionSupplier.get());
         }
 
+        @Deprecated
         @VisibleForTesting
         public VersionedValue left(Collection<Token> tokens, long expireTime, 
int generation)
         {
@@ -212,16 +242,20 @@ public class VersionedValue implements 
Comparable<VersionedValue>
                                                     
Long.toString(expireTime)), generation);
         }
 
+        @Deprecated
         public VersionedValue moving(Token token)
         {
-            return new VersionedValue(VersionedValue.STATUS_MOVING + 
VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(VersionedValue.STATUS_MOVING + 
VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token),
+                                      versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue hostId(UUID hostId)
         {
-            return new VersionedValue(hostId.toString());
+            return new VersionedValue(hostId.toString(), 
versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue tokens(Collection<Token> tokens)
         {
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -234,101 +268,105 @@ public class VersionedValue implements 
Comparable<VersionedValue>
             {
                 throw new RuntimeException(e);
             }
-            return new VersionedValue(new String(bos.toByteArray(), 
ISO_8859_1));
+            return new VersionedValue(new String(bos.toByteArray(), 
ISO_8859_1), versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue removingNonlocal(UUID hostId)
         {
-            return new 
VersionedValue(versionString(VersionedValue.REMOVING_TOKEN, hostId.toString()));
+            return new 
VersionedValue(versionString(VersionedValue.REMOVING_TOKEN, hostId.toString()), 
versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue removedNonlocal(UUID hostId, long expireTime)
         {
-            return new 
VersionedValue(versionString(VersionedValue.REMOVED_TOKEN, hostId.toString(), 
Long.toString(expireTime)));
+            return new 
VersionedValue(versionString(VersionedValue.REMOVED_TOKEN, hostId.toString(), 
Long.toString(expireTime)), versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue removalCoordinator(UUID hostId)
         {
-            return new 
VersionedValue(versionString(VersionedValue.REMOVAL_COORDINATOR, 
hostId.toString()));
+            return new 
VersionedValue(versionString(VersionedValue.REMOVAL_COORDINATOR, 
hostId.toString()), versionSupplier.get());
         }
 
+        @Deprecated
         public VersionedValue hibernate(boolean value)
         {
-            return new VersionedValue(VersionedValue.HIBERNATE + 
VersionedValue.DELIMITER + value);
+            return new VersionedValue(VersionedValue.HIBERNATE + 
VersionedValue.DELIMITER + value, versionSupplier.get());
         }
 
         public VersionedValue rpcReady(boolean value)
         {
-            return new VersionedValue(String.valueOf(value));
+            return new VersionedValue(String.valueOf(value), 
versionSupplier.get());
         }
 
         public VersionedValue shutdown(boolean value)
         {
-            return new VersionedValue(VersionedValue.SHUTDOWN + 
VersionedValue.DELIMITER + value);
+            return new VersionedValue(VersionedValue.SHUTDOWN + 
VersionedValue.DELIMITER + value, versionSupplier.get());
         }
 
         public VersionedValue datacenter(String dcId)
         {
-            return new VersionedValue(dcId);
+            return new VersionedValue(dcId, versionSupplier.get());
         }
 
         public VersionedValue rack(String rackId)
         {
-            return new VersionedValue(rackId);
+            return new VersionedValue(rackId, versionSupplier.get());
         }
 
         public VersionedValue rpcaddress(InetAddress endpoint)
         {
-            return new VersionedValue(endpoint.getHostAddress());
+            return new VersionedValue(endpoint.getHostAddress(), 
versionSupplier.get());
         }
 
         public VersionedValue nativeaddressAndPort(InetAddressAndPort address)
         {
-            return new VersionedValue(address.getHostAddressAndPort());
+            return new VersionedValue(address.getHostAddressAndPort(), 
versionSupplier.get());
         }
 
         public VersionedValue releaseVersion()
         {
-            return new VersionedValue(FBUtilities.getReleaseVersionString());
+            return new VersionedValue(FBUtilities.getReleaseVersionString(), 
versionSupplier.get());
         }
 
         @VisibleForTesting
         public VersionedValue releaseVersion(String version)
         {
-            return new VersionedValue(version);
+            return new VersionedValue(version, versionSupplier.get());
         }
 
         @VisibleForTesting
         public VersionedValue networkVersion(int version)
         {
-            return new VersionedValue(String.valueOf(version));
+            return new VersionedValue(String.valueOf(version), 
versionSupplier.get());
         }
 
         public VersionedValue networkVersion()
         {
-            return new 
VersionedValue(String.valueOf(MessagingService.current_version));
+            return new 
VersionedValue(String.valueOf(MessagingService.current_version), 
versionSupplier.get());
         }
 
         public VersionedValue internalIP(InetAddress private_ip)
         {
-            return new VersionedValue(private_ip.getHostAddress());
+            return new VersionedValue(private_ip.getHostAddress(), 
versionSupplier.get());
         }
 
         public VersionedValue internalAddressAndPort(InetAddressAndPort 
private_ip_and_port)
         {
-            return new 
VersionedValue(private_ip_and_port.getHostAddressAndPort());
+            return new 
VersionedValue(private_ip_and_port.getHostAddressAndPort(), 
versionSupplier.get());
         }
 
         public VersionedValue severity(double value)
         {
-            return new VersionedValue(String.valueOf(value));
+            return new VersionedValue(String.valueOf(value), 
versionSupplier.get());
         }
 
         public VersionedValue sstableVersions(Set<VersionAndType> versions)
         {
             return new VersionedValue(versions.stream()
                                               .map(VersionAndType::toString)
-                                              
.collect(Collectors.joining(",")));
+                                              
.collect(Collectors.joining(",")), versionSupplier.get());
         }
     }
 
diff --git 
a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java 
b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
index dc40093d4d..563dedf935 100644
--- a/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java
@@ -225,10 +225,6 @@ public final class SystemDistributedKeyspace
 
     public static void startRepairs(TimeUUID id, TimeUUID parent_id, String 
keyspaceName, String[] cfnames, CommonRange commonRange)
     {
-        // Don't record repair history if an upgrade is in progress as version 
3 nodes generates errors
-        // due to schema differences
-        boolean includeNewColumns = !Gossiper.instance.hasMajorVersion3Nodes();
-
         InetAddressAndPort coordinator = 
FBUtilities.getBroadcastAddressAndPort();
         Set<String> participants = Sets.newHashSet();
         Set<String> participants_v2 = Sets.newHashSet();
@@ -250,35 +246,18 @@ public final class SystemDistributedKeyspace
         {
             for (Range<Token> range : commonRange.ranges)
             {
-                String fmtQry;
-                if (includeNewColumns)
-                {
-                    fmtQry = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
-                                    keyspaceName,
-                                    cfname,
-                                    id.toString(),
-                                    parent_id.toString(),
-                                    range.left.toString(),
-                                    range.right.toString(),
-                                    coordinator.getHostAddress(false),
-                                    coordinator.getPort(),
-                                    Joiner.on("', '").join(participants),
-                                    Joiner.on("', '").join(participants_v2),
-                                    RepairState.STARTED.toString());
-                }
-                else
-                {
-                    fmtQry = format(queryWithoutNewColumns, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
-                                    keyspaceName,
-                                    cfname,
-                                    id.toString(),
-                                    parent_id.toString(),
-                                    range.left.toString(),
-                                    range.right.toString(),
-                                    coordinator.getHostAddress(false),
-                                    Joiner.on("', '").join(participants),
-                                    RepairState.STARTED.toString());
-                }
+                String fmtQry = format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
+                                       keyspaceName,
+                                       cfname,
+                                       id.toString(),
+                                       parent_id.toString(),
+                                       range.left.toString(),
+                                       range.right.toString(),
+                                       coordinator.getHostAddress(false),
+                                       coordinator.getPort(),
+                                       Joiner.on("', '").join(participants),
+                                       Joiner.on("', '").join(participants_v2),
+                                       RepairState.STARTED.toString());
                 processSilent(fmtQry);
             }
         }
diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java 
b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
index 7ff6b87c59..2f4270ace6 100644
--- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
@@ -18,23 +18,67 @@
 
 package org.apache.cassandra.tcm.compatibility;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.HeartBeatState;
+import org.apache.cassandra.gms.TokenSerializer;
+import org.apache.cassandra.gms.VersionGenerator;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Period;
+import org.apache.cassandra.tcm.extensions.ExtensionKey;
+import org.apache.cassandra.tcm.extensions.ExtensionValue;
 import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.ownership.DataPlacements;
 import org.apache.cassandra.tcm.ownership.TokenMap;
+import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.tcm.sequences.LockedRanges;
+import org.apache.cassandra.utils.CassandraVersion;
+
+import static org.apache.cassandra.gms.ApplicationState.DC;
+import static org.apache.cassandra.gms.ApplicationState.HOST_ID;
+import static 
org.apache.cassandra.gms.ApplicationState.INTERNAL_ADDRESS_AND_PORT;
+import static org.apache.cassandra.gms.ApplicationState.INTERNAL_IP;
+import static 
org.apache.cassandra.gms.ApplicationState.NATIVE_ADDRESS_AND_PORT;
+import static org.apache.cassandra.gms.ApplicationState.RACK;
+import static org.apache.cassandra.gms.ApplicationState.RPC_ADDRESS;
+import static org.apache.cassandra.gms.ApplicationState.TOKENS;
+import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue;
+import static org.apache.cassandra.locator.InetAddressAndPort.getByName;
+import static 
org.apache.cassandra.locator.InetAddressAndPort.getByNameOverrideDefaults;
+import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
 public class GossipHelper
 {
@@ -44,6 +88,209 @@ public class GossipHelper
                                                    
StorageService.instance.valueFactory.schema(version));
     }
 
+    public static void removeFromGossip(InetAddressAndPort addr)
+    {
+        Gossiper.runInGossipStageBlocking(() -> 
Gossiper.instance.removeEndpoint(addr));
+    }
+
+    /**
+     * Basic idea is that we can't ever bump the generation or version for a 
remote node
+     *
+     * If the remote node is not yet known, set generation and version to 0 to 
make sure that we don't overwrite
+     * any state generated by the remote node itself
+     *
+     * If the remote node is known, keep the remote generation and version and 
just update the versioned value in
+     * place, this makes sure that if the remote node changed, those values 
will override anything we have here.
+     */
+    public static void mergeNodeToGossip(NodeId nodeId, ClusterMetadata 
metadata)
+    {
+        mergeNodeToGossip(nodeId, metadata, metadata.tokenMap.tokens(nodeId));
+    }
+
+    public static void mergeNodeToGossip(NodeId nodeId, ClusterMetadata 
metadata, Collection<Token> tokens)
+    {
+        boolean isLocal = nodeId.equals(metadata.myNodeId());
+        IPartitioner partitioner = metadata.tokenMap.partitioner();
+        NodeAddresses addresses = metadata.directory.getNodeAddresses(nodeId);
+        Location location = metadata.directory.location(nodeId);
+        InetAddressAndPort endpoint = addresses.broadcastAddress;
+        VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner,
+                                                                               
                      isLocal ? VersionGenerator::getNextVersion : () -> 0);
+        Gossiper.runInGossipStageBlocking(() -> {
+            EndpointState epstate = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+            if (epstate == null)
+                epstate = new EndpointState(HeartBeatState.empty());
+            Map<ApplicationState, VersionedValue> newStates = new 
EnumMap<>(ApplicationState.class);
+            for (ApplicationState appState : ApplicationState.values())
+            {
+                VersionedValue value = epstate.getApplicationState(appState);
+                VersionedValue newValue = null;
+                switch (appState)
+                {
+                    case DC:
+                        newValue = 
valueFactory.datacenter(location.datacenter);
+                        break;
+                    case RACK:
+                        newValue = valueFactory.rack(location.rack);
+                        break;
+                    case RELEASE_VERSION:
+                        newValue = 
valueFactory.releaseVersion(metadata.directory.version(nodeId).cassandraVersion.toString());
+                        break;
+                    case RPC_ADDRESS:
+                        newValue = 
valueFactory.rpcaddress(endpoint.getAddress());
+                        break;
+                    case HOST_ID:
+                        if 
(getBroadcastAddressAndPort().equals(addresses.broadcastAddress))
+                            SystemKeyspace.setLocalHostId(nodeId.uuid);
+                        newValue = valueFactory.hostId(nodeId.uuid);
+                        break;
+                    case TOKENS:
+                        if (tokens != null)
+                            newValue = valueFactory.tokens(tokens);
+                        break;
+                    case INTERNAL_ADDRESS_AND_PORT:
+                        newValue = 
valueFactory.internalAddressAndPort(addresses.localAddress);
+                        break;
+                    case NATIVE_ADDRESS_AND_PORT:
+                        newValue = 
valueFactory.nativeaddressAndPort(addresses.nativeAddress);
+                        break;
+                    case STATUS:
+                        // only publish/add STATUS if there are non-upgraded 
hosts
+                        if 
(metadata.directory.versions.values().stream().anyMatch(v -> !v.isUpgraded()) 
&& tokens != null && !tokens.isEmpty())
+                            newValue = 
nodeStateToStatus(metadata.directory.peerState(nodeId), tokens, valueFactory);
+                        break;
+                    case STATUS_WITH_PORT:
+                        if (tokens != null && !tokens.isEmpty())
+                            newValue = 
nodeStateToStatus(metadata.directory.peerState(nodeId), tokens, valueFactory);
+                        break;
+                    default:
+                        newValue = value;
+                }
+                if (newValue != null)
+                {
+                    // note that version needs to be > -1 here, otherwise 
Gossiper#sendAll on generation change doesn't send it
+                    if (!isLocal)
+                        newValue = unsafeMakeVersionedValue(newValue.value, 
value == null ? 0 : value.version);
+                    newStates.put(appState, newValue);
+                }
+            }
+            HeartBeatState heartBeatState = new 
HeartBeatState(epstate.getHeartBeatState().getGeneration(), isLocal ? 
VersionGenerator.getNextVersion() : 0);
+            Gossiper.instance.unsafeUpdateEpStates(endpoint, new 
EndpointState(heartBeatState, newStates));
+        });
+    }
+
+    private static VersionedValue nodeStateToStatus(NodeState nodeState, 
Collection<Token> tokens, VersionedValue.VersionedValueFactory valueFactory)
+    {
+        VersionedValue status = null;
+        switch (nodeState)
+        {
+            case JOINED:
+                status = valueFactory.normal(tokens);
+                break;
+            case LEFT:
+                status = valueFactory.left(tokens, 
Gossiper.computeExpireTime());
+                break;
+            case BOOTSTRAPPING:
+                status = valueFactory.bootstrapping(tokens);
+                break;
+            case LEAVING:
+                status = valueFactory.leaving(tokens);
+                break;
+            case MOVING:
+                status = valueFactory.moving(tokens.iterator().next()); //todo
+                break;
+            case REGISTERED:
+                break;
+            default:
+                throw new RuntimeException("Bad NodeState " + nodeState);
+        }
+        return status;
+    }
+
+    public static void mergeAllNodeStatesToGossip(ClusterMetadata metadata)
+    {
+        Gossiper.runInGossipStageBlocking(() -> {
+            for (Map.Entry<NodeId, NodeState> entry : 
metadata.directory.states.entrySet())
+                mergeNodeToGossip(entry.getKey(), metadata);
+        });
+    }
+
+    private static Collection<Token> getTokensIn(IPartitioner partitioner, 
EndpointState epState)
+    {
+        try
+        {
+            if (epState == null)
+                return Collections.emptyList();
+
+            VersionedValue versionedValue = 
epState.getApplicationState(TOKENS);
+            if (versionedValue == null)
+                return Collections.emptyList();
+
+            return TokenSerializer.deserialize(partitioner, new 
DataInputStream(new ByteArrayInputStream(versionedValue.toBytes())));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static NodeState toNodeState(InetAddressAndPort endpoint, 
EndpointState epState)
+    {
+        assert epState != null;
+
+        String status = epState.getStatus();
+        if (status.equals(VersionedValue.STATUS_NORMAL) ||
+            status.equals(VersionedValue.SHUTDOWN))
+            return NodeState.JOINED;
+        if (status.equals(VersionedValue.STATUS_LEFT))
+            return NodeState.LEFT;
+        throw new IllegalStateException("Can't upgrade the first node when 
STATUS = " + status + " for node " + endpoint);
+    }
+
+    private static NodeAddresses 
getAddressesFromEndpointState(InetAddressAndPort endpoint, EndpointState 
epState)
+    {
+        if (endpoint.equals(getBroadcastAddressAndPort()))
+            return NodeAddresses.current();
+        try
+        {
+            InetAddressAndPort local = getEitherState(endpoint, epState, 
INTERNAL_ADDRESS_AND_PORT, INTERNAL_IP, DatabaseDescriptor.getStoragePort());
+            InetAddressAndPort nativeAddress = getEitherState(endpoint, 
epState, NATIVE_ADDRESS_AND_PORT, RPC_ADDRESS, 
DatabaseDescriptor.getNativeTransportPort());
+            return new NodeAddresses(endpoint, local, nativeAddress);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new ConfigurationException("Unknown host in epState for " + 
endpoint + " : " + epState, e);
+        }
+    }
+
+    private static InetAddressAndPort getEitherState(InetAddressAndPort 
endpoint,
+                                                     EndpointState epState,
+                                                     ApplicationState 
primaryState,
+                                                     ApplicationState 
deprecatedState,
+                                                     int 
defaultPortForDeprecatedState) throws UnknownHostException
+    {
+        if (epState.getApplicationState(primaryState) != null)
+        {
+            return getByName(epState.getApplicationState(primaryState).value);
+        }
+        else if (epState.getApplicationState(deprecatedState) != null)
+        {
+            return 
getByNameOverrideDefaults(epState.getApplicationState(deprecatedState).value, 
defaultPortForDeprecatedState);
+        }
+        else
+        {
+            return endpoint.withPort(defaultPortForDeprecatedState);
+        }
+    }
+
+    private static NodeVersion getVersionFromEndpointState(InetAddressAndPort 
endpoint, EndpointState epState)
+    {
+        if (endpoint.equals(getBroadcastAddressAndPort()))
+            return NodeVersion.CURRENT;
+        CassandraVersion cassandraVersion = epState.getReleaseVersion();
+        return NodeVersion.fromCassandraVersion(cassandraVersion);
+    }
+
     public static ClusterMetadata emptyWithSchemaFromSystemTables()
     {
         return new ClusterMetadata(Epoch.UPGRADE_STARTUP,
@@ -59,4 +306,60 @@ public class GossipHelper
                                    Collections.emptySet(),
                                    Collections.emptyMap());
     }
+
+    public static ClusterMetadata fromEndpointStates(DistributedSchema schema, 
Map<InetAddressAndPort, EndpointState> epStates)
+    {
+        return fromEndpointStates(epStates, 
DatabaseDescriptor.getPartitioner(), schema);
+    }
+    @VisibleForTesting
+    public static ClusterMetadata fromEndpointStates(Map<InetAddressAndPort, 
EndpointState> epStates, IPartitioner partitioner, DistributedSchema schema)
+    {
+        Directory directory = new Directory();
+        TokenMap tokenMap = new TokenMap(partitioner);
+        List<InetAddressAndPort> sortedEps = 
Lists.newArrayList(epStates.keySet());
+        Collections.sort(sortedEps);
+        Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new 
HashMap<>();
+        for (InetAddressAndPort endpoint : sortedEps)
+        {
+            EndpointState epState = epStates.get(endpoint);
+            String dc = epState.getApplicationState(DC).value;
+            String rack = epState.getApplicationState(RACK).value;
+            String hostIdString = epState.getApplicationState(HOST_ID).value;
+            NodeAddresses nodeAddresses = 
getAddressesFromEndpointState(endpoint, epState);
+            NodeVersion nodeVersion = getVersionFromEndpointState(endpoint, 
epState);
+            assert hostIdString != null;
+            directory = directory.withNonUpgradedNode(nodeAddresses,
+                                                      new Location(dc, rack),
+                                                      nodeVersion,
+                                                      toNodeState(endpoint, 
epState),
+                                                      
UUID.fromString(hostIdString));
+            NodeId nodeId = directory.peerId(endpoint);
+            tokenMap = tokenMap.assignTokens(nodeId, getTokensIn(partitioner, 
epState));
+        }
+
+        ClusterMetadata forPlacementCalculation = new 
ClusterMetadata(Epoch.UPGRADE_GOSSIP,
+                                                                      
Period.EMPTY,
+                                                                      true,
+                                                                      
partitioner,
+                                                                      schema,
+                                                                      
directory,
+                                                                      tokenMap,
+                                                                      
DataPlacements.empty(),
+                                                                      
LockedRanges.EMPTY,
+                                                                      
InProgressSequences.EMPTY,
+                                                                      
Collections.emptySet(),
+                                                                      
extensions);
+        return new ClusterMetadata(Epoch.UPGRADE_GOSSIP,
+                                   Period.EMPTY,
+                                   true,
+                                   partitioner,
+                                   schema,
+                                   directory,
+                                   tokenMap,
+                                   new 
UniformRangePlacement().calculatePlacements(forPlacementCalculation, 
schema.getKeyspaces()),
+                                   LockedRanges.EMPTY,
+                                   InProgressSequences.EMPTY,
+                                   Collections.emptySet(),
+                                   extensions);
+    }
 }
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java 
b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index c6ca1f28a0..8101b6721f 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -121,10 +121,9 @@ public final class TraceKeyspace
         Row.SimpleBuilder rb = builder.row();
         rb.ttl(ttl)
           .add("client", client)
-          .add("coordinator", 
FBUtilities.getBroadcastAddressAndPort().getAddress());
-        if (!Gossiper.instance.hasMajorVersion3Nodes())
-            rb.add("coordinator_port", 
FBUtilities.getBroadcastAddressAndPort().getPort());
-        rb.add("request", request)
+          .add("coordinator", 
FBUtilities.getBroadcastAddressAndPort().getAddress())
+          .add("coordinator_port", 
FBUtilities.getBroadcastAddressAndPort().getPort())
+          .add("request", request)
           .add("started_at", new Date(startedAt))
           .add("command", command)
           .appendAll("parameters", parameters);
@@ -148,10 +147,9 @@ public final class TraceKeyspace
                                               .ttl(ttl);
 
         rowBuilder.add("activity", message)
-                  .add("source", 
FBUtilities.getBroadcastAddressAndPort().getAddress());
-        if (!Gossiper.instance.hasMajorVersion3Nodes())
-            rowBuilder.add("source_port", 
FBUtilities.getBroadcastAddressAndPort().getPort());
-        rowBuilder.add("thread", threadName);
+                  .add("source", 
FBUtilities.getBroadcastAddressAndPort().getAddress())
+                  .add("source_port", 
FBUtilities.getBroadcastAddressAndPort().getPort())
+                  .add("thread", threadName);
 
         if (elapsed >= 0)
             rowBuilder.add("source_elapsed", elapsed);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to