This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5879813  Cannot replace_address /X because it doesn't exist in gossip
5879813 is described below

commit 5879813db7e5c9485a393cf79473b77be38ad5b3
Author: David Capwell <[email protected]>
AuthorDate: Tue Dec 15 12:11:02 2020 -0800

    Cannot replace_address /X because it doesn't exist in gossip
    
    patch by David Capwell; reviewed by Brandon Williams, Jon Meredith, Paulo 
Motta, Sam Tunnicliffe for CASSANDRA-16213
---
 CHANGES.txt                                        |   1 +
 .../config/CassandraRelevantProperties.java        |  12 +
 .../org/apache/cassandra/gms/EndpointState.java    |  14 +
 .../cassandra/gms/GossipDigestSynVerbHandler.java  |  34 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    | 124 +++-
 .../org/apache/cassandra/gms/HeartBeatState.java   |  22 +
 .../apache/cassandra/service/StorageService.java   |  89 ++-
 .../apache/cassandra/distributed/Constants.java    |   7 +
 .../distributed/impl/AbstractCluster.java          |  97 ++-
 .../cassandra/distributed/impl/Instance.java       |  20 +-
 .../cassandra/distributed/impl/InstanceConfig.java |   5 +
 .../cassandra/distributed/shared/ClusterUtils.java | 774 +++++++++++++++++++++
 .../shared/{Shared.java => Isolated.java}          |   8 +-
 .../cassandra/distributed/shared/Shared.java       |   2 +
 .../distributed/shared/WithProperties.java         | 114 +++
 .../distributed/test/IPMembershipTest.java         | 106 +++
 .../cassandra/distributed/test/TestBaseImpl.java   |  15 +
 .../AssassinateAbruptDownedNodeTest.java}          |  26 +-
 .../AssassinateGracefullNodeTest.java}             |  26 +-
 .../hostreplacement/AssassinatedEmptyNodeTest.java |  62 ++
 .../test/hostreplacement/BaseAssassinatedCase.java |  93 +++
 .../HostReplacementAbruptDownedInstanceTest.java   | 104 +++
 .../HostReplacementOfDownedClusterTest.java        | 180 +++++
 .../test/hostreplacement/HostReplacementTest.java  | 234 +++++++
 .../org/apache/cassandra/gms/GossiperTest.java     |   6 +-
 25 files changed, 2091 insertions(+), 84 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3f68a8b..e8f89c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,7 @@
  * When a table attempts to clean up metrics, it was cleaning up all global 
table metrics (CASSANDRA-16095)
  * Bring back the accepted encryption protocols list as configurable option 
(CASSANDRA-13325)
  * DigestResolver.getData throws AssertionError since dataResponse is null 
(CASSANDRA-16097)
+ * Cannot replace_address /X because it doesn't exist in gossip 
(CASSANDRA-16213)
 Merged from 3.11:
  * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to 
default of 1GB (CASSANDRA-16071)
 Merged from 3.0:
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 5d918a8..17b9b2a 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -147,6 +147,18 @@ public enum CassandraRelevantProperties
      */
     BOOTSTRAP_SCHEMA_DELAY_MS("cassandra.schema_delay_ms"),
 
+    /**
+     * Gossip quarantine delay is used while evaluating membership changes and 
should only be changed with extreme care.
+     */
+    GOSSIPER_QUARANTINE_DELAY("cassandra.gossip_quarantine_delay_ms"),
+
+    /**
+     * When doing a host replacement it's possible that the gossip state is 
"empty" meaning that the endpoint is known
+     * but the current state isn't known.  If the host replacement is needed 
to repair this state, this property must
+     * be true.
+     */
+    REPLACEMENT_ALLOW_EMPTY("cassandra.allow_empty_replace_address", "true"),
+
     //cassandra properties (without the "cassandra." prefix)
 
     /**
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java 
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 8546a70..a4b294c 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -55,6 +55,11 @@ public class EndpointState
         this(initialHbState, new EnumMap<ApplicationState, 
VersionedValue>(ApplicationState.class));
     }
 
+    public EndpointState(EndpointState other)
+    {
+        this(new HeartBeatState(other.hbState), new 
EnumMap<>(other.applicationState.get()));
+    }
+
     EndpointState(HeartBeatState initialHbState, Map<ApplicationState, 
VersionedValue> states)
     {
         hbState = initialHbState;
@@ -138,6 +143,15 @@ public class EndpointState
         isAlive = false;
     }
 
+    /**
+     * @return true if {@link HeartBeatState#isEmpty()} is true and no STATUS 
application state exists
+     */
+    public boolean isEmptyWithoutStatus()
+    {
+        Map<ApplicationState, VersionedValue> state = applicationState.get();
+        return hbState.isEmpty() && 
!(state.containsKey(ApplicationState.STATUS_WITH_PORT) || 
state.containsKey(ApplicationState.STATUS));
+    }
+
     public boolean isRpcReady()
     {
         VersionedValue rpcState = 
getApplicationState(ApplicationState.RPC_READY);
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java 
b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 520dbec..abaa39b 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -17,7 +17,11 @@
  */
 package org.apache.cassandra.gms;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,7 +31,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 
-import static org.apache.cassandra.net.Verb.*;
+import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_ACK;
 
 public class GossipDigestSynVerbHandler extends 
GossipVerbHandler<GossipDigestSyn>
 {
@@ -97,15 +101,31 @@ public class GossipDigestSynVerbHandler extends 
GossipVerbHandler<GossipDigestSy
             logger.trace("Gossip syn digests are : {}", sb);
         }
 
-        List<GossipDigest> deltaGossipDigestList = new 
ArrayList<GossipDigest>();
-        Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new 
HashMap<InetAddressAndPort, EndpointState>();
-        Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, 
deltaEpStateMap);
-        logger.trace("sending {} digests and {} deltas", 
deltaGossipDigestList.size(), deltaEpStateMap.size());
-        Message<GossipDigestAck> gDigestAckMessage = 
Message.out(GOSSIP_DIGEST_ACK, new GossipDigestAck(deltaGossipDigestList, 
deltaEpStateMap));
+        Message<GossipDigestAck> gDigestAckMessage = gDigestList.isEmpty() ?
+                                                     createShadowReply() :
+                                                     
createNormalReply(gDigestList);
+
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestAckMessage to {}", from);
         MessagingService.instance().send(gDigestAckMessage, from);
 
         super.doVerb(message);
     }
+
+    private static Message<GossipDigestAck> 
createNormalReply(List<GossipDigest> gDigestList)
+    {
+        List<GossipDigest> deltaGossipDigestList = new ArrayList<>();
+        Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new 
HashMap<>();
+        Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, 
deltaEpStateMap);
+        logger.trace("sending {} digests and {} deltas", 
deltaGossipDigestList.size(), deltaEpStateMap.size());
+
+        return Message.out(GOSSIP_DIGEST_ACK, new 
GossipDigestAck(deltaGossipDigestList, deltaEpStateMap));
+    }
+
+    private static Message<GossipDigestAck> createShadowReply()
+    {
+        Map<InetAddressAndPort, EndpointState> stateMap = 
Gossiper.instance.examineShadowState();
+        logger.trace("sending 0 digests and {} deltas", stateMap.size());
+        return Message.out(GOSSIP_DIGEST_ACK, new 
GossipDigestAck(Collections.emptyList(), stateMap));
+    }
 }
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 51e7e54..a3be834 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.Verb.ECHO_REQ;
 import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
@@ -86,6 +87,7 @@ import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
 public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 {
     public static final String MBEAN_NAME = 
"org.apache.cassandra.net:type=Gossiper";
+
     public static class Props
     {
         public static final String DISABLE_THREAD_VALIDATION = 
"cassandra.gossip.disable_thread_validation";
@@ -107,7 +109,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     private volatile ScheduledFuture<?> scheduledGossipTask;
     private static final ReentrantLock taskLock = new ReentrantLock();
     public final static int intervalInMillis = 1000;
-    public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
+    public final static int QUARANTINE_DELAY = 
GOSSIPER_QUARANTINE_DELAY.getInt(StorageService.RING_DELAY * 2);
     private static final Logger logger = 
LoggerFactory.getLogger(Gossiper.class);
     private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
 
@@ -1218,9 +1220,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         checkProperThreadForStateMutation();
         if (logger.isTraceEnabled())
             logger.trace("marking as down {}", addr);
-        localState.markDead();
-        liveEndpoints.remove(addr);
-        unreachableEndpoints.put(addr, System.nanoTime());
+        silentlyMarkDead(addr, localState);
         logger.info("InetAddress {} is now DOWN", addr);
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
             subscriber.onDead(addr, localState);
@@ -1231,6 +1231,18 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
+     * Used by {@link #markDead(InetAddressAndPort, EndpointState)} and {@link 
#addSavedEndpoint(InetAddressAndPort)}
+     * to register an endpoint as dead.  This method is "silent" to avoid 
triggering listeners, diagnostics, or logs
+     * on startup via addSavedEndpoint.
+     */
+    private void silentlyMarkDead(InetAddressAndPort addr, EndpointState 
localState)
+    {
+        localState.markDead();
+        liveEndpoints.remove(addr);
+        unreachableEndpoints.put(addr, System.nanoTime());
+    }
+
+    /**
      * This method is called whenever there is a "big" change in ep state (a 
generation change for a known node).
      *
      * @param ep      endpoint
@@ -1480,23 +1492,73 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             deltaEpStateMap.put(gDigest.getEndpoint(), localEpStatePtr);
     }
 
-    /*
-        This method is used to figure the state that the Gossiper has but 
Gossipee doesn't. The delta digests
-        and the delta state are built up.
-    */
-    void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> 
deltaGossipDigestList, Map<InetAddressAndPort, EndpointState> deltaEpStateMap)
+    /**
+     * Used during a shadow round to collect the current state; this method 
clones the current state, no filtering
+     * is done.
+     *
+     * During the shadow round it's desirable to return gossip state for remote 
instances that were created by this
+     * process also known as "empty", this is done for host replacement to be 
able to replace downed hosts that are
+     * in the ring but have no state in gossip (see CASSANDRA-16213).
+     *
+     * This method is different than {@link #examineGossiper(List, List, Map)} 
with respect to how "empty" states are
+     * dealt with; they are kept.
+     */
+    Map<InetAddressAndPort, EndpointState> examineShadowState()
     {
-        if (gDigestList.size() == 0)
+        logger.debug("Shadow request received, adding all states");
+        Map<InetAddressAndPort, EndpointState> map = new HashMap<>();
+        for (Entry<InetAddressAndPort, EndpointState> e : 
endpointStateMap.entrySet())
         {
-           /* we've been sent a *completely* empty syn, which should normally 
never happen since an endpoint will at least send a syn with itself.
-              If this is happening then the node is attempting shadow gossip, 
and we should respond with everything we know.
-            */
-            logger.debug("Shadow request received, adding all states");
-            for (Map.Entry<InetAddressAndPort, EndpointState> entry : 
endpointStateMap.entrySet())
+            InetAddressAndPort endpoint = e.getKey();
+            EndpointState state = new EndpointState(e.getValue());
+            if (state.isEmptyWithoutStatus())
             {
-                gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
+                // We have no app states loaded for this endpoint, but we may 
well have
+                // some state persisted in the system keyspace. This can 
happen in the case
+                // of a full cluster bounce where one or more nodes fail to 
come up. As
+                // gossip state is transient, the peers which do successfully 
start will be
+                // aware of the failed nodes thanks to 
StorageService::initServer calling
+                // Gossiper.instance::addSavedEndpoint with every endpoint in 
TokenMetadata,
+                // which itself is populated from the system tables at startup.
+                // Here we know that a peer which is starting up and 
attempting to perform
+                // Here we know that a peer is starting up and 
+                // * it is replacing a down node, in which case it needs to 
learn the tokens
+                //   of the down node and optionally its host id.
+                // * it needs to check that no other instance is already 
associated with its
+                //   endpoint address and port.
+                // To support both of these cases, we can add the tokens and 
host id from
+                // the system table, if they exist. These are only ever 
persisted to the system
+                // table when the actual node to which they apply enters the 
UP/NORMAL state.
+                // This invariant will be preserved as nodes never persist or 
propagate the
+                // results of a shadow round, so this communication will be 
strictly limited
+                // to this node and the node performing the shadow round.
+                UUID hostId = SystemKeyspace.loadHostIds().get(endpoint);
+                if (null != hostId)
+                {
+                    state.addApplicationState(ApplicationState.HOST_ID,
+                                                 
StorageService.instance.valueFactory.hostId(hostId));
+                }
+                Set<Token> tokens = SystemKeyspace.loadTokens().get(endpoint);
+                if (null != tokens && !tokens.isEmpty())
+                {
+                    state.addApplicationState(ApplicationState.TOKENS,
+                                                 
StorageService.instance.valueFactory.tokens(tokens));
+                }
             }
+            map.put(endpoint, state);
         }
+        return map;
+    }
+
+    /**
+     * This method is used to figure the state that the Gossiper has but 
Gossipee doesn't. The delta digests
+     * and the delta state are built up.
+     *
+     * When a {@link EndpointState} is "empty" then it is filtered out and not 
added to the delta state (see CASSANDRA-16213).
+     */
+    void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> 
deltaGossipDigestList, Map<InetAddressAndPort, EndpointState> deltaEpStateMap)
+    {
+        assert !gDigestList.isEmpty() : "examineGossiper called with empty 
digest list";
         for ( GossipDigest gDigest : gDigestList )
         {
             int remoteGeneration = gDigest.getGeneration();
@@ -1523,8 +1585,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
                 }
                 else if (remoteGeneration < localGeneration)
                 {
-                    /* send all data with generation = localgeneration and 
version > 0 */
-                    sendAll(gDigest, deltaEpStateMap, 0);
+                    /* send all data with generation = localgeneration and 
version > -1 */
+                    sendAll(gDigest, deltaEpStateMap, 
HeartBeatState.EMPTY_VERSION);
                 }
                 else if (remoteGeneration == localGeneration)
                 {
@@ -1779,16 +1841,17 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         if (epState != null)
         {
             logger.debug("not replacing a previous epState for {}, but reusing 
it: {}", ep, epState);
-            epState.setHeartBeatState(new HeartBeatState(0));
+            epState.setHeartBeatState(HeartBeatState.empty());
         }
         else
         {
-            epState = new EndpointState(new HeartBeatState(0));
+            epState = new EndpointState(HeartBeatState.empty());
+            logger.info("Adding {} as there was no previous epState; new state 
is {}", ep, epState);
         }
 
         epState.markDead();
         endpointStateMap.put(ep, epState);
-        unreachableEndpoints.put(ep, System.nanoTime());
+        silentlyMarkDead(ep, epState);
         if (logger.isTraceEnabled())
             logger.trace("Adding saved endpoint {} {}", ep, 
epState.getHeartBeatState().getGeneration());
     }
@@ -1895,6 +1958,25 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return inShadowRound;
     }
 
+    /**
+     * Creates a new dead {@link EndpointState} that is {@link 
EndpointState#isEmptyWithoutStatus() empty}.  This is used during
+     * host replacement for edge cases where the seed notified that the 
endpoint was empty, so need to add such state
+     * into gossip explicitly (as empty endpoints are not gossiped outside of 
the shadow round).
+     *
+     * see CASSANDRA-16213
+     */
+    public void initializeUnreachableNodeUnsafe(InetAddressAndPort addr)
+    {
+        EndpointState state = new EndpointState(HeartBeatState.empty());
+        state.markDead();
+        EndpointState oldState = endpointStateMap.putIfAbsent(addr, state);
+        if (null != oldState)
+        {
+            throw new RuntimeException("Attempted to initialize endpoint state 
for unreachable node, " +
+                                       "but found existing endpoint state for 
it.");
+        }
+    }
+
     @VisibleForTesting
     public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int 
generationNbr)
     {
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java 
b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index 2abd5d7..75f4f56 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -29,6 +29,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  */
 public class HeartBeatState
 {
+    public static final int EMPTY_VERSION = -1;
+
     public static final IVersionedSerializer<HeartBeatState> serializer = new 
HeartBeatStateSerializer();
 
     private volatile int generation;
@@ -39,12 +41,32 @@ public class HeartBeatState
         this(gen, 0);
     }
 
+    public HeartBeatState(HeartBeatState other)
+    {
+        generation = other.generation;
+        version = other.version;
+    }
+
     public HeartBeatState(int gen, int ver)
     {
         generation = gen;
         version = ver;
     }
 
+    public static HeartBeatState empty()
+    {
+        return new HeartBeatState(0, EMPTY_VERSION);
+    }
+
+    public boolean isEmpty()
+    {
+        // Instance I1 will update this value for I1's state, no other 
instance should.
+        // It is also known that negative version isn't allowed, so can 
leverage this to
+        // know that the state was not generated by I1 but rather by another 
instance (normally
+        // happens on startup, the peers are added to gossip with the empty 
state).
+        return version == EMPTY_VERSION;
+    }
+
     int getGeneration()
     {
         return generation;
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 8aaf9ab..1b180c0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -33,7 +33,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import javax.management.*;
 import javax.management.openmbean.CompositeData;
@@ -104,7 +103,6 @@ import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.tracing.TraceKeyspace;
 import org.apache.cassandra.transport.ClientResourceLimits;
 import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.logging.LoggingSupportFactory;
 import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -124,6 +122,7 @@ import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY;
 import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
 import static 
org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
 import static org.apache.cassandra.net.NoPayload.noPayload;
@@ -523,18 +522,68 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         logger.info("Gathering node replacement information for {}", 
replaceAddress);
         Map<InetAddressAndPort, EndpointState> epStates = 
Gossiper.instance.doShadowRound();
         // as we've completed the shadow round of gossip, we should be able to 
find the node we're replacing
-        if (epStates.get(replaceAddress) == null)
+        EndpointState state = epStates.get(replaceAddress);
+        if (state == null)
             throw new RuntimeException(String.format("Cannot replace_address 
%s because it doesn't exist in gossip", replaceAddress));
 
         validateEndpointSnitch(epStates.values().iterator());
 
         try
         {
-            VersionedValue tokensVersionedValue = 
epStates.get(replaceAddress).getApplicationState(ApplicationState.TOKENS);
+            VersionedValue tokensVersionedValue = 
state.getApplicationState(ApplicationState.TOKENS);
             if (tokensVersionedValue == null)
                 throw new RuntimeException(String.format("Could not find 
tokens for %s to replace", replaceAddress));
 
-            bootstrapTokens = 
TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new 
ByteArrayInputStream(tokensVersionedValue.toBytes())));
+            Collection<Token> tokens = 
TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new 
ByteArrayInputStream(tokensVersionedValue.toBytes())));
+            bootstrapTokens = 
validateReplacementBootstrapTokens(tokenMetadata, replaceAddress, tokens);
+
+            if (state.isEmptyWithoutStatus() && 
REPLACEMENT_ALLOW_EMPTY.getBoolean())
+            {
+                logger.warn("Gossip state not present for replacing node {}. 
Adding temporary entry to continue.", replaceAddress);
+
+                // When replacing a node, we take ownership of all its tokens.
+                // If that node is currently down and not present in the 
gossip info
+                // of any other live peers, then we will not be able to take 
ownership
+                // of its tokens during bootstrap as they have no way of being 
propagated
+                // to this node's TokenMetadata. TM is loaded at startup (in 
which case
+                // it will be empty for a new replacement node) and only 
updated with
+                // tokens for an endpoint during normal state propagation 
(which will not
+                // occur if no peers have gossip state for it).
+                // However, the presence of host id and tokens in the system 
tables implies
+                // that the node managed to complete bootstrap at some point 
in the past.
+                // Peers may include this information loaded directly from 
system tables
+                // in a GossipDigestAck *only if* the GossipDigestSyn was sent 
as part of a
+                // shadow round (otherwise, a GossipDigestAck contains only 
state about peers
+                // learned via gossip).
+                // It is safe to do this here since we completed a shadow 
round we know
+                // that :
+                // * replaceAddress successfully bootstrapped at some point 
and owned these
+                //   tokens
+                // * we know that no other node currently owns these tokens
+                // * we are going to completely take over replaceAddress's 
ownership of
+                //   these tokens.
+                tokenMetadata.updateNormalTokens(bootstrapTokens, 
replaceAddress);
+                UUID hostId = Gossiper.instance.getHostId(replaceAddress, 
epStates);
+                if (hostId != null)
+                    tokenMetadata.updateHostId(hostId, replaceAddress);
+
+                // If we were only able to learn about the node being replaced 
through the
+                // shadow gossip round (i.e. there is no state in gossip 
across the cluster
+                // about it, perhaps because the entire cluster has been 
bounced since it went
+                // down), then we're safe to proceed with the replacement. In 
this case, there
+                // will be no local endpoint state as we discard the results 
of the shadow
+                // round after preparing replacement info. We inject a minimal 
EndpointState
+                // to keep FailureDetector::isAlive and 
Gossiper::compareEndpointStartup from
+                // failing later in the replacement, as they both expect the 
replaced node to
+                // be fully present in gossip.
+                // Otherwise, if the replaced node is present in gossip, we 
need check that
+                // it is not in fact live.
+                // We choose to not include the EndpointState provided during 
the shadow round
+                // as it's possible to include more state than is desired, so 
by creating a
+                // new empty endpoint without that information we can control 
what is in our
+                // local gossip state
+                
Gossiper.instance.initializeUnreachableNodeUnsafe(replaceAddress);
+            }
         }
         catch (IOException e)
         {
@@ -552,6 +601,35 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return localHostId;
     }
 
+    private static Collection<Token> 
validateReplacementBootstrapTokens(TokenMetadata tokenMetadata,
+                                                                        
InetAddressAndPort replaceAddress,
+                                                                        
Collection<Token> bootstrapTokens)
+    {
+        Map<Token, InetAddressAndPort> conflicts = new HashMap<>();
+        for (Token token : bootstrapTokens)
+        {
+            InetAddressAndPort conflict = tokenMetadata.getEndpoint(token);
+            if (null != conflict && !conflict.equals(replaceAddress))
+                conflicts.put(token, tokenMetadata.getEndpoint(token));
+        }
+
+        if (!conflicts.isEmpty())
+        {
+            String error = String.format("Conflicting token ownership 
information detected between " +
+                                         "gossip and current ring view during 
proposed replacement " +
+                                         "of %s. Some tokens identified in 
gossip for the node being " +
+                                         "replaced are currently owned by 
other peers: %s",
+                                         replaceAddress,
+                                         conflicts.entrySet()
+                                                  .stream()
+                                                  .map(e -> e.getKey() + "(" + 
e.getValue() + ")" )
+                                                  
.collect(Collectors.joining(",")));
+            throw new RuntimeException(error);
+
+        }
+        return bootstrapTokens;
+    }
+
     private synchronized void checkForEndpointCollision(UUID localHostId, 
Set<InetAddressAndPort> peers) throws ConfigurationException
     {
         if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
@@ -2615,6 +2693,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         if (!tokensToUpdateInSystemKeyspace.isEmpty())
             SystemKeyspace.updateTokens(endpoint, 
tokensToUpdateInSystemKeyspace);
     }
+
     /**
      * Handle node move to normal state. That is, node is entering token ring 
and participating
      * in reads.
diff --git a/test/distributed/org/apache/cassandra/distributed/Constants.java 
b/test/distributed/org/apache/cassandra/distributed/Constants.java
index b7d2d26..d7a2acd 100644
--- a/test/distributed/org/apache/cassandra/distributed/Constants.java
+++ b/test/distributed/org/apache/cassandra/distributed/Constants.java
@@ -31,4 +31,11 @@ public final class Constants
      * of the YAML is not desired.
      */
     public static final String KEY_DTEST_API_CONFIG_CHECK = 
"dtest.api.config.check";
+
+    /**
+     * Property used by AbstractCluster to determine how a failed Instance 
startup state should be; if not set
+     * the Instance is marked as "shutdown", but this flag can be used to 
leave the instance "running" by setting
+     * 'true'.
+     */
+    public static final String KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN = 
"dtest.api.startup.failure_as_shutdown";
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 0eea077..53502f9 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -19,16 +19,18 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.io.File;
+import java.lang.annotation.Annotation;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +43,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +70,7 @@ import org.apache.cassandra.distributed.api.LogAction;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.InstanceClassLoader;
+import org.apache.cassandra.distributed.shared.Isolated;
 import org.apache.cassandra.distributed.shared.MessageFilters;
 import org.apache.cassandra.distributed.shared.Metrics;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
@@ -77,6 +82,7 @@ import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
 
 import static 
org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort;
 
@@ -116,10 +122,15 @@ public abstract class AbstractCluster<I extends 
IInstance> implements ICluster<I
 
     // include byteman so tests can use
     private static final Set<String> SHARED_CLASSES = 
findClassesMarkedForSharedClassLoader();
-    private static final Predicate<String> SHARED_PREDICATE = s ->
-                                                              
SHARED_CLASSES.contains(s) ||
-                                                              
InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
-                                                              
s.startsWith("org.jboss.byteman");
+    private static final Set<String> ISOLATED_CLASSES = 
findClassesMarkedForInstanceClassLoader();
+    private static final Predicate<String> SHARED_PREDICATE = s -> {
+        if (ISOLATED_CLASSES.contains(s))
+            return false;
+
+        return SHARED_CLASSES.contains(s) ||
+               InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
+               s.startsWith("org.jboss.byteman");
+    };
 
     private final UUID clusterId = UUID.randomUUID();
     private final File root;
@@ -171,7 +182,10 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
         private final IInstanceConfig config;
         private volatile IInvokableInstance delegate;
         private volatile Versions.Version version;
+        @GuardedBy("this")
         private volatile boolean isShutdown = true;
+        @GuardedBy("this")
+        private InetSocketAddress broadcastAddress;
 
         protected IInvokableInstance delegate()
         {
@@ -194,6 +208,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
             this.version = version;
             // we ensure there is always a non-null delegate, so that the 
executor may be used while the node is offline
             this.delegate = newInstance(generation);
+            this.broadcastAddress = config.broadcastAddress();
         }
 
         private IInvokableInstance newInstance(int generation)
@@ -215,20 +230,55 @@ public abstract class AbstractCluster<I extends 
IInstance> implements ICluster<I
             return isShutdown;
         }
 
+        private boolean isRunning()
+        {
+            return !isShutdown;
+        }
+
         @Override
         public synchronized void startup()
         {
             startup(AbstractCluster.this);
         }
-
         public synchronized void startup(ICluster cluster)
         {
             if (cluster != AbstractCluster.this)
                 throw new IllegalArgumentException("Only the owning cluster 
can be used for startup");
-            if (!isShutdown)
-                throw new IllegalStateException();
-            delegateForStartup().startup(cluster);
+            if (isRunning())
+                throw new IllegalStateException("Can not start a instance that 
is already running");
             isShutdown = false;
+            if (!broadcastAddress.equals(config.broadcastAddress()))
+            {
+                // previous address != desired address, so cleanup
+                InetSocketAddress previous = broadcastAddress;
+                InetSocketAddress newAddress = config.broadcastAddress();
+                instanceMap.put(newAddress, (I) this); // if the broadcast 
address changes, update
+                instanceMap.remove(previous);
+                broadcastAddress = newAddress;
+            }
+            try
+            {
+                delegateForStartup().startup(cluster);
+            }
+            catch (Throwable t)
+            {
+                if 
(config.get(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN) == null)
+                {
+                    // it's possible that the failure happens after listening 
and threads are started up
+                    // but without knowing the start up phase it isn't safe to 
call shutdown, so assume
+                    // that a failed to start instance was shutdown (which 
would be true if each instance
+                    // was its own JVM).
+                    isShutdown = true;
+                }
+                else
+                {
+                    // user was explicit about the desired behavior, respect it
+                    // the most common reason to set this is to set 'false', 
this will leave the
+                    // instance marked as running, which will have .close shut 
it down.
+                    isShutdown = (boolean) 
config.get(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN);
+                }
+                throw t;
+            }
             updateMessagingVersions();
         }
 
@@ -241,8 +291,8 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
         @Override
         public synchronized Future<Void> shutdown(boolean graceful)
         {
-            if (isShutdown)
-                throw new IllegalStateException();
+            if (isShutdown())
+                throw new IllegalStateException("Instance is not running, so 
can not be shutdown");
             isShutdown = true;
             Future<Void> future = delegate.shutdown(graceful);
             delegate = null;
@@ -251,7 +301,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
 
         public int liveMemberCount()
         {
-            if (!isShutdown && delegate != null)
+            if (isRunning() && delegate != null)
                 return delegate().liveMemberCount();
 
             throw new IllegalStateException("Cannot get live member count on 
shutdown instance: " + config.num());
@@ -283,7 +333,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
         public void receiveMessage(IMessage message)
         {
             IInvokableInstance delegate = this.delegate;
-            if (!isShutdown && delegate != null) // since we sync directly on 
the other node, we drop messages immediately if we are shutdown
+            if (isRunning() && delegate != null) // since we sync directly on 
the other node, we drop messages immediately if we are shutdown
                 delegate.receiveMessage(message);
         }
 
@@ -302,7 +352,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
         @Override
         public synchronized void setVersion(Versions.Version version)
         {
-            if (!isShutdown)
+            if (isRunning())
                 throw new IllegalStateException("Must be shutdown before 
version can be modified");
             // re-initialise
             this.version = version;
@@ -343,7 +393,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
         this.broadcastPort = builder.getBroadcastPort();
         this.nodeProvisionStrategy = builder.nodeProvisionStrategy;
         this.instances = new ArrayList<>();
-        this.instanceMap = new HashMap<>();
+        this.instanceMap = new ConcurrentHashMap<>();
         this.initialVersion = builder.getVersion();
         this.filters = new MessageFilters();
         this.instanceInitializer = builder.getInstanceInitializer();
@@ -861,9 +911,20 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
 
     private static Set<String> findClassesMarkedForSharedClassLoader()
     {
-        return new 
Reflections("org.apache.cassandra").getTypesAnnotatedWith(Shared.class).stream()
-                                .map(Class::getName)
-                                .collect(Collectors.toSet());
+        return findClassesMarkedWith(Shared.class);
+    }
+
+    private static Set<String> findClassesMarkedForInstanceClassLoader()
+    {
+        return findClassesMarkedWith(Isolated.class);
+    }
+
+    private static Set<String> findClassesMarkedWith(Class<? extends 
Annotation> annotation)
+    {
+        return new 
Reflections(ConfigurationBuilder.build("org.apache.cassandra").setExpandSuperTypes(false))
+               .getTypesAnnotatedWith(annotation).stream()
+               .map(Class::getName)
+               .collect(Collectors.toSet());
     }
 }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index a4163a1..87d84c8 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -257,7 +257,9 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
     {
         MessagingService.instance().outboundSink.add((message, to) -> {
             InetSocketAddress toAddr = fromCassandraInetAddressAndPort(to);
-            
cluster.get(toAddr).receiveMessage(serializeMessage(message.from(), to, 
message));
+            IInstance toInstance = cluster.get(toAddr);
+            if (toInstance != null)
+                toInstance.receiveMessage(serializeMessage(message.from(), to, 
message));
             return false;
         });
     }
@@ -268,7 +270,10 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
             if (isShutdown())
                 return false;
             IMessage serialized = serializeMessage(message.from(), 
toCassandraInetAddressAndPort(broadcastAddress()), message);
-            int fromNum = cluster.get(serialized.from()).config().num();
+            IInstance from = cluster.get(serialized.from());
+            if (from == null)
+                return false;
+            int fromNum = from.config().num();
             int toNum = config.num(); // since this instance is reciving the 
message, to will always be this instance
             return cluster.filters().permitInbound(fromNum, toNum, serialized);
         });
@@ -281,7 +286,10 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                 return false;
             IMessage serialzied = serializeMessage(message.from(), to, 
message);
             int fromNum = config.num(); // since this instance is sending the 
message, from will always be this instance
-            int toNum = 
cluster.get(fromCassandraInetAddressAndPort(to)).config().num();
+            IInstance toInstance = 
cluster.get(fromCassandraInetAddressAndPort(to));
+            if (toInstance == null)
+                return false;
+            int toNum = toInstance.config().num();
             return cluster.filters().permitOutbound(fromNum, toNum, 
serialzied);
         });
     }
@@ -455,6 +463,9 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                     throw new RuntimeException(e);
                 }
 
+                // Re-populate token metadata after commit log recover (new 
peers might be loaded onto system keyspace #10293)
+                StorageService.instance.populateTokenMetadata();
+
                 Verb.REQUEST_RSP.unsafeSetSerializer(() -> 
ReadResponse.serializer);
 
                 if (config.has(NETWORK))
@@ -685,6 +696,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                                 .thenRun(super::shutdown);
     }
 
+    @Override
     public int liveMemberCount()
     {
         return sync(() -> {
@@ -694,11 +706,13 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         }).call();
     }
 
+    @Override
     public Metrics metrics()
     {
         return callOnInstance(() -> new 
InstanceMetrics(CassandraMetricsRegistry.Metrics));
     }
 
+    @Override
     public NodeToolResult nodetoolResult(boolean withNotifications, String... 
commandAndArgs)
     {
         return sync(() -> {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java 
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index bf615cd..895f2a7 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -139,6 +139,11 @@ public class InstanceConfig implements IInstanceConfig
         return 
DistributedTestSnitch.fromCassandraInetAddressAndPort(getBroadcastAddressAndPort());
     }
 
+    public void unsetBroadcastAddressAndPort()
+    {
+        broadcastAddressAndPort = null;
+    }
+
     protected InetAddressAndPort getBroadcastAddressAndPort()
     {
         if (broadcastAddressAndPort == null)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java 
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
new file mode 100644
index 0000000..a68e819
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -0,0 +1,774 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Futures;
+import org.junit.Assert;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.service.StorageService;
+
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Utilities for working with jvm-dtest clusters.
+ *
+ * This class is marked as Isolated as it relies on lambdas, which are in a 
package that is marked as shared, so need to
+ * tell jvm-dtest to not share this class.
+ *
+ * This class should never be called from within the cluster, always in the 
App ClassLoader.
+ */
+@Isolated
+public class ClusterUtils
+{
+    /**
+     * Start the instance with the given System Properties, after the instance 
has started, the properties will be cleared.
+     */
+    public static <I extends IInstance> I start(I inst, 
Consumer<WithProperties> fn)
+    {
+        return start(inst, (ignore, prop) -> fn.accept(prop));
+    }
+
+    /**
+     * Start the instance with the given System Properties, after the instance 
has started, the properties will be cleared.
+     */
+    public static <I extends IInstance> I start(I inst, BiConsumer<I, 
WithProperties> fn)
+    {
+        try (WithProperties properties = new WithProperties())
+        {
+            fn.accept(inst, properties);
+            inst.startup();
+            return inst;
+        }
+    }
+
+    /**
+     * Stop an instance in a blocking manner.
+     *
+     * The main difference between this and {@link IInstance#shutdown()} is 
that the wait on the future will catch
+     * the exceptions and throw as runtime.
+     */
+    public static void stopUnchecked(IInstance i)
+    {
+        Futures.getUnchecked(i.shutdown());
+    }
+
+    /**
+     * Stops an instance abruptly.  This is done by blocking all messages 
to/from so all other instances are unable
+     * to communicate, then stopping the instance gracefully.
+     *
+     * The assumption is that hard stopping inbound and outbound messages will 
appear to the cluster as if the instance
+     * was stopped via kill -9; this does not hold true if the instance is 
restarted as it knows it was properly shutdown.
+     *
+     * @param cluster to filter messages to
+     * @param inst to shut down
+     */
+    public static <I extends IInstance> void stopAbrupt(ICluster<I> cluster, I 
inst)
+    {
+        // block all messages to/from the node going down to make sure a clean 
shutdown doesn't happen
+        IMessageFilters.Filter to = 
cluster.filters().allVerbs().to(inst.config().num()).drop();
+        IMessageFilters.Filter from = 
cluster.filters().allVerbs().from(inst.config().num()).drop();
+        try
+        {
+            stopUnchecked(inst);
+        }
+        finally
+        {
+            from.off();
+            to.off();
+        }
+    }
+
+    /**
+     * Stop all the instances in the cluster.  This function is different than 
{@link ICluster#close()} as it doesn't
+     * clean up the cluster state, it only stops all the instances.
+     */
+    public static <I extends IInstance> void stopAll(ICluster<I> cluster)
+    {
+        cluster.stream().forEach(ClusterUtils::stopUnchecked);
+    }
+
+    /**
+     * Create a new instance and add it to the cluster, without starting it.
+     *
+     * @param cluster to add to
+     * @param dc the instance should be in
+     * @param rack the instance should be in
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I addInstance(AbstractCluster<I> 
cluster,
+                                                      String dc, String rack)
+    {
+        return addInstance(cluster, dc, rack, ignore -> {});
+    }
+
+    /**
+     * Create a new instance and add it to the cluster, without starting it.
+     *
+     * @param cluster to add to
+     * @param dc the instance should be in
+     * @param rack the instance should be in
+     * @param fn function to add to the config before starting
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I addInstance(AbstractCluster<I> 
cluster,
+                                                      String dc, String rack,
+                                                      
Consumer<IInstanceConfig> fn)
+    {
+        Objects.requireNonNull(dc, "dc");
+        Objects.requireNonNull(rack, "rack");
+
+        InstanceConfig config = cluster.newInstanceConfig();
+        //TODO adding new instances should be cleaner, currently requires you 
create the cluster with all
+        // instances known about (at least to NetworkTopology and TokenStategy)
+        // this is very hidden, so should be more explicit
+        config.networkTopology().put(config.broadcastAddress(), 
NetworkTopology.dcAndRack(dc, rack));
+
+        fn.accept(config);
+
+        return cluster.bootstrap(config);
+    }
+
+    /**
+     * Create and start a new instance that replaces an existing instance.
+     *
+     * The instance will be in the same datacenter and rack as the existing 
instance.
+     *
+     * @param cluster to add to
+     * @param toReplace instance to replace
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I 
replaceHostAndStart(AbstractCluster<I> cluster, IInstance toReplace)
+    {
+        return replaceHostAndStart(cluster, toReplace, ignore -> {});
+    }
+
+    /**
+     * Create and start a new instance that replaces an existing instance.
+     *
+     * The instance will be in the same datacenter and rack as the existing 
instance.
+     *
+     * @param cluster to add to
+     * @param toReplace instance to replace
+     * @param fn lambda to add additional properties
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I 
replaceHostAndStart(AbstractCluster<I> cluster,
+                                                              IInstance 
toReplace,
+                                                              
Consumer<WithProperties> fn)
+    {
+        IInstanceConfig toReplaceConf = toReplace.config();
+        I inst = addInstance(cluster, toReplaceConf.localDatacenter(), 
toReplaceConf.localRack(), c -> c.set("auto_bootstrap", true));
+
+        return start(inst, properties -> {
+            // lower this so the replacement waits less time
+            properties.setProperty("cassandra.broadcast_interval_ms", 
Long.toString(TimeUnit.SECONDS.toMillis(30)));
+            // default is 30s, lowering as it should be faster
+            properties.setProperty("cassandra.ring_delay_ms", 
Long.toString(TimeUnit.SECONDS.toMillis(10)));
+            properties.set(BOOTSTRAP_SCHEMA_DELAY_MS, 
TimeUnit.SECONDS.toMillis(10));
+
+            // state which node to replace
+            properties.setProperty("cassandra.replace_address_first_boot", 
toReplace.config().broadcastAddress().getAddress().getHostAddress());
+
+            fn.accept(properties);
+        });
+    }
+
+    /**
+     * Calls {@link 
org.apache.cassandra.locator.TokenMetadata#sortedTokens()}, returning as a list 
of strings.
+     */
+    public static List<String> getTokenMetadataTokens(IInvokableInstance inst)
+    {
+        return inst.callOnInstance(() ->
+                                   StorageService.instance.getTokenMetadata()
+                                                          
.sortedTokens().stream()
+                                                          
.map(Object::toString)
+                                                          
.collect(Collectors.toList()));
+    }
+
+    /**
+     * Get the ring from the perspective of the instance.
+     */
+    public static List<RingInstanceDetails> ring(IInstance inst)
+    {
+        NodeToolResult results = inst.nodetoolResult("ring");
+        results.asserts().success();
+        return parseRing(results.getStdout());
+    }
+
+    /**
+     * Make sure the target instance is in the ring as seen by {@code instance}.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance expected in the ring
+     * @return the ring (if target is present); the assertion fails otherwise
+     */
+    public static List<RingInstanceDetails> assertInRing(IInstance instance, 
IInstance expectedInRing)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        Optional<RingInstanceDetails> match = ring.stream().filter(d -> 
d.address.equals(targetAddress)).findFirst();
+        assertThat(match).as("Expected to find %s in the ring but was not found", 
targetAddress).isPresent();
+        return ring;
+    }
+
+    /**
+     * Make sure the target instance's gossip state matches on the source 
instance
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance expected in the ring
+     * @param state expected gossip state
+     * @return the ring (if target is present and has expected state)
+     */
+    public static List<RingInstanceDetails> assertRingState(IInstance 
instance, IInstance expectedInRing, String state)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        List<RingInstanceDetails> match = ring.stream()
+                                              .filter(d -> 
d.address.equals(targetAddress))
+                                              .collect(Collectors.toList());
+        assertThat(match)
+        .isNotEmpty()
+        .as("State was expected to be %s but was not", state)
+        .anyMatch(r -> r.state.equals(state));
+        return ring;
+    }
+
+    /**
+     * Make sure the target instance is NOT in the ring.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance not expected in the ring
+     * @return the ring (if target is not present)
+     */
+    public static List<RingInstanceDetails> assertNotInRing(IInstance 
instance, IInstance expectedInRing)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        Optional<RingInstanceDetails> match = ring.stream().filter(d -> 
d.address.equals(targetAddress)).findFirst();
+        Assert.assertEquals("Not expected to find " + targetAddress + " but 
was found", Optional.empty(), match);
+        return ring;
+    }
+
+    private static List<RingInstanceDetails> awaitRing(IInstance src, String 
errorMessage, Predicate<List<RingInstanceDetails>> fn)
+    {
+        List<RingInstanceDetails> ring = null;
+        for (int i = 0; i < 100; i++)
+        {
+            ring = ring(src);
+            if (fn.test(ring))
+            {
+                return ring;
+            }
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        throw new AssertionError(errorMessage + "\n" + ring);
+    }
+
+    /**
+     * Wait for the target to be in the ring as seen by the source instance.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance to wait for
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, 
IInstance expectedInRing)
+    {
+        return awaitRingJoin(instance, 
expectedInRing.broadcastAddress().getAddress().getHostAddress());
+    }
+
+    /**
+     * Wait for the target to be in the ring as seen by the source instance.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance address to wait for
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, 
String expectedInRing)
+    {
+        return awaitRing(instance, "Node " + expectedInRing + " did not join 
the ring...", ring -> {
+            Optional<RingInstanceDetails> match = ring.stream().filter(d -> 
d.address.equals(expectedInRing)).findFirst();
+            if (match.isPresent())
+            {
+                RingInstanceDetails details = match.get();
+                return details.status.equals("Up") && 
details.state.equals("Normal");
+            }
+            return false;
+        });
+    }
+
+    /**
+     * Wait for the ring to only have instances that are Up and Normal.
+     *
+     * @param src instance to check on
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingHealthy(IInstance src)
+    {
+        return awaitRing(src, "Timeout waiting for ring to become healthy",
+                         ring ->
+                         
ring.stream().allMatch(ClusterUtils::isRingInstanceDetailsHealthy));
+    }
+
+    /**
+     * Wait for the ring to have the target instance with the provided state.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing to look for
+     * @param state expected
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingState(IInstance instance, 
IInstance expectedInRing, String state)
+    {
+        return awaitRing(instance, "Timeout waiting for " + expectedInRing + " 
to have state " + state,
+                         ring ->
+                         ring.stream()
+                             .filter(d -> 
d.address.equals(getBroadcastAddressHostString(expectedInRing)))
+                             .filter(d -> d.state.equals(state))
+                             .findAny().isPresent());
+    }
+
+    /**
+     * Make sure the ring is only the expected instances.  The source instance 
may not be in the ring, so this function
+     * only relies on the expectedInRing param.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing expected instances in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, 
IInstance... expectedInRing)
+    {
+        return assertRingIs(instance, Arrays.asList(expectedInRing));
+    }
+
+    /**
+     * Make sure the ring is only the expected instances.  The source instance 
may not be in the ring, so this function
+     * only relies on the expectedInRing param.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing expected instances in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, 
Collection<? extends IInstance> expectedInRing)
+    {
+        Set<String> expectedRingAddresses = expectedInRing.stream()
+                                                         .map(i -> 
i.config().broadcastAddress().getAddress().getHostAddress())
+                                                         
.collect(Collectors.toSet());
+        return assertRingIs(instance, expectedRingAddresses);
+    }
+
+    /**
+     * Make sure the ring is only the expected instances.  The source instance 
may not be in the ring, so this function
+     * only relies on the expectedRingAddresses param.
+     *
+     * @param instance instance to check on
+     * @param expectedRingAddresses expected instances addresses in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, 
Set<String> expectedRingAddresses)
+    {
+        List<RingInstanceDetails> ring = ring(instance);
+        Set<String> ringAddresses = ring.stream().map(d -> 
d.address).collect(Collectors.toSet());
+        assertThat(ringAddresses)
+        .as("Ring addreses did not match for instance %s", instance)
+        .isEqualTo(expectedRingAddresses);
+        return ring;
+    }
+
+    private static boolean isRingInstanceDetailsHealthy(RingInstanceDetails 
details)
+    {
+        return details.status.equals("Up") && details.state.equals("Normal");
+    }
+
+    private static List<RingInstanceDetails> parseRing(String str)
+    {
+        // 127.0.0.3  rack0       Up     Normal  46.21 KB        100.00%       
      -1
+        // /127.0.0.1:7012  Unknown     ?      Normal  ?               100.00% 
            -3074457345618258603
+        Pattern pattern = 
Pattern.compile("^(/?[0-9.:]+)\\s+(\\w+|\\?)\\s+(\\w+|\\?)\\s+(\\w+|\\?).*?(-?\\d+)\\s*$");
+        List<RingInstanceDetails> details = new ArrayList<>();
+        String[] lines = str.split("\n");
+        for (String line : lines)
+        {
+            Matcher matcher = pattern.matcher(line);
+            if (!matcher.find())
+            {
+                continue;
+            }
+            details.add(new RingInstanceDetails(matcher.group(1), 
matcher.group(2), matcher.group(3), matcher.group(4), matcher.group(5)));
+        }
+
+        return details;
+    }
+
+    private static Map<String, Map<String, String>> awaitGossip(IInstance src, 
String errorMessage, Predicate<Map<String, Map<String, String>>> fn)
+    {
+        Map<String, Map<String, String>> gossip = null;
+        for (int i = 0; i < 100; i++)
+        {
+            gossip = gossipInfo(src);
+            if (fn.test(gossip))
+            {
+                return gossip;
+            }
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        throw new AssertionError(errorMessage + "\n" + gossip);
+    }
+
+    /**
+     * Wait for the target instance to have the desired status. Target status 
is checked via string contains so works
+     * with 'NORMAL' but also can check tokens or full state.
+     *
+     * @param instance instance to check on
+     * @param expectedInGossip instance to wait for
+     * @param targetStatus for the instance; null only matches a missing status
+     * @return gossip info
+     */
+    public static Map<String, Map<String, String>> awaitGossipStatus(IInstance 
instance, IInstance expectedInGossip, String targetStatus)
+    {
+        return awaitGossip(instance, "Node " + expectedInGossip + " did not 
match state " + targetStatus, gossip -> {
+            Map<String, String> state = 
gossip.get(getBroadcastAddressString(expectedInGossip));
+            if (state == null)
+                return false;
+            String status = state.get("STATUS_WITH_PORT");
+            if (status == null)
+                status = state.get("STATUS");
+            if (status == null)
+                return targetStatus == null;
+            return targetStatus != null && status.contains(targetStatus);
+        });
+    }
+
+    /**
+     * Get the gossip information from the node.  Each endpoint maps to every 
key:value field reported for it by nodetool gossipinfo
+     *
+     * @param inst to check on
+     * @return gossip info
+     */
+    public static Map<String, Map<String, String>> gossipInfo(IInstance inst)
+    {
+        NodeToolResult results = inst.nodetoolResult("gossipinfo");
+        results.asserts().success();
+        return parseGossipInfo(results.getStdout());
+    }
+
+    /**
+     * Make sure the gossip info for the specific target has the expected 
generation and heartbeat
+     *
+     * @param instance to check on
+     * @param expectedInGossip instance to check for
+     * @param expectedGeneration expected generation
+     * @param expectedHeartbeat expected heartbeat
+     */
+    public static void assertGossipInfo(IInstance instance,
+                                        InetSocketAddress expectedInGossip, 
int expectedGeneration, int expectedHeartbeat)
+    {
+        String targetAddress = expectedInGossip.getAddress().toString();
+        Map<String, Map<String, String>> gossipInfo = gossipInfo(instance);
+        Map<String, String> gossipState = gossipInfo.get(targetAddress);
+        if (gossipState == null)
+            throw new NullPointerException("Unable to find gossip info for " + 
targetAddress + "; gossip info = " + gossipInfo);
+        Assert.assertEquals(Long.toString(expectedGeneration), 
gossipState.get("generation"));
+        Assert.assertEquals(Long.toString(expectedHeartbeat), 
gossipState.get("heartbeat")); //TODO do we really mix these two?
+    }
+
+    private static Map<String, Map<String, String>> parseGossipInfo(String str)
+    {
+        Map<String, Map<String, String>> map = new HashMap<>();
+        String[] lines = str.split("\n");
+        String currentInstance = null;
+        for (String line : lines)
+        {
+            if (line.startsWith("/"))
+            {
+                // start of new instance
+                currentInstance = line;
+                continue;
+            }
+            Objects.requireNonNull(currentInstance);
+            String[] kv = line.trim().split(":", 2);
+            assert kv.length == 2 : "When splitting line '" + line + "' 
expected 2 parts but not true";
+            Map<String, String> state = map.computeIfAbsent(currentInstance, 
ignore -> new HashMap<>());
+            state.put(kv[0], kv[1]);
+        }
+
+        return map;
+    }
+
+    /**
+     * Get the tokens assigned to the instance via config.  This method does 
not work if the instance has learned
+     * or generated its tokens.
+     *
+     * @param instance to get tokens from
+     * @return non-empty list of tokens
+     */
+    public static List<String> getTokens(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        int numTokens = conf.getInt("num_tokens");
+        Assert.assertEquals("Only single token is supported", 1, numTokens);
+        String token = conf.getString("initial_token");
+        Assert.assertNotNull("initial_token was not found", token);
+        return Arrays.asList(token);
+    }
+
+    /**
+     * Get all data directories for the given instance.
+     *
+     * @param instance to get data directories for
+     * @return data directories
+     */
+    public static List<File> getDataDirectories(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String[] ds = (String[]) conf.get("data_file_directories");
+        List<File> files = new ArrayList<>(ds.length);
+        for (int i = 0; i < ds.length; i++)
+            files.add(new File(ds[i]));
+        return files;
+    }
+
+    /**
+     * Get the commit log directory for the given instance.
+     *
+     * @param instance to get the commit log directory for
+     * @return commit log directory
+     */
+    public static File getCommitLogDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("commitlog_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get the hints directory for the given instance.
+     *
+     * @param instance to get the hints directory for
+     * @return hints directory
+     */
+    public static File getHintsDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("hints_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get the saved caches directory for the given instance.
+     *
+     * @param instance to get the saved caches directory for
+     * @return saved caches directory
+     */
+    public static File getSavedCachesDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("saved_caches_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get all writable directories for the given instance.
+     *
+     * @param instance to get directories for
+     * @return all writable directories
+     */
+    public static List<File> getDirectories(IInstance instance)
+    {
+        List<File> out = new ArrayList<>();
+        out.addAll(getDataDirectories(instance));
+        out.add(getCommitLogDirectory(instance));
+        out.add(getHintsDirectory(instance));
+        out.add(getSavedCachesDirectory(instance));
+        return out;
+    }
+
+    /**
+     * Gets the name of the Partitioner for the given instance.
+     *
+     * @param instance to get partitioner from
+     * @return partitioner name
+     */
+    public static String getPartitionerName(IInstance instance)
+    {
+        return (String) instance.config().get("partitioner");
+    }
+
+    /**
+     * Changes the instance's address to the new address.  This method should 
only be called while the instance is
+     * down, else has undefined behavior.
+     *
+     * @param instance to update address for
+     * @param address to set
+     */
+    public static void updateAddress(IInstance instance, String address)
+    {
+        updateAddress(instance.config(), address);
+    }
+
+    /**
+     * Changes the instance's address to the new address.  This method should 
only be called while the instance is
+     * down, else has undefined behavior.
+     *
+     * @param conf to update address for
+     * @param address to set
+     */
+    private static void updateAddress(IInstanceConfig conf, String address)
+    {
+        for (String key : Arrays.asList("broadcast_address", "listen_address", 
"broadcast_rpc_address", "rpc_address"))
+            conf.set(key, address);
+
+        // InstanceConfig caches InetSocketAddress -> InetAddressAndPort
+        // this causes issues as startup now ignores config, so force reset it 
to pull from conf.
+        ((InstanceConfig) conf).unsetBroadcastAddressAndPort(); //TODO remove 
the need to null out the cache...
+        conf.networkTopology().put(conf.broadcastAddress(), 
NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack()));
+    }
+
+    /**
+     * Get the broadcast address host address only (ex. 127.0.0.1)
+     */
+    private static String getBroadcastAddressHostString(IInstance target)
+    {
+        return 
target.config().broadcastAddress().getAddress().getHostAddress();
+    }
+
+    /**
+     * Get the broadcast address in host:port format (ex. 127.0.0.1:7190)
+     */
+    public static String getBroadcastAddressHostWithPortString(IInstance 
target)
+    {
+        InetSocketAddress address = target.config().broadcastAddress();
+        return address.getAddress().getHostAddress() + ":" + address.getPort();
+    }
+
+    /**
+     * Get the broadcast address InetAddress string (ex. localhost/127.0.0.1 or 
/127.0.0.1)
+     */
+    private static String getBroadcastAddressString(IInstance target)
+    {
+        return target.config().broadcastAddress().getAddress().toString();
+    }
+
+    public static final class RingInstanceDetails
+    {
+        private final String address;
+        private final String rack;
+        private final String status;
+        private final String state;
+        private final String token;
+
+        private RingInstanceDetails(String address, String rack, String 
status, String state, String token)
+        {
+            this.address = address;
+            this.rack = rack;
+            this.status = status;
+            this.state = state;
+            this.token = token;
+        }
+
+        public String getAddress()
+        {
+            return address;
+        }
+
+        public String getRack()
+        {
+            return rack;
+        }
+
+        public String getStatus()
+        {
+            return status;
+        }
+
+        public String getState()
+        {
+            return state;
+        }
+
+        public String getToken()
+        {
+            return token;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RingInstanceDetails that = (RingInstanceDetails) o;
+            return Objects.equals(address, that.address) &&
+                   Objects.equals(rack, that.rack) &&
+                   Objects.equals(status, that.status) &&
+                   Objects.equals(state, that.state) &&
+                   Objects.equals(token, that.token);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(address, rack, status, state, token);
+        }
+
+        public String toString()
+        {
+            return Arrays.asList(address, rack, status, state, 
token).toString();
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java 
b/test/distributed/org/apache/cassandra/distributed/shared/Isolated.java
similarity index 82%
copy from test/distributed/org/apache/cassandra/distributed/shared/Shared.java
copy to test/distributed/org/apache/cassandra/distributed/shared/Isolated.java
index a1047b6..898631f 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/Isolated.java
@@ -24,14 +24,16 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * Tells jvm-dtest that a class should be shared accross all {@link 
ClassLoader}s.
+ * Tells jvm-dtest that a class should be isolated and loaded into the 
instance class loader.
  *
  * Jvm-dtest relies on classloader isolation to run multiple cassandra 
instances in the same JVM, this makes it
  * so some classes do not get shared (outside a blesssed set of 
classes/packages). When the default behavior
- * is not desirable, this annotation will tell jvm-dtest to share the class 
accross all class loaders.
+ * is not desirable, this annotation will tell jvm-dtest to isolate the class 
across all class loaders.
+ *
+ * This is the opposite of {@link Shared}.
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target({ ElementType.TYPE })
-public @interface Shared
+public @interface Isolated
 {
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java 
b/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
index a1047b6..bb67070 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
@@ -29,6 +29,8 @@ import java.lang.annotation.Target;
  * Jvm-dtest relies on classloader isolation to run multiple cassandra 
instances in the same JVM, this makes it
  * so some classes do not get shared (outside a blesssed set of 
classes/packages). When the default behavior
  * is not desirable, this annotation will tell jvm-dtest to share the class 
accross all class loaders.
+ *
+ * This is the opposite of {@link Isolated}.
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target({ ElementType.TYPE })
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java 
b/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
new file mode 100644
index 0000000..88987c2
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+
+public final class WithProperties implements AutoCloseable
+{
+    private final List<Property> properties = new ArrayList<>();
+
+    public WithProperties()
+    {
+    }
+
+    public WithProperties(String... kvs)
+    {
+        with(kvs);
+    }
+
+    public void with(String... kvs)
+    {
+        assert kvs.length % 2 == 0 : "Input must have an even amount of inputs 
but given " + kvs.length;
+        for (int i = 0; i <= kvs.length - 2; i = i + 2)
+        {
+            with(kvs[i], kvs[i + 1]);
+        }
+    }
+
+    public void setProperty(String key, String value)
+    {
+        with(key, value);
+    }
+
+    public void set(CassandraRelevantProperties prop, String value)
+    {
+        with(prop.getKey(), value);
+    }
+
+    public void set(CassandraRelevantProperties prop, String... values)
+    {
+        set(prop, Arrays.asList(values));
+    }
+
+    public void set(CassandraRelevantProperties prop, Collection<String> 
values)
+    {
+        set(prop, Joiner.on(",").join(values));
+    }
+
+    public void set(CassandraRelevantProperties prop, boolean value)
+    {
+        set(prop, Boolean.toString(value));
+    }
+
+    public void set(CassandraRelevantProperties prop, long value)
+    {
+        set(prop, Long.toString(value));
+    }
+
+    public void with(String key, String value)
+    {
+        String previous = System.setProperty(key, value);
+        properties.add(new Property(key, previous));
+    }
+
+
+    @Override
+    public void close()
+    {
+        Collections.reverse(properties);
+        properties.forEach(s -> {
+            if (s.value == null)
+                System.getProperties().remove(s.key);
+            else
+                System.setProperty(s.key, s.value);
+        });
+        properties.clear();
+    }
+
+    private static final class Property
+    {
+        private final String key;
+        private final String value;
+
+        private Property(String key, String value)
+        {
+            this.key = key;
+            this.value = value;
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/IPMembershipTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/IPMembershipTest.java
new file mode 100644
index 0000000..de6eb0d
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/IPMembershipTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.tools.ToolRunner;
+import org.assertj.core.api.Assertions;
+
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getDirectories;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.updateAddress;
+
+public class IPMembershipTest extends TestBaseImpl
+{
+    /**
+     * Port of replace_address_test.py::fail_without_replace_test to jvm-dtest
+     */
+    @Test
+    public void sameIPFailWithoutReplace() throws IOException
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NATIVE_PROTOCOL)
+                                                        
.set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+                                      .start())
+        {
+            IInvokableInstance nodeToReplace = cluster.get(3);
+
+            ToolRunner.invokeCassandraStress("write", "n=10000", "-schema", 
"replication(factor=3)", "-port", "native=9042").assertOnExitCode();
+
+            for (boolean auto_bootstrap : Arrays.asList(true, false))
+            {
+                stopUnchecked(nodeToReplace);
+                
getDirectories(nodeToReplace).forEach(FileUtils::deleteRecursive);
+
+                nodeToReplace.config().set("auto_bootstrap", auto_bootstrap);
+
+                Assertions.assertThatThrownBy(() -> nodeToReplace.startup())
+                          .hasMessage("A node with address /127.0.0.3:7012 
already exists, cancelling join. Use cassandra.replace_address if you want to 
replace this node.");
+            }
+        }
+    }
+
+    /**
+     * Tests the behavior if a node restarts with a different IP.
+     */
+    @Test
+    public void startupNewIP() throws IOException, InterruptedException
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NATIVE_PROTOCOL)
+                                                        // disable 
DistributedTestSnitch as it tries to query before we setup
+                                                        
.set("endpoint_snitch", "org.apache.cassandra.locator.SimpleSnitch"))
+                                      .start())
+        {
+            IInvokableInstance nodeToReplace = cluster.get(3);
+
+            ToolRunner.invokeCassandraStress("write", "n=10000", "-schema", 
"replication(factor=3)", "-port", "native=9042").assertOnExitCode();
+
+            stopUnchecked(nodeToReplace);
+
+            // change the IP of the node
+            updateAddress(nodeToReplace, "127.0.0.4");
+
+            nodeToReplace.startup();
+
+            // gossip takes some time, wait for the other nodes to see this 
one updated
+            ClusterUtils.awaitRingJoin(cluster.get(1), "127.0.0.4");
+            ClusterUtils.awaitRingJoin(cluster.get(2), "127.0.0.4");
+
+            Set<String> expected = ImmutableSet.of("127.0.0.1", "127.0.0.2", 
"127.0.0.4");
+            cluster.forEach(i -> assertRingIs(i, expected));
+
+            ToolRunner.invokeCassandraStress("read", "n=10000", "no-warmup", 
"-port", "native=9042").assertOnExitCode();
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java 
b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index d53cbd4..801df7d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -23,12 +23,14 @@ import java.math.BigInteger;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 
 import com.google.common.collect.ImmutableSet;
+
 import org.junit.After;
 import org.junit.BeforeClass;
 
@@ -149,4 +151,17 @@ public class TestBaseImpl extends DistributedTestBase
 
         throw new IllegalArgumentException("Unsupported value type (value is " 
+ value + ')');
     }
+
+    public static void fixDistributedSchemas(Cluster cluster)
+    {
+        // These keyspaces are under replicated by default, so must be updated 
when doing a multi-node cluster;
+        // else bootstrap will fail with 'Unable to find sufficient sources 
for streaming range <range> in keyspace <name>'
+        for (String ks : Arrays.asList("system_auth", "system_traces"))
+        {
+            cluster.schemaChange("ALTER KEYSPACE " + ks + " WITH REPLICATION = 
{'class': 'SimpleStrategy', 'replication_factor': " + Math.min(cluster.size(), 
3) + "}");
+        }
+
+        // in real life repair is needed in this case, but in the test case it 
doesn't matter if the tables lose
+        // anything, so ignoring repair to speed up the tests.
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateAbruptDownedNodeTest.java
similarity index 53%
copy from test/distributed/org/apache/cassandra/distributed/shared/Shared.java
copy to 
test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateAbruptDownedNodeTest.java
index a1047b6..95fa767 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateAbruptDownedNodeTest.java
@@ -16,22 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.shared;
+package org.apache.cassandra.distributed.test.hostreplacement;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopAbrupt;
 
 /**
- * Tells jvm-dtest that a class should be shared accross all {@link 
ClassLoader}s.
+ * If the operator attempts to assassinate the node before replacing it, this 
will cause the node to fail to start
+ * as the status is non-normal.
  *
- * Jvm-dtest relies on classloader isolation to run multiple cassandra 
instances in the same JVM, this makes it
- * so some classes do not get shared (outside a blesssed set of 
classes/packages). When the default behavior
- * is not desirable, this annotation will tell jvm-dtest to share the class 
accross all class loaders.
+ * The node is removed abruptly before assassinate, leaving gossip without an 
empty entry.
  */
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ ElementType.TYPE })
-public @interface Shared
+public class AssassinateAbruptDownedNodeTest extends BaseAssassinatedCase
 {
+    @Override
+    void consume(Cluster cluster, IInvokableInstance nodeToRemove)
+    {
+        stopAbrupt(cluster, nodeToRemove);
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateGracefullNodeTest.java
similarity index 53%
copy from test/distributed/org/apache/cassandra/distributed/shared/Shared.java
copy to 
test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateGracefullNodeTest.java
index a1047b6..d88f2d9 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateGracefullNodeTest.java
@@ -16,22 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.distributed.shared;
+package org.apache.cassandra.distributed.test.hostreplacement;
 
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
 
 /**
- * Tells jvm-dtest that a class should be shared accross all {@link 
ClassLoader}s.
+ * If the operator attempts to assassinate the node before replacing it, this 
will cause the node to fail to start
+ * as the status is non-normal.
  *
- * Jvm-dtest relies on classloader isolation to run multiple cassandra 
instances in the same JVM, this makes it
- * so some classes do not get shared (outside a blesssed set of 
classes/packages). When the default behavior
- * is not desirable, this annotation will tell jvm-dtest to share the class 
accross all class loaders.
+ * The node is removed gracefully before assassinate, leaving gossip without 
an empty entry.
  */
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ ElementType.TYPE })
-public @interface Shared
+public class AssassinateGracefullNodeTest extends BaseAssassinatedCase
 {
+    @Override
+    void consume(Cluster cluster, IInvokableInstance nodeToRemove)
+    {
+        stopUnchecked(nodeToRemove);
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinatedEmptyNodeTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinatedEmptyNodeTest.java
new file mode 100644
index 0000000..31a732f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinatedEmptyNodeTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.net.InetSocketAddress;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertGossipInfo;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopAll;
+
+/**
+ * If the operator attempts to assassinate the node before replacing it, this 
will cause the node to fail to start
+ * as the status is non-normal.
+ *
+ * The cluster is put into the "empty" state for the node to remove.
+ */
+public class AssassinatedEmptyNodeTest extends BaseAssassinatedCase
+{
+    // empty state does not include the token metadata, so when assassinate 
happens it will fail to find the token
+    @Override
+    protected String expectedMessage(IInvokableInstance nodeToRemove)
+    {
+        return "Could not find tokens for " + 
nodeToRemove.config().broadcastAddress() + " to replace";
+    }
+
+    @Override
+    void consume(Cluster cluster, IInvokableInstance nodeToRemove)
+    {
+        IInvokableInstance seed = cluster.get(SEED_NUM);
+        IInvokableInstance peer = cluster.get(PEER_NUM);
+        InetSocketAddress addressToReplace = nodeToRemove.broadcastAddress();
+
+        // now stop all nodes
+        stopAll(cluster);
+
+        // with all nodes down, now start the seed (should be first node)
+        seed.startup();
+        peer.startup();
+
+        // at this point node2 should be known in gossip, but with 
generation/version of 0
+        assertGossipInfo(seed, addressToReplace, 0, -1);
+        assertGossipInfo(peer, addressToReplace, 0, -1);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/BaseAssassinatedCase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/BaseAssassinatedCase.java
new file mode 100644
index 0000000..4d4768e
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/BaseAssassinatedCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingState;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitGossipStatus;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getBroadcastAddressHostWithPortString;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.getTokens;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static 
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public abstract class BaseAssassinatedCase extends TestBaseImpl
+{
+    protected static final int SEED_NUM = 1;
+    protected static final int NODE_TO_REMOVE_NUM = 2;
+    protected static final int PEER_NUM = 3;
+
+    abstract void consume(Cluster cluster, IInvokableInstance nodeToRemove);
+
+    protected String expectedMessage(IInvokableInstance nodeToRemove)
+    {
+        return "Cannot replace token " + getTokens(nodeToRemove).get(0) + " 
which does not exist!";
+    }
+
+    @Test
+    public void test() throws IOException
+    {
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK))
+                                      .withTokenSupplier(node -> 
even.token(node == 4 || node == 5 ? NODE_TO_REMOVE_NUM : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(SEED_NUM);
+            IInvokableInstance nodeToRemove = cluster.get(NODE_TO_REMOVE_NUM);
+            IInvokableInstance peer = cluster.get(PEER_NUM);
+
+            setupCluster(cluster);
+
+            consume(cluster, nodeToRemove);
+
+            assertRingState(seed, nodeToRemove, "Normal");
+
+            // assassinate the node
+            peer.nodetoolResult("assassinate", 
getBroadcastAddressHostWithPortString(nodeToRemove))
+                .asserts().success();
+
+            // wait until the seed sees this assassination
+            awaitGossipStatus(seed, nodeToRemove, "LEFT");
+
+            // allow replacing nodes with the LEFT state, this should fail 
since the token isn't in the ring
+            assertThatThrownBy(() ->
+                               replaceHostAndStart(cluster, nodeToRemove, 
properties -> {
+                                   // since there are downed nodes it's 
possible gossip has the downed node with an old schema, so need
+                                   // this property to allow startup
+                                   properties.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, 
true);
+                                   // since the bootstrap should fail because of 
the token, don't wait "too long" on schema as it doesn't
+                                   // matter for this test
+                                   properties.set(BOOTSTRAP_SCHEMA_DELAY_MS, 
10);
+                               }))
+            .hasMessage(expectedMessage(nodeToRemove));
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementAbruptDownedInstanceTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementAbruptDownedInstanceTest.java
new file mode 100644
index 0000000..11a30e5
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementAbruptDownedInstanceTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingState;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopAbrupt;
+import static 
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+import static 
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.validateRows;
+
+public class HostReplacementAbruptDownedInstanceTest extends TestBaseImpl
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(HostReplacementAbruptDownedInstanceTest.class);
+
+    /**
+     * Host replacement after an abrupt shutdown, that is when the 
shutdown state is not broadcast and the node to be replaced is still in NORMAL state.
+     */
+    @Test
+    public void hostReplaceAbruptShutdown() throws IOException
+    {
+        int numStartNodes = 3;
+        TokenSupplier even = 
TokenSupplier.evenlyDistributedTokens(numStartNodes);
+        try (Cluster cluster = Cluster.build(numStartNodes)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK))
+                                      .withTokenSupplier(node -> 
even.token(node == (numStartNodes + 1) ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+            IInvokableInstance peer = cluster.get(3);
+            List<IInvokableInstance> peers = Arrays.asList(seed, peer);
+
+            setupCluster(cluster);
+
+            // collect rows/tokens to detect issues later on if the state 
doesn't match
+            SimpleQueryResult expectedState = 
nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + 
".tbl", ConsistencyLevel.ALL);
+
+            stopAbrupt(cluster, nodeToRemove);
+
+            // at this point node 2 should still be NORMAL on all other nodes
+            peers.forEach(p -> assertRingState(p, nodeToRemove, "Normal"));
+
+            // node is down, but queries should still work
+            //TODO failing, but shouldn't!
+//            peers.forEach(p -> validateRows(p.coordinator(), expectedState));
+
+            // now create a new node to replace the other node
+            long startNanos = System.nanoTime();
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, 
nodeToRemove, properties -> {
+                // since node2 was killed abruptly it's possible that node2's 
gossip state has an old schema version
+                // if this happens then bootstrap will fail waiting for a 
schema version it will never see; to avoid
+                // this, setting this property to log the warning rather than 
fail bootstrap
+                properties.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+            });
+            logger.info("Host replacement of {} with {} took {}", 
nodeToRemove, replacingNode, Duration.ofNanos(System.nanoTime() - startNanos));
+            peers.forEach(p -> awaitRingJoin(p, replacingNode));
+
+            // make sure all nodes are healthy
+            awaitRingHealthy(seed);
+
+            List<IInvokableInstance> expectedRing = Arrays.asList(seed, peer, 
replacingNode);
+            expectedRing.forEach(p -> assertRingIs(p, expectedRing));
+
+            expectedRing.forEach(p -> validateRows(p.coordinator(), 
expectedState));
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementOfDownedClusterTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementOfDownedClusterTest.java
new file mode 100644
index 0000000..477e226
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementOfDownedClusterTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.assertj.core.api.Assertions;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertGossipInfo;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertNotInRing;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getTokenMetadataTokens;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopAll;
+import static 
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+import static 
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.validateRows;
+
+public class HostReplacementOfDownedClusterTest extends TestBaseImpl
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(HostReplacementOfDownedClusterTest.class);
+
+    static
+    {
+        // Gossip has a notion of quarantine, which is used to remove "fat 
clients" and "gossip only members"
+        // from the ring if not updated recently (recently is defined by this 
config).
+        // The reason for setting to 0 is to make sure even under such an 
aggressive environment, we do NOT remove
+        // nodes from the peers table
+        GOSSIPER_QUARANTINE_DELAY.setInt(0);
+    }
+
+    /**
+     * When the full cluster crashes, make sure that we can replace a dead 
node after recovery.  This can happen
+     * with DC outages (assuming single DC setup) where the recovery isn't 
able to recover a specific node.
+     */
+    @Test
+    public void hostReplacementOfDeadNode() throws IOException
+    {
+        // start with 2 nodes, stop both nodes, start the seed, host replace 
the down node
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+        try (Cluster cluster = Cluster.build(2)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK))
+                                      .withTokenSupplier(node -> 
even.token(node == 3 ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+            InetSocketAddress addressToReplace = 
nodeToRemove.broadcastAddress();
+
+            setupCluster(cluster);
+
+            // collect rows/tokens to detect issues later on if the state 
doesn't match
+            SimpleQueryResult expectedState = 
nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + 
".tbl", ConsistencyLevel.ALL);
+            List<String> beforeCrashTokens = getTokenMetadataTokens(seed);
+
+            // now stop all nodes
+            stopAll(cluster);
+
+            // with all nodes down, now start the seed (should be first node)
+            seed.startup();
+
+            // at this point node2 should be known in gossip, but with 
generation/version of 0
+            assertGossipInfo(seed, addressToReplace, 0, -1);
+
+            // make sure node1 still has node2's tokens
+            List<String> currentTokens = getTokenMetadataTokens(seed);
+            Assertions.assertThat(currentTokens)
+                      .as("Tokens no longer match after restarting")
+                      .isEqualTo(beforeCrashTokens);
+
+            // now create a new node to replace the other node
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, 
nodeToRemove);
+
+            awaitRingJoin(seed, replacingNode);
+            awaitRingJoin(replacingNode, seed);
+            assertNotInRing(seed, nodeToRemove);
+            logger.info("Current ring is {}", assertNotInRing(replacingNode, 
nodeToRemove));
+
+            validateRows(seed.coordinator(), expectedState);
+            validateRows(replacingNode.coordinator(), expectedState);
+        }
+    }
+
+    /**
+     * Cluster stops completely, then start seed, then host replace node2; 
after all complete start node3 to make sure
+     * it comes up correctly with the new host in the ring.
+     */
+    @Test
+    public void hostReplacementOfDeadNodeAndOtherNodeStartsAfter() throws 
IOException
+    {
+        // start with 3 nodes, stop all nodes, start the seed, host replace 
the down node
+        int numStartNodes = 3;
+        TokenSupplier even = 
TokenSupplier.evenlyDistributedTokens(numStartNodes);
+        try (Cluster cluster = Cluster.build(numStartNodes)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK))
+                                      .withTokenSupplier(node -> 
even.token(node == (numStartNodes + 1) ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+            IInvokableInstance nodeToStartAfterReplace = cluster.get(3);
+            InetSocketAddress addressToReplace = 
nodeToRemove.broadcastAddress();
+
+            setupCluster(cluster);
+
+            // collect rows/tokens to detect issues later on if the state 
doesn't match
+            SimpleQueryResult expectedState = 
nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + 
".tbl", ConsistencyLevel.ALL);
+            List<String> beforeCrashTokens = getTokenMetadataTokens(seed);
+
+            // now stop all nodes
+            stopAll(cluster);
+
+            // with all nodes down, now start the seed (should be first node)
+            seed.startup();
+
+            // at this point node2 should be known in gossip, but with 
generation/version of 0
+            assertGossipInfo(seed, addressToReplace, 0, -1);
+
+            // make sure node1 still has node2's tokens
+            List<String> currentTokens = getTokenMetadataTokens(seed);
+            Assertions.assertThat(currentTokens)
+                      .as("Tokens no longer match after restarting")
+                      .isEqualTo(beforeCrashTokens);
+
+            // now create a new node to replace the other node
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, 
nodeToRemove);
+
+            // wait till the replacing node is in the ring
+            awaitRingJoin(seed, replacingNode);
+            awaitRingJoin(replacingNode, seed);
+
+            // we see that the replaced node is properly in the ring, now lets 
add the other node back
+            nodeToStartAfterReplace.startup();
+
+            awaitRingJoin(seed, nodeToStartAfterReplace);
+            awaitRingJoin(replacingNode, nodeToStartAfterReplace);
+
+            // make sure all nodes are healthy
+            awaitRingHealthy(seed);
+
+            assertRingIs(seed, seed, replacingNode, nodeToStartAfterReplace);
+            assertRingIs(replacingNode, seed, replacingNode, 
nodeToStartAfterReplace);
+            logger.info("Current ring is {}", 
assertRingIs(nodeToStartAfterReplace, seed, replacingNode, 
nodeToStartAfterReplace));
+
+            validateRows(seed.coordinator(), expectedState);
+            validateRows(replacingNode.coordinator(), expectedState);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
new file mode 100644
index 0000000..3de0bf5
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.assertj.core.api.Assertions;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertInRing;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getTokenMetadataTokens;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+
+public class HostReplacementTest extends TestBaseImpl
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(HostReplacementTest.class);
+
+    static
+    {
+        // Gossip has a notion of quarantine, which is used to remove "fat 
clients" and "gossip only members"
+        // from the ring if not updated recently (recently is defined by this 
config).
+        // The reason for setting to 0 is to make sure even under such an 
aggressive environment, we do NOT remove
+        // nodes from the peers table
+        GOSSIPER_QUARANTINE_DELAY.setInt(0);
+    }
+
+    /**
+     * Attempt to do a host replacement on a down host
+     */
+    @Test
+    public void replaceDownedHost() throws IOException
+    {
+        // start with 2 nodes, stop the node to remove, then host replace 
the down node
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+        try (Cluster cluster = Cluster.build(2)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK))
+                                      .withTokenSupplier(node -> 
even.token(node == 3 ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+
+            setupCluster(cluster);
+
+            // collect rows to detect issues later on if the state doesn't 
match
+            SimpleQueryResult expectedState = 
nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + 
".tbl", ConsistencyLevel.ALL);
+
+            stopUnchecked(nodeToRemove);
+
+            // now create a new node to replace the other node
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, 
nodeToRemove, props -> {
+                // since we have a downed host, an old schema version might 
show up but
+                // can't be fetched since the host is down...
+                props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+            });
+
+            // wait till the replacing node is in the ring
+            awaitRingJoin(seed, replacingNode);
+            awaitRingJoin(replacingNode, seed);
+
+            // make sure all nodes are healthy
+            awaitRingHealthy(seed);
+
+            assertRingIs(seed, seed, replacingNode);
+            logger.info("Current ring is {}", assertRingIs(replacingNode, 
seed, replacingNode));
+
+            validateRows(seed.coordinator(), expectedState);
+            validateRows(replacingNode.coordinator(), expectedState);
+        }
+    }
+
+    /**
+     * Attempt to do a host replacement on an alive host
+     */
+    @Test
+    public void replaceAliveHost() throws IOException
+    {
+        // start with 2 nodes, keep both running, then attempt to host replace 
the alive node (expected to fail)
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+        try (Cluster cluster = Cluster.build(2)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK)
+                                                        
.set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+                                      .withTokenSupplier(node -> 
even.token(node == 3 ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+
+            setupCluster(cluster);
+
+            // collect rows to detect issues later on if the state doesn't 
match
+            SimpleQueryResult expectedState = 
nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + 
".tbl", ConsistencyLevel.ALL);
+
+            // now create a new node to replace the other node
+            Assertions.assertThatThrownBy(() -> replaceHostAndStart(cluster, 
nodeToRemove))
+                      .as("Startup of instance should have failed as you can 
not replace a alive node")
+                      .hasMessageContaining("Cannot replace a live node")
+                      .isInstanceOf(UnsupportedOperationException.class);
+
+            // make sure all nodes are healthy
+            awaitRingHealthy(seed);
+
+            assertRingIs(seed, seed, nodeToRemove);
+            logger.info("Current ring is {}", assertRingIs(nodeToRemove, seed, 
nodeToRemove));
+
+            validateRows(seed.coordinator(), expectedState);
+            validateRows(nodeToRemove.coordinator(), expectedState);
+        }
+    }
+
+    /**
+     * If the seed goes down, then another node, once the seed comes back, 
make sure host replacements still work.
+     */
+    @Test
+    public void seedGoesDownBeforeDownHost() throws IOException
+    {
+        // start with 3 nodes, stop the seed and the node to remove, restart the seed, host replace 
the down node
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NETWORK))
+                                      .withTokenSupplier(node -> 
even.token(node == 4 ? 2 : node))
+                                      .start())
+        {
+            // call early as this can't be touched on a down node
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+            IInvokableInstance nodeToStayAlive = cluster.get(3);
+
+            setupCluster(cluster);
+
+            // collect rows/tokens to detect issues later on if the state 
doesn't match
+            SimpleQueryResult expectedState = 
nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + 
".tbl", ConsistencyLevel.ALL);
+            List<String> beforeCrashTokens = getTokenMetadataTokens(seed);
+
+            // shutdown the seed, then the node to remove
+            stopUnchecked(seed);
+            stopUnchecked(nodeToRemove);
+
+            // restart the seed
+            seed.startup();
+
+            // make sure the node to remove is still in the ring
+            assertInRing(seed, nodeToRemove);
+
+            // make sure node1 still has node2's tokens
+            List<String> currentTokens = getTokenMetadataTokens(seed);
+            Assertions.assertThat(currentTokens)
+                      .as("Tokens no longer match after restarting")
+                      .isEqualTo(beforeCrashTokens);
+
+            // now create a new node to replace the other node
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, 
nodeToRemove);
+
+            List<IInvokableInstance> expectedRing = Arrays.asList(seed, 
replacingNode, nodeToStayAlive);
+
+            // wait till the replacing node is in the ring
+            awaitRingJoin(seed, replacingNode);
+            awaitRingJoin(replacingNode, seed);
+            awaitRingJoin(nodeToStayAlive, replacingNode);
+
+            // make sure all nodes are healthy
+            logger.info("Current ring is {}", awaitRingHealthy(seed));
+
+            expectedRing.forEach(i -> assertRingIs(i, expectedRing));
+
+            validateRows(seed.coordinator(), expectedState);
+            validateRows(replacingNode.coordinator(), expectedState);
+        }
+    }
+
+    static void setupCluster(Cluster cluster)
+    {
+        fixDistributedSchemas(cluster);
+        init(cluster);
+
+        populate(cluster);
+        cluster.forEach(i -> i.flush(KEYSPACE));
+    }
+
+    static void populate(Cluster cluster)
+    {
+        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl 
(pk int PRIMARY KEY)");
+        for (int i = 0; i < 10; i++)
+        {
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl 
(pk) VALUES (?)",
+                                           ConsistencyLevel.ALL,
+                                           i);
+        }
+    }
+
+    static void validateRows(ICoordinator coordinator, SimpleQueryResult 
expected)
+    {
+        expected.reset();
+        SimpleQueryResult rows = coordinator.executeWithResult("SELECT * FROM 
" + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+        AssertUtils.assertRows(rows, expected);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java 
b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 367f6a1..1b17a27 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -83,19 +83,19 @@ public class GossiperTest
     public void testHaveVersion3Nodes() throws Exception
     {
         VersionedValue.VersionedValueFactory factory = new 
VersionedValue.VersionedValueFactory(null);
-        EndpointState es = new EndpointState(null);
+        EndpointState es = new EndpointState((HeartBeatState) null);
         es.addApplicationState(ApplicationState.RELEASE_VERSION, 
factory.releaseVersion("4.0-SNAPSHOT"));
         
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.1"),
 es);
         
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.1"));
 
 
-        es = new EndpointState(null);
+        es = new EndpointState((HeartBeatState) null);
         es.addApplicationState(ApplicationState.RELEASE_VERSION, 
factory.releaseVersion("3.11.3"));
         
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.2"),
 es);
         
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.2"));
 
 
-        es = new EndpointState(null);
+        es = new EndpointState((HeartBeatState) null);
         es.addApplicationState(ApplicationState.RELEASE_VERSION, 
factory.releaseVersion("3.0.0"));
         
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.3"),
 es);
         
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3"));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to