This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 41952a2  Improve machinery for testing bootstrap and range movements
41952a2 is described below

commit 41952a2f73ba5198250f64beba8f7ff1203204ab
Author: Alex Petrov <[email protected]>
AuthorDate: Tue Jul 7 10:48:33 2020 +0200

    Improve machinery for testing bootstrap and range movements
    
    Patch by Alex Petrov; reviewed by Aleksey Yeschenko and David Capwell for 
CASSANDRA-15935
---
 .../org/apache/cassandra/dht/BootStrapper.java     |   6 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    |  25 +-
 .../org/apache/cassandra/gms/VersionedValue.java   |  11 +
 .../org/apache/cassandra/net/MessagingService.java |   6 +
 .../apache/cassandra/schema/MigrationManager.java  |  13 +-
 .../apache/cassandra/service/StorageService.java   | 236 ++++++-----
 .../cassandra/distributed/UpgradeableCluster.java  |   1 -
 .../cassandra/distributed/action/GossipHelper.java | 457 +++++++++++++++++++++
 .../distributed/action/InstanceAction.java         |  27 ++
 .../distributed/impl/AbstractCluster.java          |  45 +-
 .../distributed/impl/DistributedTestSnitch.java    |   4 +-
 .../cassandra/distributed/impl/Instance.java       |  90 +---
 .../distributed/impl/IsolatedExecutor.java         |   2 +-
 .../shared/VersionedApplicationState.java          |  35 ++
 .../cassandra/distributed/test/BootstrapTest.java  | 102 -----
 .../cassandra/distributed/test/TestBaseImpl.java   |   3 +-
 .../distributed/test/ring/BootstrapTest.java       | 130 ++++++
 .../ring/CommunicationDuringDecommissionTest.java  |  76 ++++
 .../distributed/test/ring/NodeNotInRingTest.java   |  75 ++++
 .../distributed/test/ring/PendingWritesTest.java   | 109 +++++
 test/unit/org/apache/cassandra/Util.java           |   3 +-
 21 files changed, 1154 insertions(+), 302 deletions(-)

diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java 
b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 94bf283..bc64325 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -152,7 +152,7 @@ public class BootStrapper extends 
ProgressEventNotifierSupport
      * otherwise, if allocationKeyspace is specified use the token allocation 
algorithm to generate suitable tokens
      * else choose num_tokens tokens at random
      */
-    public static Collection<Token> getBootstrapTokens(final TokenMetadata 
metadata, InetAddressAndPort address, int schemaWaitDelay) throws 
ConfigurationException
+    public static Collection<Token> getBootstrapTokens(final TokenMetadata 
metadata, InetAddressAndPort address, long schemaWaitDelay) throws 
ConfigurationException
     {
         String allocationKeyspace = 
DatabaseDescriptor.getAllocateTokensForKeyspace();
         Integer allocationLocalRf = 
DatabaseDescriptor.getAllocateTokensForLocalRf();
@@ -205,7 +205,7 @@ public class BootStrapper extends 
ProgressEventNotifierSupport
                                             InetAddressAndPort address,
                                             String allocationKeyspace,
                                             int numTokens,
-                                            int schemaWaitDelay)
+                                            long schemaWaitDelay)
     {
         StorageService.instance.waitForSchema(schemaWaitDelay);
         if 
(!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
@@ -226,7 +226,7 @@ public class BootStrapper extends 
ProgressEventNotifierSupport
                                             InetAddressAndPort address,
                                             int rf,
                                             int numTokens,
-                                            int schemaWaitDelay)
+                                            long schemaWaitDelay)
     {
         StorageService.instance.waitForSchema(schemaWaitDelay);
         if 
(!FBUtilities.getBroadcastAddressAndPort().equals(InetAddressAndPort.getLoopbackAddress()))
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 316a3cd..51e7e54 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -708,6 +708,16 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         assassinateEndpoint(address);
     }
 
+    @VisibleForTesting
+    public void unsafeAnulEndpoint(InetAddressAndPort endpoint)
+    {
+        removeEndpoint(endpoint);
+        justRemovedEndpoints.remove(endpoint);
+        endpointStateMap.remove(endpoint);
+        expireTimeEndpointMap.remove(endpoint);
+        unreachableEndpoints.remove(endpoint);
+    }
+
     /**
      * Do not call this method unless you know what you are doing.
      * It will try extremely hard to obliterate any endpoint from the ring,
@@ -1312,7 +1322,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return pieces[0];
     }
 
-    void applyStateLocally(Map<InetAddressAndPort, EndpointState> epStateMap)
+    @VisibleForTesting
+    public void applyStateLocally(Map<InetAddressAndPort, EndpointState> 
epStateMap)
     {
         checkProperThreadForStateMutation();
         for (Entry<InetAddressAndPort, EndpointState> entry : 
epStateMap.entrySet())
@@ -1785,9 +1796,9 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     private void addLocalApplicationStateInternal(ApplicationState state, 
VersionedValue value)
     {
         assert taskLock.isHeldByCurrentThread();
-        EndpointState epState = 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
         InetAddressAndPort epAddr = FBUtilities.getBroadcastAddressAndPort();
-        assert epState != null;
+        EndpointState epState = endpointStateMap.get(epAddr);
+        assert epState != null : "Can't find endpoint state for " + epAddr;
         // Fire "before change" notifications:
         doBeforeChangeNotifications(epAddr, epState, state, value);
         // Notifications may have taken some time, so preventively raise the 
version
@@ -1887,6 +1898,12 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     @VisibleForTesting
     public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int 
generationNbr)
     {
+        initializeNodeUnsafe(addr, uuid, MessagingService.current_version, 
generationNbr);
+    }
+
+    @VisibleForTesting
+    public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int 
netVersion, int generationNbr)
+    {
         HeartBeatState hbState = new HeartBeatState(generationNbr);
         EndpointState newState = new EndpointState(hbState);
         newState.markAlive();
@@ -1895,7 +1912,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
         // always add the version state
         Map<ApplicationState, VersionedValue> states = new 
EnumMap<>(ApplicationState.class);
-        states.put(ApplicationState.NET_VERSION, 
StorageService.instance.valueFactory.networkVersion());
+        states.put(ApplicationState.NET_VERSION, 
StorageService.instance.valueFactory.networkVersion(netVersion));
         states.put(ApplicationState.HOST_ID, 
StorageService.instance.valueFactory.hostId(uuid));
         localState.addApplicationStates(states);
     }
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java 
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index caf6ce9..3dc4c57 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -100,6 +100,11 @@ public class VersionedValue implements 
Comparable<VersionedValue>
         this(value, VersionGenerator.getNextVersion());
     }
 
+    public static VersionedValue unsafeMakeVersionedValue(String value, int 
version)
+    {
+        return new VersionedValue(value, version);
+    }
+
     public int compareTo(VersionedValue value)
     {
         return this.version - value.version;
@@ -272,6 +277,12 @@ public class VersionedValue implements 
Comparable<VersionedValue>
             return new VersionedValue(version);
         }
 
+        @VisibleForTesting
+        public VersionedValue networkVersion(int version)
+        {
+            return new VersionedValue(String.valueOf(version));
+        }
+
         public VersionedValue networkVersion()
         {
             return new 
VersionedValue(String.valueOf(MessagingService.current_version));
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index c0f57f8..4d712e8 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -431,6 +431,12 @@ public final class MessagingService extends 
MessagingServiceMBeanImpl
 
     public void shutdown(long timeout, TimeUnit units, boolean 
shutdownGracefully, boolean shutdownExecutors)
     {
+        if (isShuttingDown)
+        {
+            logger.info("Shutdown was already called");
+            return;
+        }
+
         isShuttingDown = true;
         logger.info("Waiting for messaging service to quiesce");
         // We may need to schedule hints on the mutation stage, so it's 
erroneous to shut down the mutation stage first
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java 
b/src/java/org/apache/cassandra/schema/MigrationManager.java
index 4f91d94..8f627cd 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -21,8 +21,9 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.lang.management.ManagementFactory;
-import java.lang.management.RuntimeMXBean;
+import java.util.function.LongSupplier;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,13 @@ public class MigrationManager
 
     public static final MigrationManager instance = new MigrationManager();
 
-    private static final RuntimeMXBean runtimeMXBean = 
ManagementFactory.getRuntimeMXBean();
+    private static LongSupplier getUptimeFn = () -> 
ManagementFactory.getRuntimeMXBean().getUptime();
+
+    @VisibleForTesting
+    public static void setUptimeFn(LongSupplier supplier)
+    {
+        getUptimeFn = supplier;
+    }
 
     private static final int MIGRATION_DELAY_IN_MS = 60000;
 
@@ -100,7 +107,7 @@ public class MigrationManager
             return;
         }
 
-        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < 
MIGRATION_DELAY_IN_MS)
+        if (Schema.instance.isEmpty() || getUptimeFn.getAsLong() < 
MIGRATION_DELAY_IN_MS)
         {
             // If we think we may be bootstrapping or have recently started, 
submit MigrationTask immediately
             logger.debug("Immediately submitting migration task for {}, " +
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 746d599..4140680 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -135,6 +135,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 {
     private static final Logger logger = 
LoggerFactory.getLogger(StorageService.class);
 
+    public static final int INDEFINITE = -1;
     public static final int RING_DELAY = getRingDelay(); // delay after which 
we assume ring has stablized
 
     private final JMXProgressSupport progressSupport = new 
JMXProgressSupport(this);
@@ -148,7 +149,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             return Integer.parseInt(newdelay);
         }
         else
+        {
             return 30 * 1000;
+        }
     }
 
     /* This abstraction maintains the token/endpoint metadata information */
@@ -258,7 +261,9 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return joined;
     }
 
-    /** This method updates the local token on disk  */
+    /**
+     * This method updates the local token on disk
+     */
     public void setTokens(Collection<Token> tokens)
     {
         assert tokens != null && !tokens.isEmpty() : "Node needs at least one 
token.";
@@ -850,10 +855,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
     }
 
-    public void waitForSchema(int delay)
+    public void waitForSchema(long delay)
     {
         // first sleep the delay to make sure we see all our peers
-        for (int i = 0; i < delay; i += 1000)
+        for (long i = 0; i < delay; i += 1000)
         {
             // if we see schema, we can proceed to the next check directly
             if (!Schema.instance.isEmpty())
@@ -873,7 +878,16 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
     }
 
-    private void joinTokenRing(int delay) throws ConfigurationException
+    private void joinTokenRing(long schemaTimeoutMillis) throws 
ConfigurationException
+    {
+        joinTokenRing(!isSurveyMode, shouldBootstrap(), schemaTimeoutMillis, 
INDEFINITE);
+    }
+
+    @VisibleForTesting
+    public void joinTokenRing(boolean finishJoiningRing,
+                              boolean shouldBootstrap,
+                              long schemaTimeoutMillis,
+                              long bootstrapTimeoutMillis) throws 
ConfigurationException
     {
         joined = true;
 
@@ -901,101 +915,18 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
 
         boolean dataAvailable = true; // make this to false when bootstrap 
streaming failed
-        boolean bootstrap = shouldBootstrap();
-        if (bootstrap)
-        {
-            if (SystemKeyspace.bootstrapInProgress())
-                logger.warn("Detected previous bootstrap failure; retrying");
-            else
-                
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.IN_PROGRESS);
-            setMode(Mode.JOINING, "waiting for ring information", true);
-            waitForSchema(delay);
-            setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
-            setMode(Mode.JOINING, "waiting for pending range calculation", 
true);
-            PendingRangeCalculatorService.instance.blockUntilFinished();
-            setMode(Mode.JOINING, "calculation complete, ready to bootstrap", 
true);
-
-            logger.debug("... got ring + schema info");
-
-            if (useStrictConsistency && !allowSimultaneousMoves() &&
-                    (
-                        tokenMetadata.getBootstrapTokens().valueSet().size() > 
0 ||
-                        tokenMetadata.getSizeOfLeavingEndpoints() > 0 ||
-                        tokenMetadata.getSizeOfMovingEndpoints() > 0
-                    ))
-            {
-                String bootstrapTokens = 
StringUtils.join(tokenMetadata.getBootstrapTokens().valueSet(), ',');
-                String leavingTokens = 
StringUtils.join(tokenMetadata.getLeavingEndpoints(), ',');
-                String movingTokens = 
StringUtils.join(tokenMetadata.getMovingEndpoints().stream().map(e -> 
e.right).toArray(), ',');
-                throw new UnsupportedOperationException(String.format("Other 
bootstrapping/leaving/moving nodes detected, cannot bootstrap while 
cassandra.consistent.rangemovement is true. Nodes detected, bootstrapping: %s; 
leaving: %s; moving: %s;", bootstrapTokens, leavingTokens, movingTokens));
-            }
-
-            // get bootstrap tokens
-            if (!replacing)
-            {
-                if 
(tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
-                {
-                    String s = "This node is already a member of the token 
ring; bootstrap aborted. (If replacing a dead node, remove the old one from the 
ring first.)";
-                    throw new UnsupportedOperationException(s);
-                }
-                setMode(Mode.JOINING, "getting bootstrap token", true);
-                bootstrapTokens = 
BootStrapper.getBootstrapTokens(tokenMetadata, 
FBUtilities.getBroadcastAddressAndPort(), delay);
-            }
-            else
-            {
-                if (!isReplacingSameAddress())
-                {
-                    try
-                    {
-                        // Sleep additionally to make sure that the server 
actually is not alive
-                        // and giving it more time to gossip if alive.
-                        Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
-
-                    // check for operator errors...
-                    for (Token token : bootstrapTokens)
-                    {
-                        InetAddressAndPort existing = 
tokenMetadata.getEndpoint(token);
-                        if (existing != null)
-                        {
-                            long nanoDelay = delay * 1000000L;
-                            if 
(Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > 
(System.nanoTime() - nanoDelay))
-                                throw new 
UnsupportedOperationException("Cannot replace a live node... ");
-                            current.add(existing);
-                        }
-                        else
-                        {
-                            throw new UnsupportedOperationException("Cannot 
replace token " + token + " which does not exist!");
-                        }
-                    }
-                }
-                else
-                {
-                    try
-                    {
-                        Thread.sleep(RING_DELAY);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
 
-                }
-                setMode(Mode.JOINING, "Replacing a node with token(s): " + 
bootstrapTokens, true);
-            }
-
-            dataAvailable = bootstrap(bootstrapTokens);
+        if (shouldBootstrap)
+        {
+            current.addAll(prepareForBootstrap(schemaTimeoutMillis));
+            dataAvailable = bootstrap(bootstrapTokens, bootstrapTimeoutMillis);
         }
         else
         {
             bootstrapTokens = SystemKeyspace.getSavedTokens();
             if (bootstrapTokens.isEmpty())
             {
-                bootstrapTokens = 
BootStrapper.getBootstrapTokens(tokenMetadata, 
FBUtilities.getBroadcastAddressAndPort(), delay);
+                bootstrapTokens = 
BootStrapper.getBootstrapTokens(tokenMetadata, 
FBUtilities.getBroadcastAddressAndPort(), schemaTimeoutMillis);
             }
             else
             {
@@ -1008,11 +939,11 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
         setUpDistributedSystemKeyspaces();
 
-        if (!isSurveyMode)
+        if (finishJoiningRing)
         {
             if (dataAvailable)
             {
-                finishJoiningRing(bootstrap, bootstrapTokens);
+                finishJoiningRing(shouldBootstrap, bootstrapTokens);
                 // remove the existing info about the replaced node.
                 if (!current.isEmpty())
                 {
@@ -1107,7 +1038,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                 .forEach(cfs -> 
cfs.indexManager.executePreJoinTasksBlocking(bootstrap));
     }
 
-    private void finishJoiningRing(boolean didBootstrap, Collection<Token> 
tokens)
+    @VisibleForTesting
+    public void finishJoiningRing(boolean didBootstrap, Collection<Token> 
tokens)
     {
         // start participating in the ring.
         setMode(Mode.JOINING, "Finish joining ring", true);
@@ -1140,7 +1072,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return authSetupComplete;
     }
 
-    private void setUpDistributedSystemKeyspaces()
+    @VisibleForTesting
+    public void setUpDistributedSystemKeyspaces()
     {
         Collection<Mutation> changes = new ArrayList<>(3);
 
@@ -1529,6 +1462,96 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             logger.debug(logMsg);
     }
 
+    @VisibleForTesting
+    public Collection<InetAddressAndPort> prepareForBootstrap(long schemaDelay)
+    {
+        Set<InetAddressAndPort> collisions = new HashSet<>();
+        if (SystemKeyspace.bootstrapInProgress())
+            logger.warn("Detected previous bootstrap failure; retrying");
+        else
+            
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.IN_PROGRESS);
+        setMode(Mode.JOINING, "waiting for ring information", true);
+        waitForSchema(schemaDelay);
+        setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
+        setMode(Mode.JOINING, "waiting for pending range calculation", true);
+        PendingRangeCalculatorService.instance.blockUntilFinished();
+        setMode(Mode.JOINING, "calculation complete, ready to bootstrap", 
true);
+
+        logger.debug("... got ring + schema info");
+
+        if (useStrictConsistency && !allowSimultaneousMoves() &&
+            (
+            tokenMetadata.getBootstrapTokens().valueSet().size() > 0 ||
+            tokenMetadata.getSizeOfLeavingEndpoints() > 0 ||
+            tokenMetadata.getSizeOfMovingEndpoints() > 0
+            ))
+        {
+            String bootstrapTokens = 
StringUtils.join(tokenMetadata.getBootstrapTokens().valueSet(), ',');
+            String leavingTokens = 
StringUtils.join(tokenMetadata.getLeavingEndpoints(), ',');
+            String movingTokens = 
StringUtils.join(tokenMetadata.getMovingEndpoints().stream().map(e -> 
e.right).toArray(), ',');
+            throw new UnsupportedOperationException(String.format("Other 
bootstrapping/leaving/moving nodes detected, cannot bootstrap while 
cassandra.consistent.rangemovement is true. Nodes detected, bootstrapping: %s; 
leaving: %s; moving: %s;", bootstrapTokens, leavingTokens, movingTokens));
+        }
+
+        // get bootstrap tokens
+        if (!replacing)
+        {
+            if 
(tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
+            {
+                String s = "This node is already a member of the token ring; 
bootstrap aborted. (If replacing a dead node, remove the old one from the ring 
first.)";
+                throw new UnsupportedOperationException(s);
+            }
+            setMode(Mode.JOINING, "getting bootstrap token", true);
+            bootstrapTokens = BootStrapper.getBootstrapTokens(tokenMetadata, 
FBUtilities.getBroadcastAddressAndPort(), schemaDelay);
+        }
+        else
+        {
+            if (!isReplacingSameAddress())
+            {
+                try
+                {
+                    // Sleep additionally to make sure that the server 
actually is not alive
+                    // and giving it more time to gossip if alive.
+                    Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+
+                // check for operator errors...
+                for (Token token : bootstrapTokens)
+                {
+                    InetAddressAndPort existing = 
tokenMetadata.getEndpoint(token);
+                    if (existing != null)
+                    {
+                        long nanoDelay = schemaDelay * 1000000L;
+                        if 
(Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > 
(System.nanoTime() - nanoDelay))
+                            throw new UnsupportedOperationException("Cannot 
replace a live node... ");
+                        collisions.add(existing);
+                    }
+                    else
+                    {
+                        throw new UnsupportedOperationException("Cannot 
replace token " + token + " which does not exist!");
+                    }
+                }
+            }
+            else
+            {
+                try
+                {
+                    Thread.sleep(RING_DELAY);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+
+            }
+            setMode(Mode.JOINING, "Replacing a node with token(s): " + 
bootstrapTokens, true);
+        }
+        return collisions;
+    }
+
     /**
      * Bootstrap node by fetching data from other nodes.
      * If node is bootstrapping as a new node, then this also announces 
bootstrapping to the cluster.
@@ -1538,7 +1561,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * @param tokens bootstrapping tokens
      * @return true if bootstrap succeeds.
      */
-    private boolean bootstrap(final Collection<Token> tokens)
+    @VisibleForTesting
+    public boolean bootstrap(final Collection<Token> tokens, long 
bootstrapTimeoutMillis)
     {
         isBootstrapMode = true;
         SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes 
us part of the ring locally which is incorrect until we are done bootstrapping
@@ -1576,13 +1600,13 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         // Force disk boundary invalidation now that local tokens are set
         invalidateDiskBoundaries();
 
-        setMode(Mode.JOINING, "Starting to bootstrap...", true);
-        BootStrapper bootstrapper = new 
BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
-        bootstrapper.addProgressListener(progressSupport);
-        ListenableFuture<StreamState> bootstrapStream = 
bootstrapper.bootstrap(streamStateStore, useStrictConsistency && !replacing); 
// handles token update
+        Future<StreamState> bootstrapStream = startBootstrap(tokens);
         try
         {
-            bootstrapStream.get();
+            if (bootstrapTimeoutMillis > 0)
+                bootstrapStream.get(bootstrapTimeoutMillis, MILLISECONDS);
+            else
+                bootstrapStream.get();
             bootstrapFinished();
             logger.info("Bootstrap completed for tokens {}", tokens);
             return true;
@@ -1594,6 +1618,14 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
     }
 
+    public Future<StreamState> startBootstrap(Collection<Token> tokens)
+    {
+        setMode(Mode.JOINING, "Starting to bootstrap...", true);
+        BootStrapper bootstrapper = new 
BootStrapper(FBUtilities.getBroadcastAddressAndPort(), tokens, tokenMetadata);
+        bootstrapper.addProgressListener(progressSupport);
+        return bootstrapper.bootstrap(streamStateStore, useStrictConsistency 
&& !replacing); // handles token update
+    }
+
     private void invalidateDiskBoundaries()
     {
         for (Keyspace keyspace : Keyspace.all())
diff --git 
a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java 
b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 532c1b1..47fd15b 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -24,7 +24,6 @@ import java.util.function.Consumer;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
-import org.apache.cassandra.distributed.shared.AbstractBuilder;
 import org.apache.cassandra.distributed.shared.Versions;
 
 /**
diff --git 
a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java 
b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
new file mode 100644
index 0000000..70ef503
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.action;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.shared.VersionedApplicationState;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static 
org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
+
+public class GossipHelper
+{
+    public static InstanceAction statusToBootstrap(IInvokableInstance newNode)
+    {
+        return (instance) ->
+        {
+            changeGossipState(instance,
+                              newNode,
+                              Arrays.asList(tokens(newNode),
+                                            statusBootstrapping(newNode),
+                                            
statusWithPortBootstrapping(newNode)));
+        };
+    }
+
+    public static InstanceAction statusToNormal(IInvokableInstance peer)
+    {
+        return (target) ->
+        {
+            changeGossipState(target,
+                              peer,
+                              Arrays.asList(tokens(peer),
+                                            statusNormal(peer),
+                                            releaseVersion(peer),
+                                            netVersion(peer),
+                                            statusWithPortNormal(peer)));
+        };
+    }
+
+    /**
+     * This method is unsafe and should be used _only_ when gossip is not used 
or available: it creates versioned values on the
+     * target instance, which means Gossip versioning gets out of sync. Use a 
safe couterpart at all times when performing _any_
+     * ring movement operations _or_ if Gossip is used.
+     */
+    public static void unsafeStatusToNormal(IInvokableInstance target, 
IInstance peer)
+    {
+        int messagingVersion = peer.getMessagingVersion();
+        changeGossipState(target,
+                          peer,
+                          Arrays.asList(unsafeVersionedValue(target,
+                                                             
ApplicationState.TOKENS,
+                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).tokens(tokens),
+                                                             
peer.config().getString("partitioner"),
+                                                             
peer.config().getString("initial_token")),
+                                        unsafeVersionedValue(target,
+                                                             
ApplicationState.STATUS,
+                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
+                                                             
peer.config().getString("partitioner"),
+                                                             
peer.config().getString("initial_token")),
+                                        unsafeVersionedValue(target,
+                                                             
ApplicationState.STATUS_WITH_PORT,
+                                                             (partitioner, 
tokens) -> new VersionedValue.VersionedValueFactory(partitioner).normal(tokens),
+                                                             
peer.config().getString("partitioner"),
+                                                             
peer.config().getString("initial_token")),
+                                        unsafeVersionedValue(target,
+                                                             
ApplicationState.NET_VERSION,
+                                                             (partitioner) -> 
new 
VersionedValue.VersionedValueFactory(partitioner).networkVersion(messagingVersion),
+                                                             
peer.config().getString("partitioner")),
+                                        unsafeReleaseVersion(target,
+                                                             
peer.config().getString("partitioner"),
+                                                             
peer.getReleaseVersionString())));
+    }
+
+    public static InstanceAction statusToLeaving(IInvokableInstance newNode)
+    {
+        return (instance) -> {
+            changeGossipState(instance,
+                              newNode,
+                              Arrays.asList(tokens(newNode),
+                                            statusLeaving(newNode),
+                                            statusWithPortLeaving(newNode)));
+        };
+    }
+
+    public static InstanceAction bootstrap()
+    {
+        return new BootstrapAction();
+    }
+
+    public static InstanceAction bootstrap(boolean joinRing, Duration 
waitForBootstrap, Duration waitForSchema)
+    {
+        return new BootstrapAction(joinRing, waitForBootstrap, waitForSchema);
+    }
+
+    public static InstanceAction disseminateGossipState(IInvokableInstance 
newNode)
+    {
+        return new DisseminateGossipState(newNode);
+    }
+
+    public static InstanceAction pullSchemaFrom(IInvokableInstance pullFrom)
+    {
+        return new PullSchemaFrom(pullFrom);
+    }
+
+    private static InstanceAction disableBinary()
+    {
+        return (instance) -> 
instance.nodetoolResult("disablebinary").asserts().success();
+    }
+
+    private static class DisseminateGossipState implements InstanceAction
+    {
+        final Map<InetSocketAddress, byte[]> gossipState;
+
+        public DisseminateGossipState(IInvokableInstance... from)
+        {
+            gossipState = new HashMap<>();
+            for (IInvokableInstance node : from)
+            {
+                byte[] epBytes = node.callsOnInstance(() -> {
+                    EndpointState epState = 
Gossiper.instance.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddressAndPort());
+                    return toBytes(epState);
+                }).call();
+                gossipState.put(node.broadcastAddress(), epBytes);
+            }
+        }
+
+        public void accept(IInvokableInstance instance)
+        {
+            
instance.appliesOnInstance((IIsolatedExecutor.SerializableFunction<Map<InetSocketAddress,
 byte[]>, Void>)
+                                       (map) -> {
+                                           Map<InetAddressAndPort, 
EndpointState> newState = new HashMap<>();
+                                           for (Map.Entry<InetSocketAddress, 
byte[]> e : map.entrySet())
+                                               
newState.put(toCassandraInetAddressAndPort(e.getKey()), 
fromBytes(e.getValue()));
+
+                                           
Gossiper.runInGossipStageBlocking(() -> {
+                                               
Gossiper.instance.applyStateLocally(newState);
+                                           });
+                                           return null;
+                                       }).apply(gossipState);
+        }
+    }
+
+    private static byte[] toBytes(EndpointState epState)
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer(1024))
+        {
+            EndpointState.serializer.serialize(epState, out, 
MessagingService.current_version);
+            return out.toByteArray();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static EndpointState fromBytes(byte[] bytes)
+    {
+        try (DataInputBuffer in = new DataInputBuffer(bytes))
+        {
+            return EndpointState.serializer.deserialize(in, 
MessagingService.current_version);
+        }
+        catch (Throwable t)
+        {
+            throw new RuntimeException(t);
+        }
+    }
+
+    private static class PullSchemaFrom implements InstanceAction
+    {
+        final InetSocketAddress pullFrom;
+
+        public PullSchemaFrom(IInvokableInstance pullFrom)
+        {
+            this.pullFrom = pullFrom.broadcastAddress();;
+        }
+
+        public void accept(IInvokableInstance pullTo)
+        {
+            pullTo.acceptsOnInstance((InetSocketAddress pullFrom) -> {
+                InetAddressAndPort endpoint = 
toCassandraInetAddressAndPort(pullFrom);
+                EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+                MigrationManager.scheduleSchemaPull(endpoint, state);
+                MigrationManager.waitUntilReadyForBootstrap();
+            }).accept(pullFrom);
+        }
+    }
+
+    private static class BootstrapAction implements InstanceAction, 
Serializable
+    {
+        private final boolean joinRing;
+        private final Duration waitForBootstrap;
+        private final Duration waitForSchema;
+
+        public BootstrapAction()
+        {
+            this(true, Duration.ofMinutes(10), Duration.ofSeconds(10));
+        }
+
+        public BootstrapAction(boolean joinRing, Duration waitForBootstrap, 
Duration waitForSchema)
+        {
+            this.joinRing = joinRing;
+            this.waitForBootstrap = waitForBootstrap;
+            this.waitForSchema = waitForSchema;
+        }
+
+        public void accept(IInvokableInstance instance)
+        {
+            instance.appliesOnInstance((String partitionerString, String 
tokenString) -> {
+                IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerString);
+                List<Token> tokens = 
Collections.singletonList(partitioner.getTokenFactory().fromString(tokenString));
+                try
+                {
+                    Collection<InetAddressAndPort> collisions = 
StorageService.instance.prepareForBootstrap(waitForSchema.toMillis());
+                    assert collisions.size() == 0 : String.format("Didn't 
expect any replacements but got %s", collisions);
+                    boolean isBootstrapSuccessful = 
StorageService.instance.bootstrap(tokens, waitForBootstrap.toMillis());
+                    assert isBootstrapSuccessful : "Bootstrap did not complete 
successfully";
+                    StorageService.instance.setUpDistributedSystemKeyspaces();
+                    if (joinRing)
+                        StorageService.instance.finishJoiningRing(true, 
tokens);
+                }
+                catch (Throwable t)
+                {
+                    throw new RuntimeException(t);
+                }
+
+                return null;
+            }).apply(instance.config().getString("partitioner"), 
instance.config().getString("initial_token"));
+        }
+    }
+
+    public static InstanceAction decomission()
+    {
+        return (target) -> 
target.nodetoolResult("decommission").asserts().success();
+    }
+
+
+    public static VersionedApplicationState tokens(IInvokableInstance instance)
+    {
+        return versionedToken(instance, ApplicationState.TOKENS, (partitioner, 
tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).tokens(tokens));
+    }
+
+    public static VersionedApplicationState netVersion(IInvokableInstance 
instance)
+    {
+        return versionedToken(instance, ApplicationState.NET_VERSION, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).networkVersion());
+    }
+
+    private static VersionedApplicationState 
unsafeReleaseVersion(IInvokableInstance instance, String partitionerStr, String 
releaseVersionStr)
+    {
+        return unsafeVersionedValue(instance, 
ApplicationState.RELEASE_VERSION, (partitioner) -> new 
VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersionStr),
 partitionerStr);
+    }
+
+    public static VersionedApplicationState releaseVersion(IInvokableInstance 
instance)
+    {
+        return unsafeReleaseVersion(instance, 
instance.config().getString("partitioner"), instance.getReleaseVersionString());
+    }
+
+    public static VersionedApplicationState statusNormal(IInvokableInstance 
instance)
+    {
+        return versionedToken(instance, ApplicationState.STATUS, (partitioner, 
tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
+    }
+
+    public static VersionedApplicationState 
statusWithPortNormal(IInvokableInstance instance)
+    {
+        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).normal(tokens));
+    }
+
+    public static VersionedApplicationState 
statusBootstrapping(IInvokableInstance instance)
+    {
+        return versionedToken(instance, ApplicationState.STATUS, (partitioner, 
tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
+    }
+
+    public static VersionedApplicationState 
statusWithPortBootstrapping(IInvokableInstance instance)
+    {
+        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
+    }
+
+    public static VersionedApplicationState statusLeaving(IInvokableInstance 
instance)
+    {
+        return versionedToken(instance, ApplicationState.STATUS, (partitioner, 
tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
+    }
+
+    public static VersionedApplicationState statusLeft(IInvokableInstance 
instance)
+    {
+        return versionedToken(instance, ApplicationState.STATUS, (partitioner, 
tokens) -> {
+            return new 
VersionedValue.VersionedValueFactory(partitioner).left(tokens, 
System.currentTimeMillis() + Gossiper.aVeryLongTime);
+        });
+    }
+
+    public static VersionedApplicationState 
statusWithPortLeft(IInvokableInstance instance)
+    {
+        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, 
(partitioner, tokens) -> {
+            return new 
VersionedValue.VersionedValueFactory(partitioner).left(tokens, 
System.currentTimeMillis() + Gossiper.aVeryLongTime);
+
+        });
+    }
+
+    public static VersionedApplicationState 
statusWithPortLeaving(IInvokableInstance instance)
+    {
+        return versionedToken(instance, ApplicationState.STATUS_WITH_PORT, 
(partitioner, tokens) -> new 
VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
+    }
+
+    public static VersionedValue toVersionedValue(VersionedApplicationState vv)
+    {
+        return VersionedValue.unsafeMakeVersionedValue(vv.value, vv.version);
+    }
+
+    public static ApplicationState 
toApplicationState(VersionedApplicationState vv)
+    {
+        return ApplicationState.values()[vv.applicationState];
+    }
+
+    private static VersionedApplicationState 
unsafeVersionedValue(IInvokableInstance instance,
+                                                                 
ApplicationState applicationState,
+                                                                 
IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, 
VersionedValue> supplier,
+                                                                 String 
partitionerStr, String initialTokenStr)
+    {
+        return instance.appliesOnInstance((String partitionerString, String 
tokenString) -> {
+            IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerString);
+            Token token = 
partitioner.getTokenFactory().fromString(tokenString);
+
+            VersionedValue versionedValue = supplier.apply(partitioner, 
Collections.singleton(token));
+            return new VersionedApplicationState(applicationState.ordinal(), 
versionedValue.value, versionedValue.version);
+        }).apply(partitionerStr, initialTokenStr);
+    }
+
+    private static VersionedApplicationState 
unsafeVersionedValue(IInvokableInstance instance,
+                                                                 
ApplicationState applicationState,
+                                                                 
IIsolatedExecutor.SerializableFunction<IPartitioner, VersionedValue> supplier,
+                                                                 String 
partitionerStr)
+    {
+        return instance.appliesOnInstance((String partitionerString) -> {
+            IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerString);
+            VersionedValue versionedValue = supplier.apply(partitioner);
+            return new VersionedApplicationState(applicationState.ordinal(), 
versionedValue.value, versionedValue.version);
+        }).apply(partitionerStr);
+    }
+
+    public static VersionedApplicationState versionedToken(IInvokableInstance 
instance, ApplicationState applicationState, 
IIsolatedExecutor.SerializableBiFunction<IPartitioner, Collection<Token>, 
VersionedValue> supplier)
+    {
+        return unsafeVersionedValue(instance, applicationState, supplier, 
instance.config().getString("partitioner"), 
instance.config().getString("initial_token"));
+    }
+
+    public static InstanceAction removeFromRing(IInvokableInstance peer)
+    {
+        return (target) -> {
+            InetAddressAndPort endpoint = 
toCassandraInetAddressAndPort(peer.broadcastAddress());
+            VersionedApplicationState newState = statusLeft(peer);
+
+            target.runOnInstance(() -> {
+                // state to 'left'
+                EndpointState currentState = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+                ApplicationState as = toApplicationState(newState);
+                VersionedValue vv = toVersionedValue(newState);
+                currentState.addApplicationState(as, vv);
+                StorageService.instance.onChange(endpoint, as, vv);
+
+                // remove from gossip
+                Gossiper.runInGossipStageBlocking(() -> 
Gossiper.instance.unsafeAnulEndpoint(endpoint));
+                SystemKeyspace.removeEndpoint(endpoint);
+                PendingRangeCalculatorService.instance.update();
+                PendingRangeCalculatorService.instance.blockUntilFinished();
+            });
+        };
+    }
+
+    /**
+     * Changes gossip state of the `peer` on `target`
+     */
+    public static void changeGossipState(IInvokableInstance target, IInstance 
peer, List<VersionedApplicationState> newState)
+    {
+        InetSocketAddress addr = peer.broadcastAddress();
+        UUID hostId = peer.config().hostId();
+        int netVersion = peer.getMessagingVersion();
+        target.runOnInstance(() -> {
+            InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
+            StorageService storageService = StorageService.instance;
+
+            Gossiper.runInGossipStageBlocking(() -> {
+                EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+                if (state == null)
+                {
+                    Gossiper.instance.initializeNodeUnsafe(endpoint, hostId, 
netVersion, 1);
+                    state = 
Gossiper.instance.getEndpointStateForEndpoint(endpoint);
+                    if (state.isAlive() && 
!Gossiper.instance.isDeadState(state))
+                        Gossiper.instance.realMarkAlive(endpoint, state);
+                }
+
+                for (VersionedApplicationState value : newState)
+                {
+                    ApplicationState as = toApplicationState(value);
+                    VersionedValue vv = toVersionedValue(value);
+                    state.addApplicationState(as, vv);
+                    storageService.onChange(endpoint, as, vv);
+                }
+            });
+        });
+    }
+
+    public static void withProperty(String prop, boolean value, Runnable r)
+    {
+        String before = System.getProperty(prop);
+        try
+        {
+            System.setProperty(prop, Boolean.toString(value));
+            r.run();
+        }
+        finally
+        {
+            System.setProperty(prop, before == null ? "true" : before);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/distributed/action/InstanceAction.java 
b/test/distributed/org/apache/cassandra/distributed/action/InstanceAction.java
new file mode 100644
index 0000000..ce14dbc
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/action/InstanceAction.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.action;
+
+import java.util.function.Consumer;
+
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+public interface InstanceAction extends Consumer<IInvokableInstance>
+{
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 361519d..44c9744 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import java.util.function.BiPredicate;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -162,7 +164,6 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
         }
     }
 
-
     protected class Wrapper extends DelegatingInvokableInstance implements 
IUpgradeableInstance
     {
         private final int generation;
@@ -252,7 +253,7 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
             if (!isShutdown && delegate != null)
                 return delegate().liveMemberCount();
 
-            throw new IllegalStateException("Cannot get live member count on 
shutdown instance");
+            throw new IllegalStateException("Cannot get live member count on 
shutdown instance: " + config.num());
         }
 
         public NodeToolResult nodetoolResult(boolean withNotifications, 
String... commandAndArgs)
@@ -387,13 +388,8 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
 
     public I bootstrap(IInstanceConfig config)
     {
-        if (!config.has(Feature.GOSSIP) || !config.has(Feature.NETWORK))
-            throw new IllegalStateException("New nodes can only be 
bootstrapped when gossip and networking is enabled.");
-
         I instance = newInstanceWrapperInternal(0, initialVersion, config);
-
         instances.add(instance);
-
         I prev = instanceMap.put(config.broadcastAddress(), instance);
 
         if (null != prev)
@@ -448,6 +444,41 @@ public abstract class AbstractCluster<I extends IInstance> 
implements ICluster<I
                                               
i.config().localRack().equals(rackName));
     }
 
+    public void run(Consumer<? super I> action,  Predicate<I> filter)
+    {
+        run(Collections.singletonList(action), filter);
+    }
+
+    public void run(Collection<Consumer<? super I>> actions, Predicate<I> 
filter)
+    {
+        stream().forEach(instance -> {
+            for (Consumer<? super I> action : actions)
+            {
+                if (filter.test(instance))
+                    action.accept(instance);
+            }
+
+        });
+    }
+
+    public void run(Consumer<? super I> action, int instanceId, int... 
moreInstanceIds)
+    {
+        run(Collections.singletonList(action), instanceId, moreInstanceIds);
+    }
+
+    public void run(List<Consumer<? super I>> actions, int instanceId, int... 
moreInstanceIds)
+    {
+        int[] instanceIds = new int[moreInstanceIds.length + 1];
+        instanceIds[0] = instanceId;
+        System.arraycopy(moreInstanceIds, 0, instanceIds, 1, 
moreInstanceIds.length);
+
+        for (int idx : instanceIds)
+        {
+            for (Consumer<? super I> action : actions)
+                action.accept(this.get(idx));
+        }
+    }
+
     public void forEach(IIsolatedExecutor.SerializableRunnable runnable)
     {
         forEach(i -> i.sync(runnable));
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
 
b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
index fbfda3a..0dfaa7e 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/DistributedTestSnitch.java
@@ -40,7 +40,7 @@ public class DistributedTestSnitch extends 
AbstractNetworkTopologySnitch
     private static final Map<InetAddressAndPort, InetSocketAddress> cache = 
new ConcurrentHashMap<>();
     private static final Map<InetSocketAddress, InetAddressAndPort> 
cacheInverse = new ConcurrentHashMap<>();
 
-    static InetAddressAndPort toCassandraInetAddressAndPort(InetSocketAddress 
addressAndPort)
+    public static InetAddressAndPort 
toCassandraInetAddressAndPort(InetSocketAddress addressAndPort)
     {
         InetAddressAndPort m = cacheInverse.get(addressAndPort);
         if (m == null)
@@ -51,7 +51,7 @@ public class DistributedTestSnitch extends 
AbstractNetworkTopologySnitch
         return m;
     }
 
-    static InetSocketAddress 
fromCassandraInetAddressAndPort(InetAddressAndPort addressAndPort)
+    public static InetSocketAddress 
fromCassandraInetAddressAndPort(InetAddressAndPort addressAndPort)
     {
         InetSocketAddress m = cache.get(addressAndPort);
         if (m == null)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index edd525d..0448cb5 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -35,9 +35,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
-
 import javax.management.ListenerNotFoundException;
 import javax.management.Notification;
 import javax.management.NotificationListener;
@@ -64,8 +64,8 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.SystemKeyspaceMigrator40;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.action.GossipHelper;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
@@ -78,11 +78,8 @@ import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbe;
 import org.apache.cassandra.distributed.mock.nodetool.InternalNodeProbeFactory;
-import org.apache.cassandra.distributed.shared.InstanceClassLoader;
 import org.apache.cassandra.exceptions.StartupException;
-import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.io.IVersionedAsymmetricSerializer;
@@ -96,6 +93,7 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.NoPayload;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -110,8 +108,8 @@ import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.streaming.StreamReceiveTask;
 import org.apache.cassandra.streaming.StreamTransferTask;
 import org.apache.cassandra.streaming.async.StreamingInboundHandler;
-import org.apache.cassandra.tools.Output;
 import org.apache.cassandra.tools.NodeTool;
+import org.apache.cassandra.tools.Output;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -142,6 +140,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
     }};
 
     public final IInstanceConfig config;
+    private final long startedAt = System.nanoTime();
 
     // should never be invoked directly, so that it is instantiated on other 
class loader;
     // only visible for inheritance
@@ -477,17 +476,23 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                 
StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
                 if (config.has(GOSSIP))
                 {
+                    MigrationManager.setUptimeFn(() -> 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt));
                     StorageService.instance.initServer();
                     StorageService.instance.removeShutdownHook();
                     Gossiper.waitToSettle();
                 }
                 else
                 {
-                    initializeRing(cluster);
+                    cluster.stream().forEach(peer -> {
+                        if (cluster instanceof Cluster)
+                            GossipHelper.statusToNormal((IInvokableInstance) 
peer).accept(this);
+                        else
+                            GossipHelper.unsafeStatusToNormal(this, 
(IInstance) peer);
+                    });
+
                 }
 
                 StorageService.instance.ensureTraceKeyspace();
-
                 SystemKeyspace.finishStartup();
 
                 CassandraDaemon.getInstanceForTesting().setupCompleted();
@@ -513,6 +518,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         }).run();
     }
 
+
     private void mkdirs()
     {
         new File(config.getString("saved_caches_directory")).mkdirs();
@@ -529,72 +535,6 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         return config;
     }
 
-    private void initializeRing(ICluster cluster)
-    {
-        // This should be done outside instance in order to avoid serializing 
config
-        String partitionerName = config.getString("partitioner");
-        List<String> initialTokens = new ArrayList<>();
-        List<InetSocketAddress> hosts = new ArrayList<>();
-        List<UUID> hostIds = new ArrayList<>();
-        List<String> versions = new ArrayList<>();
-        for (int i = 1 ; i <= cluster.size() ; ++i)
-        {
-            IInstance instance = cluster.get(i);
-            IInstanceConfig config = instance.config();
-            initialTokens.add(config.getString("initial_token"));
-            hosts.add(config.broadcastAddress());
-            hostIds.add(config.hostId());
-            versions.add(instance.getReleaseVersionString());
-        }
-
-        try
-        {
-            IPartitioner partitioner = 
FBUtilities.newPartitioner(partitionerName);
-            StorageService storageService = StorageService.instance;
-            List<Token> tokens = new ArrayList<>();
-            for (String token : initialTokens)
-                tokens.add(partitioner.getTokenFactory().fromString(token));
-
-            for (int i = 0; i < tokens.size(); i++)
-            {
-                InetSocketAddress ep = hosts.get(i);
-                InetAddressAndPort addressAndPort = 
toCassandraInetAddressAndPort(ep);
-                UUID hostId = hostIds.get(i);
-                Token token = tokens.get(i);
-                String releaseVersion = versions.get(i);
-                Gossiper.runInGossipStageBlocking(() -> {
-                    Gossiper.instance.initializeNodeUnsafe(addressAndPort, 
hostId, 1);
-                    Gossiper.instance.injectApplicationState(addressAndPort,
-                                                             
ApplicationState.TOKENS,
-                                                             new 
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token)));
-                    Gossiper.instance.injectApplicationState(addressAndPort,
-                                                             
ApplicationState.RELEASE_VERSION,
-                                                             new 
VersionedValue.VersionedValueFactory(partitioner).releaseVersion(releaseVersion));
-                    storageService.onChange(addressAndPort,
-                                            ApplicationState.STATUS_WITH_PORT,
-                                            new 
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
-                    storageService.onChange(addressAndPort,
-                                            ApplicationState.STATUS,
-                                            new 
VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token)));
-                    Gossiper.instance.realMarkAlive(addressAndPort, 
Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
-                });
-
-                int messagingVersion = cluster.get(ep).isShutdown()
-                                       ? MessagingService.current_version
-                                       : 
Math.min(MessagingService.current_version, 
cluster.get(ep).getMessagingVersion());
-                MessagingService.instance().versions.set(addressAndPort, 
messagingVersion);
-            }
-
-            // check that all nodes are in token metadata
-            for (int i = 0; i < tokens.size(); ++i)
-                assert 
storageService.getTokenMetadata().isMember(toCassandraInetAddressAndPort(hosts.get(i)));
-        }
-        catch (Throwable e) // UnknownHostException
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
     public Future<Void> shutdown()
     {
         return shutdown(true);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java 
b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index 0d8f96f..c3b8019 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -150,7 +150,7 @@ public class IsolatedExecutor implements IIsolatedExecutor
         }
         catch (IllegalAccessException | InvocationTargetException e)
         {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Error while transfering object to " + 
classLoader, e);
         }
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/VersionedApplicationState.java
 
b/test/distributed/org/apache/cassandra/distributed/shared/VersionedApplicationState.java
new file mode 100644
index 0000000..fd3e40a
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/shared/VersionedApplicationState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.Serializable;
+
+public class VersionedApplicationState implements Serializable
+{
+    public final int applicationState;
+    public final String value;
+    public final int version;
+
+    public VersionedApplicationState(int applicationState, String value, int 
version)
+    {
+        this.applicationState = applicationState;
+        this.value = value;
+        this.version = version;
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
deleted file mode 100644
index 934ad65..0000000
--- a/test/distributed/org/apache/cassandra/distributed/test/BootstrapTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.ICluster;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.distributed.api.TokenSupplier;
-import org.apache.cassandra.distributed.shared.NetworkTopology;
-
-import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-// TODO: this test should be removed after running in-jvm dtests is set up via 
the shared API repository
-public class BootstrapTest extends TestBaseImpl
-{
-
-    @Test
-    public void bootstrapTest() throws Throwable
-    {
-        int originalNodeCount = 2;
-        int expandedNodeCount = originalNodeCount + 1;
-        Cluster.Builder builder = builder().withNodes(originalNodeCount)
-                                           
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
-                                           
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
-                                           .withConfig(config -> 
config.with(NETWORK, GOSSIP));
-
-        Map<Integer, Long> withBootstrap = null;
-        Map<Integer, Long> naturally = null;
-        try (Cluster cluster = builder.withNodes(originalNodeCount).start())
-        {
-            populate(cluster);
-
-            IInstanceConfig config = cluster.newInstanceConfig();
-            config.set("auto_bootstrap", true);
-
-            cluster.bootstrap(config).startup();
-
-            cluster.stream().forEach(instance -> {
-                instance.nodetool("cleanup", KEYSPACE, "tbl");
-            });
-
-            withBootstrap = count(cluster);
-        }
-
-        builder = builder.withNodes(expandedNodeCount)
-                         
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
-                         .withConfig(config -> config.with(NETWORK, GOSSIP));
-
-        try (ICluster cluster = builder.start())
-        {
-            populate(cluster);
-            naturally = count(cluster);
-        }
-
-        for (Map.Entry<Integer, Long> e : withBootstrap.entrySet())
-            Assert.assertTrue(e.getValue() >= naturally.get(e.getKey()));
-    }
-
-    public void populate(ICluster cluster)
-    {
-        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + "};");
-        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck 
int, v int, PRIMARY KEY (pk, ck))");
-
-        for (int i = 0; i < 1000; i++)
-            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v) VALUES (?, ?, ?)",
-                                           ConsistencyLevel.QUORUM,
-                                           i, i, i);
-    }
-
-    public Map<Integer, Long> count(ICluster cluster)
-    {
-        return IntStream.rangeClosed(1, cluster.size())
-                        .boxed()
-                        .collect(Collectors.toMap(nodeId -> nodeId,
-                                                  nodeId -> (Long) 
cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + 
".tbl")[0][0]));
-    }
-}
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java 
b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 09fa1df..d53cbd4 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -46,7 +46,8 @@ public class TestBaseImpl extends DistributedTestBase
     }
 
     @BeforeClass
-    public static void beforeClass() throws Throwable {
+    public static void beforeClass() throws Throwable
+    {
         ICluster.setup();
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
new file mode 100644
index 0000000..e0c5a78
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static java.util.Arrays.asList;
+import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class BootstrapTest extends TestBaseImpl
+{
+    @Test
+    public void bootstrapTest() throws Throwable
+    {
+        int originalNodeCount = 2;
+        int expandedNodeCount = originalNodeCount + 1;
+
+        try (Cluster cluster = builder().withNodes(originalNodeCount)
+                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
+                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
+                                        .start())
+        {
+            populate(cluster,0, 100);
+
+            IInstanceConfig config = cluster.newInstanceConfig();
+            IInvokableInstance newInstance = cluster.bootstrap(config);
+            withProperty("cassandra.join_ring", false,
+                         () -> newInstance.startup(cluster));
+
+            cluster.forEach(statusToBootstrap(newInstance));
+
+            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
+                               bootstrap()),
+                        newInstance.config().num());
+
+            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
+                Assert.assertEquals("Node " + e.getKey() + " has incorrect row 
state",
+                                    100L,
+                                    e.getValue().longValue());
+        }
+    }
+
+    @Test
+    public void autoBootstrapTest() throws Throwable
+    {
+        int originalNodeCount = 2;
+        int expandedNodeCount = originalNodeCount + 1;
+
+        try (Cluster cluster = builder().withNodes(originalNodeCount)
+                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
+                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
+                                        .start())
+        {
+            populate(cluster,0, 100);
+
+            IInstanceConfig config = cluster.newInstanceConfig();
+            config.set("auto_bootstrap", true);
+            IInvokableInstance newInstance = cluster.bootstrap(config);
+            withProperty("cassandra.join_ring", false,
+                         () -> newInstance.startup(cluster));
+
+            newInstance.nodetoolResult("join").asserts().success();
+
+            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
+                Assert.assertEquals("Node " + e.getKey() + " has incorrect row 
state", e.getValue().longValue(), 100L);
+        }
+    }
+
+    public static void populate(ICluster cluster, int from, int to)
+    {
+        populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
+    }
+
+    public static void populate(ICluster cluster, int from, int to, int coord, 
int rf, ConsistencyLevel cl)
+    {
+        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + 
"};");
+        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl 
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+        for (int i = from; i < to; i++)
+        {
+            cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (?, ?, ?)",
+                                               cl,
+                                               i, i, i);
+        }
+    }
+
+    public static Map<Integer, Long> count(ICluster cluster)
+    {
+        return IntStream.rangeClosed(1, cluster.size())
+                        .boxed()
+                        .collect(Collectors.toMap(nodeId -> nodeId,
+                                                  nodeId -> (Long) 
cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + 
".tbl")[0][0]));
+    }
+}
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
new file mode 100644
index 0000000..37b1802
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/CommunicationDuringDecommissionTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.apache.cassandra.distributed.action.GossipHelper.decomission;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class CommunicationDuringDecommissionTest extends TestBaseImpl
+{
+    @Test
+    public void internodeConnectionsDuringDecom() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(4)
+                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
+                                        .start())
+        {
+            BootstrapTest.populate(cluster, 0, 100);
+
+            cluster.run(decomission(), 1);
+
+            cluster.filters().allVerbs().from(1).messagesMatching((i, i1, 
iMessage) -> {
+                throw new AssertionError("Decomissioned node should not send 
any messages");
+            }).drop();
+
+
+            Map<Integer, Long> connectionAttempts = new HashMap<>();
+            long deadline = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(10);
+
+            // Wait 10 seconds and check if there are any new connection 
attempts to the decomissioned node
+            while (System.currentTimeMillis() <= deadline)
+            {
+                for (int i = 2; i <= cluster.size(); i++)
+                {
+                    Object[][] res = cluster.get(i).executeInternal("SELECT 
active_connections, connection_attempts FROM system_views.internode_outbound 
WHERE address = '127.0.0.1' AND port = 7012");
+                    Assert.assertEquals(1, res.length);
+                    Assert.assertEquals(0L, ((Long) res[0][0]).longValue());
+                    long attempts = ((Long) res[0][1]).longValue();
+                    if (connectionAttempts.get(i) == null)
+                        connectionAttempts.put(i, attempts);
+                    else
+                        Assert.assertEquals(connectionAttempts.get(i), (Long) 
attempts);
+                }
+                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/NodeNotInRingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/NodeNotInRingTest.java
new file mode 100644
index 0000000..2333077
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/NodeNotInRingTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.action.GossipHelper;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.net.Verb;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class NodeNotInRingTest extends TestBaseImpl
+{
+    @Test
+    public void nodeNotInRingTest() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3)
+                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
+                                        .start())
+        {
+            cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + 
" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};");
+            cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + 
".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+
+            cluster.filters().verbs(Verb.GOSSIP_DIGEST_ACK.id,
+                                    Verb.GOSSIP_DIGEST_SYN.id)
+                   .from(3)
+                   .outbound()
+                   .drop()
+                   .on();
+            cluster.run(GossipHelper.removeFromRing(cluster.get(3)), 1, 2);
+
+            populate(cluster, 0, 50, 1, ConsistencyLevel.ALL);
+            populate(cluster, 50, 100, 2, ConsistencyLevel.ALL);
+
+            Map<Integer, Long> counts = BootstrapTest.count(cluster);
+            Assert.assertEquals(counts.get(1).longValue(), 100L);
+            Assert.assertEquals(counts.get(2).longValue(), 100L);
+            Assert.assertEquals(counts.get(3).longValue(), 0L);
+        }
+    }
+
+    public static void populate(ICluster cluster, int from, int to, int coord, 
ConsistencyLevel cl)
+    {
+        for (int i = from; i < to; i++)
+        {
+            cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v) VALUES (?, ?, ?)",
+                                               cl,
+                                               i, i, i);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/PendingWritesTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/PendingWritesTest.java
new file mode 100644
index 0000000..8daa58a
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/PendingWritesTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.ring;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.impl.DistributedTestSnitch;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.disseminateGossipState;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
+import static 
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class PendingWritesTest extends TestBaseImpl
+{
+    @Test
+    public void testPendingWrites() throws Throwable
+    {
+        int originalNodeCount = 2;
+        int expandedNodeCount = originalNodeCount + 1;
+
+        try (Cluster cluster = builder().withNodes(originalNodeCount)
+                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, 
"dc0", "rack0"))
+                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
+                                        .start())
+        {
+            BootstrapTest.populate(cluster, 0, 100);
+            IInstanceConfig config = cluster.newInstanceConfig();
+            IInvokableInstance newInstance = cluster.bootstrap(config);
+            withProperty("cassandra.join_ring", false,
+                         () -> newInstance.startup(cluster));
+
+            cluster.forEach(statusToBootstrap(newInstance));
+            cluster.run(bootstrap(false, Duration.ofSeconds(60), 
Duration.ofSeconds(60)), newInstance.config().num());
+
+            cluster.get(1).acceptsOnInstance((InetSocketAddress ip) -> {
+                Set<InetAddressAndPort> set = new HashSet<>();
+                for (Map.Entry<Range<Token>, EndpointsForRange.Builder> e : 
StorageService.instance.getTokenMetadata().getPendingRanges(KEYSPACE))
+                {
+                    set.addAll(e.getValue().build().endpoints());
+                }
+                Assert.assertEquals(set.size(), 1);
+                Assert.assertTrue(String.format("%s should contain %s", set, 
ip),
+                                  
set.contains(DistributedTestSnitch.toCassandraInetAddressAndPort(ip)));
+            }).accept(cluster.get(3).broadcastAddress());
+
+            BootstrapTest.populate(cluster, 100, 150);
+
+            newInstance.nodetoolResult("join").asserts().success();
+
+            cluster.run(disseminateGossipState(newInstance),1, 2);
+
+            cluster.run((instance) -> {
+                instance.runOnInstance(() -> {
+                    PendingRangeCalculatorService.instance.update();
+                    
PendingRangeCalculatorService.instance.blockUntilFinished();
+                });
+            }, 1, 2);
+
+            cluster.get(1).acceptsOnInstance((InetSocketAddress ip) -> {
+                Set<InetAddressAndPort> set = new HashSet<>();
+                for (Map.Entry<Range<Token>, EndpointsForRange.Builder> e : 
StorageService.instance.getTokenMetadata().getPendingRanges(KEYSPACE))
+                    set.addAll(e.getValue().build().endpoints());
+                assert set.size() == 0 : set;
+            }).accept(cluster.get(3).broadcastAddress());
+
+            for (Map.Entry<Integer, Long> e : 
BootstrapTest.count(cluster).entrySet())
+                Assert.assertEquals("Node " + e.getKey() + " has incorrect row 
state", e.getValue().longValue(), 150L);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/Util.java 
b/test/unit/org/apache/cassandra/Util.java
index 2c171e7..eba9a7c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -224,7 +225,7 @@ public class Util
         for (int i=0; i<endpointTokens.size(); i++)
         {
             InetAddressAndPort ep = InetAddressAndPort.getByName("127.0.0." + 
String.valueOf(i + 1));
-            Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 1);
+            Gossiper.instance.initializeNodeUnsafe(ep, hostIds.get(i), 
MessagingService.current_version, 1);
             Gossiper.instance.injectApplicationState(ep, 
ApplicationState.TOKENS, new 
VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(endpointTokens.get(i))));
             ss.onChange(ep,
                         ApplicationState.STATUS_WITH_PORT,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to