This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5879813 Cannot replace_address /X because it doesn't exist in gossip
5879813 is described below
commit 5879813db7e5c9485a393cf79473b77be38ad5b3
Author: David Capwell <[email protected]>
AuthorDate: Tue Dec 15 12:11:02 2020 -0800
Cannot replace_address /X because it doesn't exist in gossip
patch by David Capwell; reviewed by Brandon Williams, Jon Meredith, Paulo
Motta, Sam Tunnicliffe for CASSANDRA-16213
---
CHANGES.txt | 1 +
.../config/CassandraRelevantProperties.java | 12 +
.../org/apache/cassandra/gms/EndpointState.java | 14 +
.../cassandra/gms/GossipDigestSynVerbHandler.java | 34 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 124 +++-
.../org/apache/cassandra/gms/HeartBeatState.java | 22 +
.../apache/cassandra/service/StorageService.java | 89 ++-
.../apache/cassandra/distributed/Constants.java | 7 +
.../distributed/impl/AbstractCluster.java | 97 ++-
.../cassandra/distributed/impl/Instance.java | 20 +-
.../cassandra/distributed/impl/InstanceConfig.java | 5 +
.../cassandra/distributed/shared/ClusterUtils.java | 774 +++++++++++++++++++++
.../shared/{Shared.java => Isolated.java} | 8 +-
.../cassandra/distributed/shared/Shared.java | 2 +
.../distributed/shared/WithProperties.java | 114 +++
.../distributed/test/IPMembershipTest.java | 106 +++
.../cassandra/distributed/test/TestBaseImpl.java | 15 +
.../AssassinateAbruptDownedNodeTest.java} | 26 +-
.../AssassinateGracefullNodeTest.java} | 26 +-
.../hostreplacement/AssassinatedEmptyNodeTest.java | 62 ++
.../test/hostreplacement/BaseAssassinatedCase.java | 93 +++
.../HostReplacementAbruptDownedInstanceTest.java | 104 +++
.../HostReplacementOfDownedClusterTest.java | 180 +++++
.../test/hostreplacement/HostReplacementTest.java | 234 +++++++
.../org/apache/cassandra/gms/GossiperTest.java | 6 +-
25 files changed, 2091 insertions(+), 84 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3f68a8b..e8f89c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -23,6 +23,7 @@
* When a table attempts to clean up metrics, it was cleaning up all global
table metrics (CASSANDRA-16095)
* Bring back the accepted encryption protocols list as configurable option
(CASSANDRA-13325)
* DigestResolver.getData throws AssertionError since dataResponse is null
(CASSANDRA-16097)
+ * Cannot replace_address /X because it doesn't exist in gossip
(CASSANDRA-16213)
Merged from 3.11:
* SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to
default of 1GB (CASSANDRA-16071)
Merged from 3.0:
diff --git
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 5d918a8..17b9b2a 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -147,6 +147,18 @@ public enum CassandraRelevantProperties
*/
BOOTSTRAP_SCHEMA_DELAY_MS("cassandra.schema_delay_ms"),
+ /**
+ * Gossip quarantine delay is used while evaluating membership changes and
should only be changed with extreme care.
+ */
+ GOSSIPER_QUARANTINE_DELAY("cassandra.gossip_quarantine_delay_ms"),
+
+ /**
+ * When doing a host replacement it's possible that the gossip state is
"empty", meaning that the endpoint is known
+ * but the current state isn't known. If the host replacement is needed
to repair this state, this property must
+ * be true.
+ */
+ REPLACEMENT_ALLOW_EMPTY("cassandra.allow_empty_replace_address", "true"),
+
//cassandra properties (without the "cassandra." prefix)
/**
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 8546a70..a4b294c 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -55,6 +55,11 @@ public class EndpointState
this(initialHbState, new EnumMap<ApplicationState,
VersionedValue>(ApplicationState.class));
}
+ public EndpointState(EndpointState other)
+ {
+ this(new HeartBeatState(other.hbState), new
EnumMap<>(other.applicationState.get()));
+ }
+
EndpointState(HeartBeatState initialHbState, Map<ApplicationState,
VersionedValue> states)
{
hbState = initialHbState;
@@ -138,6 +143,15 @@ public class EndpointState
isAlive = false;
}
+ /**
+ * @return true if {@link HeartBeatState#isEmpty()} is true and neither a
STATUS nor a STATUS_WITH_PORT application state exists
+ */
+ public boolean isEmptyWithoutStatus()
+ {
+ Map<ApplicationState, VersionedValue> state = applicationState.get();
+ return hbState.isEmpty() &&
!(state.containsKey(ApplicationState.STATUS_WITH_PORT) ||
state.containsKey(ApplicationState.STATUS));
+ }
+
public boolean isRpcReady()
{
VersionedValue rpcState =
getApplicationState(ApplicationState.RPC_READY);
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 520dbec..abaa39b 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -17,7 +17,11 @@
*/
package org.apache.cassandra.gms;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,7 +31,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import static org.apache.cassandra.net.Verb.*;
+import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_ACK;
public class GossipDigestSynVerbHandler extends
GossipVerbHandler<GossipDigestSyn>
{
@@ -97,15 +101,31 @@ public class GossipDigestSynVerbHandler extends
GossipVerbHandler<GossipDigestSy
logger.trace("Gossip syn digests are : {}", sb);
}
- List<GossipDigest> deltaGossipDigestList = new
ArrayList<GossipDigest>();
- Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new
HashMap<InetAddressAndPort, EndpointState>();
- Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList,
deltaEpStateMap);
- logger.trace("sending {} digests and {} deltas",
deltaGossipDigestList.size(), deltaEpStateMap.size());
- Message<GossipDigestAck> gDigestAckMessage =
Message.out(GOSSIP_DIGEST_ACK, new GossipDigestAck(deltaGossipDigestList,
deltaEpStateMap));
+ Message<GossipDigestAck> gDigestAckMessage = gDigestList.isEmpty() ?
+ createShadowReply() :
+
createNormalReply(gDigestList);
+
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestAckMessage to {}", from);
MessagingService.instance().send(gDigestAckMessage, from);
super.doVerb(message);
}
+
+ private static Message<GossipDigestAck>
createNormalReply(List<GossipDigest> gDigestList)
+ {
+ List<GossipDigest> deltaGossipDigestList = new ArrayList<>();
+ Map<InetAddressAndPort, EndpointState> deltaEpStateMap = new
HashMap<>();
+ Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList,
deltaEpStateMap);
+ logger.trace("sending {} digests and {} deltas",
deltaGossipDigestList.size(), deltaEpStateMap.size());
+
+ return Message.out(GOSSIP_DIGEST_ACK, new
GossipDigestAck(deltaGossipDigestList, deltaEpStateMap));
+ }
+
+ private static Message<GossipDigestAck> createShadowReply()
+ {
+ Map<InetAddressAndPort, EndpointState> stateMap =
Gossiper.instance.examineShadowState();
+ logger.trace("sending 0 digests and {} deltas", stateMap.size());
+ return Message.out(GOSSIP_DIGEST_ACK, new
GossipDigestAck(Collections.emptyList(), stateMap));
+ }
}
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 51e7e54..a3be834 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.ECHO_REQ;
import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
@@ -86,6 +87,7 @@ import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
public static final String MBEAN_NAME =
"org.apache.cassandra.net:type=Gossiper";
+
public static class Props
{
public static final String DISABLE_THREAD_VALIDATION =
"cassandra.gossip.disable_thread_validation";
@@ -107,7 +109,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
private volatile ScheduledFuture<?> scheduledGossipTask;
private static final ReentrantLock taskLock = new ReentrantLock();
public final static int intervalInMillis = 1000;
- public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
+ public final static int QUARANTINE_DELAY =
GOSSIPER_QUARANTINE_DELAY.getInt(StorageService.RING_DELAY * 2);
private static final Logger logger =
LoggerFactory.getLogger(Gossiper.class);
private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
@@ -1218,9 +1220,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
checkProperThreadForStateMutation();
if (logger.isTraceEnabled())
logger.trace("marking as down {}", addr);
- localState.markDead();
- liveEndpoints.remove(addr);
- unreachableEndpoints.put(addr, System.nanoTime());
+ silentlyMarkDead(addr, localState);
logger.info("InetAddress {} is now DOWN", addr);
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onDead(addr, localState);
@@ -1231,6 +1231,18 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
}
/**
+ * Used by {@link #markDead(InetAddressAndPort, EndpointState)} and {@link
#addSavedEndpoint(InetAddressAndPort)}
+ * to register an endpoint as dead. This method is "silent" to avoid
triggering listeners, diagnostics, or logs
+ * on startup via addSavedEndpoint.
+ */
+ private void silentlyMarkDead(InetAddressAndPort addr, EndpointState
localState)
+ {
+ localState.markDead();
+ liveEndpoints.remove(addr);
+ unreachableEndpoints.put(addr, System.nanoTime());
+ }
+
+ /**
* This method is called whenever there is a "big" change in ep state (a
generation change for a known node).
*
* @param ep endpoint
@@ -1480,23 +1492,73 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
deltaEpStateMap.put(gDigest.getEndpoint(), localEpStatePtr);
}
- /*
- This method is used to figure the state that the Gossiper has but
Gossipee doesn't. The delta digests
- and the delta state are built up.
- */
- void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest>
deltaGossipDigestList, Map<InetAddressAndPort, EndpointState> deltaEpStateMap)
+ /**
+ * Used during a shadow round to collect the current state; this method
clones the current state, no filtering
+ * is done.
+ *
+ * During the shadow round it's desirable to return gossip state for remote
instances that were created by this
+ * process (also known as "empty" states); this allows host replacement to
replace downed hosts that are
+ * in the ring but have no state in gossip (see CASSANDRA-16213).
+ *
+ * This method differs from {@link #examineGossiper(List, List, Map)}
in how "empty" states are
+ * handled: they are kept rather than filtered out.
+ */
+ Map<InetAddressAndPort, EndpointState> examineShadowState()
{
- if (gDigestList.size() == 0)
+ logger.debug("Shadow request received, adding all states");
+ Map<InetAddressAndPort, EndpointState> map = new HashMap<>();
+ for (Entry<InetAddressAndPort, EndpointState> e :
endpointStateMap.entrySet())
{
- /* we've been sent a *completely* empty syn, which should normally
never happen since an endpoint will at least send a syn with itself.
- If this is happening then the node is attempting shadow gossip,
and we should respond with everything we know.
- */
- logger.debug("Shadow request received, adding all states");
- for (Map.Entry<InetAddressAndPort, EndpointState> entry :
endpointStateMap.entrySet())
+ InetAddressAndPort endpoint = e.getKey();
+ EndpointState state = new EndpointState(e.getValue());
+ if (state.isEmptyWithoutStatus())
{
- gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
+ // We have no app states loaded for this endpoint, but we may
well have
+ // some state persisted in the system keyspace. This can
happen in the case
+ // of a full cluster bounce where one or more nodes fail to
come up. As
+ // gossip state is transient, the peers which do successfully
start will be
+ // aware of the failed nodes thanks to
StorageService::initServer calling
+ // Gossiper.instance::addSavedEndpoint with every endpoint in
TokenMetadata,
+ // which itself is populated from the system tables at startup.
+ // Here we know that a peer is starting up and
attempting to perform
+ // a shadow round of gossip. This peer is in one of two states:
+ // * it is replacing a down node, in which case it needs to
learn the tokens
+ // of the down node and optionally its host id.
+ // * it needs to check that no other instance is already
associated with its
+ // endpoint address and port.
+ // To support both of these cases, we can add the tokens and
host id from
+ // the system table, if they exist. These are only ever
persisted to the system
+ // table when the actual node to which they apply enters the
UP/NORMAL state.
+ // This invariant will be preserved as nodes never persist or
propagate the
+ // results of a shadow round, so this communication will be
strictly limited
+ // to this node and the node performing the shadow round.
+ UUID hostId = SystemKeyspace.loadHostIds().get(endpoint);
+ if (null != hostId)
+ {
+ state.addApplicationState(ApplicationState.HOST_ID,
+
StorageService.instance.valueFactory.hostId(hostId));
+ }
+ Set<Token> tokens = SystemKeyspace.loadTokens().get(endpoint);
+ if (null != tokens && !tokens.isEmpty())
+ {
+ state.addApplicationState(ApplicationState.TOKENS,
+
StorageService.instance.valueFactory.tokens(tokens));
+ }
}
+ map.put(endpoint, state);
}
+ return map;
+ }
+
+ /**
+ * This method is used to figure the state that the Gossiper has but
Gossipee doesn't. The delta digests
+ * and the delta state are built up.
+ *
+ * When a {@link EndpointState} is "empty" then it is filtered out and not
added to the delta state (see CASSANDRA-16213).
+ */
+ void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest>
deltaGossipDigestList, Map<InetAddressAndPort, EndpointState> deltaEpStateMap)
+ {
+ assert !gDigestList.isEmpty() : "examineGossiper called with empty
digest list";
for ( GossipDigest gDigest : gDigestList )
{
int remoteGeneration = gDigest.getGeneration();
@@ -1523,8 +1585,8 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
}
else if (remoteGeneration < localGeneration)
{
- /* send all data with generation = localgeneration and
version > 0 */
- sendAll(gDigest, deltaEpStateMap, 0);
+ /* send all data with generation = localgeneration and
version > -1 */
+ sendAll(gDigest, deltaEpStateMap,
HeartBeatState.EMPTY_VERSION);
}
else if (remoteGeneration == localGeneration)
{
@@ -1779,16 +1841,17 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
if (epState != null)
{
logger.debug("not replacing a previous epState for {}, but reusing
it: {}", ep, epState);
- epState.setHeartBeatState(new HeartBeatState(0));
+ epState.setHeartBeatState(HeartBeatState.empty());
}
else
{
- epState = new EndpointState(new HeartBeatState(0));
+ epState = new EndpointState(HeartBeatState.empty());
+ logger.info("Adding {} as there was no previous epState; new state
is {}", ep, epState);
}
epState.markDead();
endpointStateMap.put(ep, epState);
- unreachableEndpoints.put(ep, System.nanoTime());
+ silentlyMarkDead(ep, epState);
if (logger.isTraceEnabled())
logger.trace("Adding saved endpoint {} {}", ep,
epState.getHeartBeatState().getGeneration());
}
@@ -1895,6 +1958,25 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
return inShadowRound;
}
+ /**
+ * Creates a new dead {@link EndpointState} that is {@link
EndpointState#isEmptyWithoutStatus() empty}. This is used during
+ * host replacement for edge cases where the seed reported that the
endpoint was empty, so such state needs to be added
+ * into gossip explicitly (as empty endpoints are not gossiped outside of
the shadow round).
+ *
+ * see CASSANDRA-16213
+ */
+ public void initializeUnreachableNodeUnsafe(InetAddressAndPort addr)
+ {
+ EndpointState state = new EndpointState(HeartBeatState.empty());
+ state.markDead();
+ EndpointState oldState = endpointStateMap.putIfAbsent(addr, state);
+ if (null != oldState)
+ {
+ throw new RuntimeException("Attempted to initialize endpoint state
for unreachable node, " +
+ "but found existing endpoint state for
it.");
+ }
+ }
+
@VisibleForTesting
public void initializeNodeUnsafe(InetAddressAndPort addr, UUID uuid, int
generationNbr)
{
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java
b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index 2abd5d7..75f4f56 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -29,6 +29,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
*/
public class HeartBeatState
{
+ public static final int EMPTY_VERSION = -1;
+
public static final IVersionedSerializer<HeartBeatState> serializer = new
HeartBeatStateSerializer();
private volatile int generation;
@@ -39,12 +41,32 @@ public class HeartBeatState
this(gen, 0);
}
+ public HeartBeatState(HeartBeatState other)
+ {
+ generation = other.generation;
+ version = other.version;
+ }
+
public HeartBeatState(int gen, int ver)
{
generation = gen;
version = ver;
}
+ public static HeartBeatState empty()
+ {
+ return new HeartBeatState(0, EMPTY_VERSION);
+ }
+
+ public boolean isEmpty()
+ {
+ // Instance I1 will update this value for I1's state; no other
instance should.
+ // It is also known that a negative version isn't allowed, so this can
be leveraged to
+ // know that the state was not generated by I1 but rather by this
instance (normally
+ // happens on startup, when peers are added to gossip with the empty
state).
+ return version == EMPTY_VERSION;
+ }
+
int getGeneration()
{
return generation;
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index 8aaf9ab..1b180c0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -33,7 +33,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.management.*;
import javax.management.openmbean.CompositeData;
@@ -104,7 +103,6 @@ import org.apache.cassandra.streaming.*;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.apache.cassandra.transport.ClientResourceLimits;
import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.logging.LoggingSupportFactory;
import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -124,6 +122,7 @@ import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
import static
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY;
import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
import static
org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
import static org.apache.cassandra.net.NoPayload.noPayload;
@@ -523,18 +522,68 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
logger.info("Gathering node replacement information for {}",
replaceAddress);
Map<InetAddressAndPort, EndpointState> epStates =
Gossiper.instance.doShadowRound();
// as we've completed the shadow round of gossip, we should be able to
find the node we're replacing
- if (epStates.get(replaceAddress) == null)
+ EndpointState state = epStates.get(replaceAddress);
+ if (state == null)
throw new RuntimeException(String.format("Cannot replace_address
%s because it doesn't exist in gossip", replaceAddress));
validateEndpointSnitch(epStates.values().iterator());
try
{
- VersionedValue tokensVersionedValue =
epStates.get(replaceAddress).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue =
state.getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException(String.format("Could not find
tokens for %s to replace", replaceAddress));
- bootstrapTokens =
TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new
ByteArrayInputStream(tokensVersionedValue.toBytes())));
+ Collection<Token> tokens =
TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new
ByteArrayInputStream(tokensVersionedValue.toBytes())));
+ bootstrapTokens =
validateReplacementBootstrapTokens(tokenMetadata, replaceAddress, tokens);
+
+ if (state.isEmptyWithoutStatus() &&
REPLACEMENT_ALLOW_EMPTY.getBoolean())
+ {
+ logger.warn("Gossip state not present for replacing node {}.
Adding temporary entry to continue.", replaceAddress);
+
+ // When replacing a node, we take ownership of all its tokens.
+ // If that node is currently down and not present in the
gossip info
+ // of any other live peers, then we will not be able to take
ownership
+ // of its tokens during bootstrap as they have no way of being
propagated
+ // to this node's TokenMetadata. TM is loaded at startup (in
which case
+ // it will be empty for a new replacement node) and only
updated with
+ // tokens for an endpoint during normal state propagation
(which will not
+ // occur if no peers have gossip state for it).
+ // However, the presence of host id and tokens in the system
tables implies
+ // that the node managed to complete bootstrap at some point
in the past.
+ // Peers may include this information loaded directly from
system tables
+ // in a GossipDigestAck *only if* the GossipDigestSyn was sent
as part of a
+ // shadow round (otherwise, a GossipDigestAck contains only
state about peers
+ // learned via gossip).
+ // It is safe to do this here because, having completed a shadow
round, we know
+ // that:
+ // * replaceAddress successfully bootstrapped at some point
and owned these
+ // tokens
+ // * we know that no other node currently owns these tokens
+ // * we are going to completely take over replaceAddress's
ownership of
+ // these tokens.
+ tokenMetadata.updateNormalTokens(bootstrapTokens,
replaceAddress);
+ UUID hostId = Gossiper.instance.getHostId(replaceAddress,
epStates);
+ if (hostId != null)
+ tokenMetadata.updateHostId(hostId, replaceAddress);
+
+ // If we were only able to learn about the node being replaced
through the
+ // shadow gossip round (i.e. there is no state in gossip
across the cluster
+ // about it, perhaps because the entire cluster has been
bounced since it went
+ // down), then we're safe to proceed with the replacement. In
this case, there
+ // will be no local endpoint state as we discard the results
of the shadow
+ // round after preparing replacement info. We inject a minimal
EndpointState
+ // to keep FailureDetector::isAlive and
Gossiper::compareEndpointStartup from
+ // failing later in the replacement, as they both expect the
replaced node to
+ // be fully present in gossip.
+ // Otherwise, if the replaced node is present in gossip, we
need to check that
+ // it is not in fact live.
+ // We choose to not include the EndpointState provided during
the shadow round
+ // as it's possible to include more state than is desired, so
by creating a
+ // new empty endpoint without that information we can control
what is in our
+ // local gossip state
+
Gossiper.instance.initializeUnreachableNodeUnsafe(replaceAddress);
+ }
}
catch (IOException e)
{
@@ -552,6 +601,35 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
return localHostId;
}
+ private static Collection<Token>
validateReplacementBootstrapTokens(TokenMetadata tokenMetadata,
+
InetAddressAndPort replaceAddress,
+
Collection<Token> bootstrapTokens)
+ {
+ Map<Token, InetAddressAndPort> conflicts = new HashMap<>();
+ for (Token token : bootstrapTokens)
+ {
+ InetAddressAndPort conflict = tokenMetadata.getEndpoint(token);
+ if (null != conflict && !conflict.equals(replaceAddress))
+ conflicts.put(token, tokenMetadata.getEndpoint(token));
+ }
+
+ if (!conflicts.isEmpty())
+ {
+ String error = String.format("Conflicting token ownership
information detected between " +
+ "gossip and current ring view during
proposed replacement " +
+ "of %s. Some tokens identified in
gossip for the node being " +
+ "replaced are currently owned by
other peers: %s",
+ replaceAddress,
+ conflicts.entrySet()
+ .stream()
+ .map(e -> e.getKey() + "(" +
e.getValue() + ")" )
+
.collect(Collectors.joining(",")));
+ throw new RuntimeException(error);
+
+ }
+ return bootstrapTokens;
+ }
+
private synchronized void checkForEndpointCollision(UUID localHostId,
Set<InetAddressAndPort> peers) throws ConfigurationException
{
if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
@@ -2615,6 +2693,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
if (!tokensToUpdateInSystemKeyspace.isEmpty())
SystemKeyspace.updateTokens(endpoint,
tokensToUpdateInSystemKeyspace);
}
+
/**
* Handle node move to normal state. That is, node is entering token ring
and participating
* in reads.
diff --git a/test/distributed/org/apache/cassandra/distributed/Constants.java
b/test/distributed/org/apache/cassandra/distributed/Constants.java
index b7d2d26..d7a2acd 100644
--- a/test/distributed/org/apache/cassandra/distributed/Constants.java
+++ b/test/distributed/org/apache/cassandra/distributed/Constants.java
@@ -31,4 +31,11 @@ public final class Constants
* of the YAML is not desired.
*/
public static final String KEY_DTEST_API_CONFIG_CHECK =
"dtest.api.config.check";
+
+ /**
+ * Property used by AbstractCluster to determine the state of an Instance
whose startup failed; if not set
+ * the Instance is marked as "shutdown", but this flag can be used to
leave the instance "running" by setting
+ * 'false' (the property's value is used directly as the shutdown flag).
+ */
+ public static final String KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN =
"dtest.api.startup.failure_as_shutdown";
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 0eea077..53502f9 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -19,16 +19,18 @@
package org.apache.cassandra.distributed.impl;
import java.io.File;
+import java.lang.annotation.Annotation;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -41,6 +43,8 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import javax.annotation.concurrent.GuardedBy;
+
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +70,7 @@ import org.apache.cassandra.distributed.api.LogAction;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.InstanceClassLoader;
+import org.apache.cassandra.distributed.shared.Isolated;
import org.apache.cassandra.distributed.shared.MessageFilters;
import org.apache.cassandra.distributed.shared.Metrics;
import org.apache.cassandra.distributed.shared.NetworkTopology;
@@ -77,6 +82,7 @@ import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.reflections.Reflections;
+import org.reflections.util.ConfigurationBuilder;
import static
org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort;
@@ -116,10 +122,15 @@ public abstract class AbstractCluster<I extends
IInstance> implements ICluster<I
// include byteman so tests can use
private static final Set<String> SHARED_CLASSES =
findClassesMarkedForSharedClassLoader();
- private static final Predicate<String> SHARED_PREDICATE = s ->
-
SHARED_CLASSES.contains(s) ||
-
InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
-
s.startsWith("org.jboss.byteman");
+ private static final Set<String> ISOLATED_CLASSES =
findClassesMarkedForInstanceClassLoader();
+ private static final Predicate<String> SHARED_PREDICATE = s -> {
+ if (ISOLATED_CLASSES.contains(s))
+ return false;
+
+ return SHARED_CLASSES.contains(s) ||
+ InstanceClassLoader.getDefaultLoadSharedFilter().test(s) ||
+ s.startsWith("org.jboss.byteman");
+ };
private final UUID clusterId = UUID.randomUUID();
private final File root;
@@ -171,7 +182,10 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
private final IInstanceConfig config;
private volatile IInvokableInstance delegate;
private volatile Versions.Version version;
+ @GuardedBy("this")
private volatile boolean isShutdown = true;
+ @GuardedBy("this")
+ private InetSocketAddress broadcastAddress;
protected IInvokableInstance delegate()
{
@@ -194,6 +208,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
this.version = version;
// we ensure there is always a non-null delegate, so that the
executor may be used while the node is offline
this.delegate = newInstance(generation);
+ this.broadcastAddress = config.broadcastAddress();
}
private IInvokableInstance newInstance(int generation)
@@ -215,20 +230,55 @@ public abstract class AbstractCluster<I extends
IInstance> implements ICluster<I
return isShutdown;
}
+ private boolean isRunning()
+ {
+ return !isShutdown;
+ }
+
@Override
public synchronized void startup()
{
startup(AbstractCluster.this);
}
-
public synchronized void startup(ICluster cluster)
{
if (cluster != AbstractCluster.this)
throw new IllegalArgumentException("Only the owning cluster
can be used for startup");
- if (!isShutdown)
- throw new IllegalStateException();
- delegateForStartup().startup(cluster);
+ if (isRunning())
+ throw new IllegalStateException("Can not start a instance that
is already running");
isShutdown = false;
+ if (!broadcastAddress.equals(config.broadcastAddress()))
+ {
+ // previous address != desired address, so cleanup
+ InetSocketAddress previous = broadcastAddress;
+ InetSocketAddress newAddress = config.broadcastAddress();
+ instanceMap.put(newAddress, (I) this); // if the broadcast
address changes, update
+ instanceMap.remove(previous);
+ broadcastAddress = newAddress;
+ }
+ try
+ {
+ delegateForStartup().startup(cluster);
+ }
+ catch (Throwable t)
+ {
+ if
(config.get(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN) == null)
+ {
+ // it's possible that the failure happens after listening
and threads are started up
+ // but without knowing the start up phase it isn't safe to
call shutdown, so assume
+ // that a failed to start instance was shutdown (which
would be true if each instance
+ // was its own JVM).
+ isShutdown = true;
+ }
+ else
+ {
+ // user was explicit about the desired behavior, respect it
+ // the most common reason to set this is to set 'false',
this will leave the
+ // instance marked as running, which will have .close shut
it down.
+ isShutdown = (boolean)
config.get(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN);
+ }
+ throw t;
+ }
updateMessagingVersions();
}
@@ -241,8 +291,8 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
@Override
public synchronized Future<Void> shutdown(boolean graceful)
{
- if (isShutdown)
- throw new IllegalStateException();
+ if (isShutdown())
+ throw new IllegalStateException("Instance is not running, so
can not be shutdown");
isShutdown = true;
Future<Void> future = delegate.shutdown(graceful);
delegate = null;
@@ -251,7 +301,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
public int liveMemberCount()
{
- if (!isShutdown && delegate != null)
+ if (isRunning() && delegate != null)
return delegate().liveMemberCount();
throw new IllegalStateException("Cannot get live member count on
shutdown instance: " + config.num());
@@ -283,7 +333,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
public void receiveMessage(IMessage message)
{
+            // read the field once so the null check and the delivery below use the same delegate reference
IInvokableInstance delegate = this.delegate;
- if (!isShutdown && delegate != null) // since we sync directly on
the other node, we drop messages immediately if we are shutdown
+ if (isRunning() && delegate != null) // since we sync directly on
the other node, we drop messages immediately if we are shutdown
delegate.receiveMessage(message);
}
@@ -302,7 +352,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
@Override
public synchronized void setVersion(Versions.Version version)
{
- if (!isShutdown)
+ if (isRunning())
throw new IllegalStateException("Must be shutdown before
version can be modified");
// re-initialise
this.version = version;
@@ -343,7 +393,7 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
this.broadcastPort = builder.getBroadcastPort();
this.nodeProvisionStrategy = builder.nodeProvisionStrategy;
this.instances = new ArrayList<>();
- this.instanceMap = new HashMap<>();
+ this.instanceMap = new ConcurrentHashMap<>();
this.initialVersion = builder.getVersion();
this.filters = new MessageFilters();
this.instanceInitializer = builder.getInstanceInitializer();
@@ -861,9 +911,20 @@ public abstract class AbstractCluster<I extends IInstance>
implements ICluster<I
+    /**
+     * Names of all classes annotated with {@link Shared}; that annotation tells jvm-dtest to share the class
+     * across all class loaders.
+     */
private static Set<String> findClassesMarkedForSharedClassLoader()
{
- return new
Reflections("org.apache.cassandra").getTypesAnnotatedWith(Shared.class).stream()
- .map(Class::getName)
- .collect(Collectors.toSet());
+ return findClassesMarkedWith(Shared.class);
+ }
+
+    /**
+     * Names of all classes annotated with {@link Isolated}, i.e. classes that must be loaded into the instance
+     * class loader instead of being shared.
+     */
+    private static Set<String> findClassesMarkedForInstanceClassLoader()
+    {
+        Class<Isolated> marker = Isolated.class;
+        return findClassesMarkedWith(marker);
+    }
+
+    /**
+     * Scans the {@code org.apache.cassandra} package for types carrying the given annotation, with super-type
+     * expansion disabled.
+     *
+     * @param annotation the marker annotation to search for
+     * @return fully-qualified names of every annotated type found
+     */
+    private static Set<String> findClassesMarkedWith(Class<? extends Annotation> annotation)
+    {
+        Reflections reflections = new Reflections(ConfigurationBuilder.build("org.apache.cassandra")
+                                                                       .setExpandSuperTypes(false));
+        Set<Class<?>> annotated = reflections.getTypesAnnotatedWith(annotation);
+        return annotated.stream()
+                        .map(Class::getName)
+                        .collect(Collectors.toSet());
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index a4163a1..87d84c8 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -257,7 +257,9 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
{
MessagingService.instance().outboundSink.add((message, to) -> {
InetSocketAddress toAddr = fromCassandraInetAddressAndPort(to);
-
cluster.get(toAddr).receiveMessage(serializeMessage(message.from(), to,
message));
+ IInstance toInstance = cluster.get(toAddr);
+ if (toInstance != null)
+ toInstance.receiveMessage(serializeMessage(message.from(), to,
message));
return false;
});
}
@@ -268,7 +270,10 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
if (isShutdown())
return false;
IMessage serialized = serializeMessage(message.from(),
toCassandraInetAddressAndPort(broadcastAddress()), message);
- int fromNum = cluster.get(serialized.from()).config().num();
+ IInstance from = cluster.get(serialized.from());
+ if (from == null)
+ return false;
+ int fromNum = from.config().num();
int toNum = config.num(); // since this instance is reciving the
message, to will always be this instance
return cluster.filters().permitInbound(fromNum, toNum, serialized);
});
@@ -281,7 +286,10 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
return false;
IMessage serialzied = serializeMessage(message.from(), to,
message);
int fromNum = config.num(); // since this instance is sending the
message, from will always be this instance
- int toNum =
cluster.get(fromCassandraInetAddressAndPort(to)).config().num();
+ IInstance toInstance =
cluster.get(fromCassandraInetAddressAndPort(to));
+ if (toInstance == null)
+ return false;
+ int toNum = toInstance.config().num();
return cluster.filters().permitOutbound(fromNum, toNum,
serialzied);
});
}
@@ -455,6 +463,9 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
throw new RuntimeException(e);
}
+ // Re-populate token metadata after commit log recover (new
peers might be loaded onto system keyspace #10293)
+ StorageService.instance.populateTokenMetadata();
+
Verb.REQUEST_RSP.unsafeSetSerializer(() ->
ReadResponse.serializer);
if (config.has(NETWORK))
@@ -685,6 +696,7 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
.thenRun(super::shutdown);
}
+ @Override
public int liveMemberCount()
{
return sync(() -> {
@@ -694,11 +706,13 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
}).call();
}
+    /**
+     * Returns an {@code InstanceMetrics} constructed inside this instance (via {@code callOnInstance}) from
+     * {@code CassandraMetricsRegistry.Metrics}.
+     */
+ @Override
public Metrics metrics()
{
return callOnInstance(() -> new
InstanceMetrics(CassandraMetricsRegistry.Metrics));
}
+ @Override
public NodeToolResult nodetoolResult(boolean withNotifications, String...
commandAndArgs)
{
return sync(() -> {
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index bf615cd..895f2a7 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -139,6 +139,11 @@ public class InstanceConfig implements IInstanceConfig
return
DistributedTestSnitch.fromCassandraInetAddressAndPort(getBroadcastAddressAndPort());
}
+    /**
+     * Drops the cached broadcast {@code InetAddressAndPort} so {@code getBroadcastAddressAndPort()} stops returning
+     * the previously cached value; intended for use after the broadcast address in the config has been changed.
+     */
+    public void unsetBroadcastAddressAndPort()
+    {
+        this.broadcastAddressAndPort = null;
+    }
+
protected InetAddressAndPort getBroadcastAddressAndPort()
{
if (broadcastAddressAndPort == null)
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
new file mode 100644
index 0000000..a68e819
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -0,0 +1,774 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Futures;
+import org.junit.Assert;
+
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.service.StorageService;
+
+import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Utilities for working with jvm-dtest clusters.
+ *
+ * This class is marked as {@link Isolated} because it relies on lambdas, which are in a package that is marked as
+ * shared, so jvm-dtest needs to be told to not share this class.
+ *
+ * This class should never be called from within the cluster, always in the App ClassLoader.
+ */
+@Isolated
+public class ClusterUtils
+{
+    /**
+     * Matches one row of {@code nodetool ring} output, capturing (1) address, (2) rack, (3) status, (4) state and
+     * (5) token. Rows look like:
+     * <pre>
+     * 127.0.0.3 rack0 Up Normal 46.21 KB 100.00% -1
+     * /127.0.0.1:7012 Unknown ? Normal ? 100.00% -3074457345618258603
+     * </pre>
+     * Compiled once and reused; a {@link Pattern} is immutable and safe to share.
+     */
+    private static final Pattern RING_LINE_PATTERN =
+        Pattern.compile("^(/?[0-9.:]+)\\s+(\\w+|\\?)\\s+(\\w+|\\?)\\s+(\\w+|\\?).*?(-?\\d+)\\s*$");
+
+    /**
+     * Start the instance with the given System Properties, after the instance has started, the properties will be cleared.
+     */
+    public static <I extends IInstance> I start(I inst, Consumer<WithProperties> fn)
+    {
+        return start(inst, (ignore, prop) -> fn.accept(prop));
+    }
+
+    /**
+     * Start the instance with the given System Properties, after the instance has started, the properties will be cleared.
+     *
+     * @param inst instance to start
+     * @param fn   callback given the instance and the properties to set before startup
+     * @return the started instance
+     */
+    public static <I extends IInstance> I start(I inst, BiConsumer<I, WithProperties> fn)
+    {
+        // try-with-resources closes the WithProperties whether startup returns or throws
+        try (WithProperties properties = new WithProperties())
+        {
+            fn.accept(inst, properties);
+            inst.startup();
+            return inst;
+        }
+    }
+
+    /**
+     * Stop an instance in a blocking manner.
+     *
+     * The main difference between this and {@link IInstance#shutdown()} is that the wait on the future will catch
+     * the exceptions and throw as runtime.
+     */
+    public static void stopUnchecked(IInstance i)
+    {
+        Futures.getUnchecked(i.shutdown());
+    }
+
+    /**
+     * Stops an instance abruptly. This is done by blocking all messages to/from so all other instances are unable
+     * to communicate, then stopping the instance gracefully.
+     *
+     * The assumption is that hard stopping inbound and outbound messages will appear to the cluster as if the instance
+     * was stopped via kill -9; this does not hold true if the instance is restarted as it knows it was properly shutdown.
+     *
+     * @param cluster to filter messages to
+     * @param inst    to shut down
+     */
+    public static <I extends IInstance> void stopAbrupt(ICluster<I> cluster, I inst)
+    {
+        // block all messages to/from the node going down to make sure a clean shutdown doesn't happen
+        IMessageFilters.Filter to = cluster.filters().allVerbs().to(inst.config().num()).drop();
+        IMessageFilters.Filter from = cluster.filters().allVerbs().from(inst.config().num()).drop();
+        try
+        {
+            stopUnchecked(inst);
+        }
+        finally
+        {
+            from.off();
+            to.off();
+        }
+    }
+
+    /**
+     * Stop all the instances in the cluster. This function is different than {@link ICluster#close()} as it doesn't
+     * clean up the cluster state, it only stops all the instances.
+     */
+    public static <I extends IInstance> void stopAll(ICluster<I> cluster)
+    {
+        cluster.stream().forEach(ClusterUtils::stopUnchecked);
+    }
+
+    /**
+     * Create a new instance and add it to the cluster, without starting it.
+     *
+     * @param cluster to add to
+     * @param dc      the instance should be in
+     * @param rack    the instance should be in
+     * @param <I>     instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
+                                                      String dc, String rack)
+    {
+        return addInstance(cluster, dc, rack, ignore -> {});
+    }
+
+    /**
+     * Create a new instance and add it to the cluster, without starting it.
+     *
+     * @param cluster to add to
+     * @param dc      the instance should be in
+     * @param rack    the instance should be in
+     * @param fn      function to add to the config before starting
+     * @param <I>     instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
+                                                      String dc, String rack,
+                                                      Consumer<IInstanceConfig> fn)
+    {
+        Objects.requireNonNull(dc, "dc");
+        Objects.requireNonNull(rack, "rack");
+
+        InstanceConfig config = cluster.newInstanceConfig();
+        //TODO adding new instances should be cleaner, currently requires you create the cluster with all
+        // instances known about (at least to NetworkTopology and TokenStategy)
+        // this is very hidden, so should be more explicit
+        config.networkTopology().put(config.broadcastAddress(), NetworkTopology.dcAndRack(dc, rack));
+
+        fn.accept(config);
+
+        return cluster.bootstrap(config);
+    }
+
+    /**
+     * Create and start a new instance that replaces an existing instance.
+     *
+     * The instance will be in the same datacenter and rack as the existing instance.
+     *
+     * @param cluster   to add to
+     * @param toReplace instance to replace
+     * @param <I>       instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster, IInstance toReplace)
+    {
+        return replaceHostAndStart(cluster, toReplace, ignore -> {});
+    }
+
+    /**
+     * Create and start a new instance that replaces an existing instance.
+     *
+     * The instance will be in the same datacenter and rack as the existing instance.
+     *
+     * @param cluster   to add to
+     * @param toReplace instance to replace
+     * @param fn        lambda to add additional properties
+     * @param <I>       instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster,
+                                                              IInstance toReplace,
+                                                              Consumer<WithProperties> fn)
+    {
+        IInstanceConfig toReplaceConf = toReplace.config();
+        I inst = addInstance(cluster, toReplaceConf.localDatacenter(), toReplaceConf.localRack(), c -> c.set("auto_bootstrap", true));
+
+        return start(inst, properties -> {
+            // lower this so the replacement waits less time
+            properties.setProperty("cassandra.broadcast_interval_ms", Long.toString(TimeUnit.SECONDS.toMillis(30)));
+            // default is 30s, lowering as it should be faster
+            properties.setProperty("cassandra.ring_delay_ms", Long.toString(TimeUnit.SECONDS.toMillis(10)));
+            properties.set(BOOTSTRAP_SCHEMA_DELAY_MS, TimeUnit.SECONDS.toMillis(10));
+
+            // state which node to replace
+            properties.setProperty("cassandra.replace_address_first_boot", getBroadcastAddressHostString(toReplace));
+
+            fn.accept(properties);
+        });
+    }
+
+    /**
+     * Calls {@link org.apache.cassandra.locator.TokenMetadata#sortedTokens()}, returning as a list of strings.
+     */
+    public static List<String> getTokenMetadataTokens(IInvokableInstance inst)
+    {
+        return inst.callOnInstance(() ->
+                                   StorageService.instance.getTokenMetadata()
+                                                          .sortedTokens().stream()
+                                                          .map(Object::toString)
+                                                          .collect(Collectors.toList()));
+    }
+
+    /**
+     * Get the ring from the perspective of the instance.
+     */
+    public static List<RingInstanceDetails> ring(IInstance inst)
+    {
+        NodeToolResult results = inst.nodetoolResult("ring");
+        results.asserts().success();
+        return parseRing(results.getStdout());
+    }
+
+    /**
+     * Make sure the target instance is in the ring.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing instance expected in the ring
+     * @return the ring (if target is present)
+     */
+    public static List<RingInstanceDetails> assertInRing(IInstance instance, IInstance expectedInRing)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst();
+        // the failure message must describe the failure of isPresent(): the target was expected but missing
+        assertThat(match).as("Expected to find %s but was not found", targetAddress).isPresent();
+        return ring;
+    }
+
+    /**
+     * Make sure the target instance's gossip state matches on the source instance
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing instance expected in the ring
+     * @param state          expected gossip state
+     * @return the ring (if target is present and has expected state)
+     */
+    public static List<RingInstanceDetails> assertRingState(IInstance instance, IInstance expectedInRing, String state)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        List<RingInstanceDetails> match = ring.stream()
+                                              .filter(d -> d.address.equals(targetAddress))
+                                              .collect(Collectors.toList());
+        assertThat(match)
+        .isNotEmpty()
+        .as("State was expected to be %s but was not", state)
+        .anyMatch(r -> r.state.equals(state));
+        return ring;
+    }
+
+    /**
+     * Make sure the target instance is NOT in the ring.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing instance not expected in the ring
+     * @return the ring (if target is not present)
+     */
+    public static List<RingInstanceDetails> assertNotInRing(IInstance instance, IInstance expectedInRing)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst();
+        Assert.assertEquals("Not expected to find " + targetAddress + " but was found", Optional.empty(), match);
+        return ring;
+    }
+
+    /**
+     * Polls the ring seen by {@code src} once a second (up to 100 times) until {@code fn} accepts it.
+     *
+     * @throws AssertionError with {@code errorMessage} and the last ring seen if the condition never holds
+     */
+    private static List<RingInstanceDetails> awaitRing(IInstance src, String errorMessage, Predicate<List<RingInstanceDetails>> fn)
+    {
+        List<RingInstanceDetails> ring = null;
+        for (int i = 0; i < 100; i++)
+        {
+            ring = ring(src);
+            if (fn.test(ring))
+            {
+                return ring;
+            }
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        throw new AssertionError(errorMessage + "\n" + ring);
+    }
+
+    /**
+     * Wait for the target to be in the ring as seen by the source instance.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing instance to wait for
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, IInstance expectedInRing)
+    {
+        return awaitRingJoin(instance, expectedInRing.broadcastAddress().getAddress().getHostAddress());
+    }
+
+    /**
+     * Wait for the target to be in the ring as seen by the source instance.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing instance address to wait for
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, String expectedInRing)
+    {
+        // only the first row for the address is inspected; it must be Up and Normal
+        return awaitRing(instance, "Node " + expectedInRing + " did not join the ring...",
+                         ring -> ring.stream()
+                                     .filter(d -> d.address.equals(expectedInRing))
+                                     .findFirst()
+                                     .map(ClusterUtils::isRingInstanceDetailsHealthy)
+                                     .orElse(false));
+    }
+
+    /**
+     * Wait for the ring to only have instances that are Up and Normal.
+     *
+     * @param src instance to check on
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingHealthy(IInstance src)
+    {
+        return awaitRing(src, "Timeout waiting for ring to become healthy",
+                         ring -> ring.stream().allMatch(ClusterUtils::isRingInstanceDetailsHealthy));
+    }
+
+    /**
+     * Wait for the ring to have the target instance with the provided state.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing to look for
+     * @param state          expected
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingState(IInstance instance, IInstance expectedInRing, String state)
+    {
+        // resolve the target address once rather than for every ring row on every poll
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        return awaitRing(instance, "Timeout waiting for " + expectedInRing + " to have state " + state,
+                         ring -> ring.stream()
+                                     .anyMatch(d -> d.address.equals(targetAddress) && d.state.equals(state)));
+    }
+
+    /**
+     * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function
+     * only relies on the expectedInsts param.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing expected instances in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, IInstance... expectedInRing)
+    {
+        return assertRingIs(instance, Arrays.asList(expectedInRing));
+    }
+
+    /**
+     * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function
+     * only relies on the expectedInsts param.
+     *
+     * @param instance       instance to check on
+     * @param expectedInRing expected instances in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, Collection<? extends IInstance> expectedInRing)
+    {
+        Set<String> expectedRingAddresses = expectedInRing.stream()
+                                                          .map(ClusterUtils::getBroadcastAddressHostString)
+                                                          .collect(Collectors.toSet());
+        return assertRingIs(instance, expectedRingAddresses);
+    }
+
+    /**
+     * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function
+     * only relies on the expectedInsts param.
+     *
+     * @param instance              instance to check on
+     * @param expectedRingAddresses expected instances addresses in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, Set<String> expectedRingAddresses)
+    {
+        List<RingInstanceDetails> ring = ring(instance);
+        Set<String> ringAddresses = ring.stream().map(d -> d.address).collect(Collectors.toSet());
+        assertThat(ringAddresses)
+        .as("Ring addreses did not match for instance %s", instance)
+        .isEqualTo(expectedRingAddresses);
+        return ring;
+    }
+
+    /**
+     * @return true when the ring row reports status {@code Up} and state {@code Normal}
+     */
+    private static boolean isRingInstanceDetailsHealthy(RingInstanceDetails details)
+    {
+        return details.status.equals("Up") && details.state.equals("Normal");
+    }
+
+    /**
+     * Parses {@code nodetool ring} output; lines that do not look like a ring row (headers, blanks) are skipped.
+     */
+    private static List<RingInstanceDetails> parseRing(String str)
+    {
+        List<RingInstanceDetails> details = new ArrayList<>();
+        // \R matches any line break (\n, \r\n, ...), so output with platform line separators parses the same
+        for (String line : str.split("\\R"))
+        {
+            Matcher matcher = RING_LINE_PATTERN.matcher(line);
+            if (!matcher.find())
+                continue;
+            details.add(new RingInstanceDetails(matcher.group(1), matcher.group(2), matcher.group(3),
+                                                matcher.group(4), matcher.group(5)));
+        }
+        return details;
+    }
+
+    /**
+     * Polls the gossip info seen by {@code src} once a second (up to 100 times) until {@code fn} accepts it.
+     *
+     * @throws AssertionError with {@code errorMessage} and the last gossip info seen if the condition never holds
+     */
+    private static Map<String, Map<String, String>> awaitGossip(IInstance src, String errorMessage, Predicate<Map<String, Map<String, String>>> fn)
+    {
+        Map<String, Map<String, String>> gossip = null;
+        for (int i = 0; i < 100; i++)
+        {
+            gossip = gossipInfo(src);
+            if (fn.test(gossip))
+            {
+                return gossip;
+            }
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        throw new AssertionError(errorMessage + "\n" + gossip);
+    }
+
+    /**
+     * Wait for the target instance to have the desired status. Target status is checked via string contains so works
+     * with 'NORMAL' but also can check tokens or full state.
+     *
+     * @param instance         instance to check on
+     * @param expectedInGossip instance to wait for
+     * @param targetStatus     for the instance; {@code null} waits until the instance has no status
+     * @return gossip info
+     */
+    public static Map<String, Map<String, String>> awaitGossipStatus(IInstance instance, IInstance expectedInGossip, String targetStatus)
+    {
+        return awaitGossip(instance, "Node " + expectedInGossip + " did not match state " + targetStatus, gossip -> {
+            Map<String, String> state = gossip.get(getBroadcastAddressString(expectedInGossip));
+            if (state == null)
+                return false;
+            String status = state.get("STATUS_WITH_PORT");
+            if (status == null)
+                status = state.get("STATUS");
+            if (status == null)
+                return targetStatus == null;
+            // a null target means "no status expected", so a present status never matches it
+            // (status.contains(null) would otherwise throw a NullPointerException)
+            return targetStatus != null && status.contains(targetStatus);
+        });
+    }
+
+    /**
+     * Get the gossip information from the node. Currently only address, generation, and heartbeat are returned
+     *
+     * @param inst to check on
+     * @return gossip info
+     */
+    public static Map<String, Map<String, String>> gossipInfo(IInstance inst)
+    {
+        NodeToolResult results = inst.nodetoolResult("gossipinfo");
+        results.asserts().success();
+        return parseGossipInfo(results.getStdout());
+    }
+
+    /**
+     * Make sure the gossip info for the specific target has the expected generation and heartbeat
+     *
+     * @param instance           to check on
+     * @param expectedInGossip   instance to check for
+     * @param expectedGeneration expected generation
+     * @param expectedHeartbeat  expected heartbeat
+     */
+    public static void assertGossipInfo(IInstance instance,
+                                        InetSocketAddress expectedInGossip, int expectedGeneration, int expectedHeartbeat)
+    {
+        String targetAddress = expectedInGossip.getAddress().toString();
+        Map<String, Map<String, String>> gossipInfo = gossipInfo(instance);
+        Map<String, String> gossipState = gossipInfo.get(targetAddress);
+        if (gossipState == null)
+            throw new NullPointerException("Unable to find gossip info for " + targetAddress + "; gossip info = " + gossipInfo);
+        Assert.assertEquals(Long.toString(expectedGeneration), gossipState.get("generation"));
+        Assert.assertEquals(Long.toString(expectedHeartbeat), gossipState.get("heartbeat")); //TODO do we really mix these two?
+    }
+
+    /**
+     * Parses {@code nodetool gossipinfo} output: a line starting with {@code /} opens a new instance section and
+     * every following {@code key:value} line belongs to it (the value may itself contain ':').
+     */
+    private static Map<String, Map<String, String>> parseGossipInfo(String str)
+    {
+        Map<String, Map<String, String>> map = new HashMap<>();
+        String currentInstance = null;
+        // \R handles \r\n too, so instance keys never end up with a trailing '\r'
+        for (String line : str.split("\\R"))
+        {
+            // blank lines carry no state; skipping them avoids tripping the key:value assertion below
+            if (line.trim().isEmpty())
+                continue;
+            if (line.startsWith("/"))
+            {
+                // start of new instance
+                currentInstance = line;
+                continue;
+            }
+            Objects.requireNonNull(currentInstance);
+            String[] kv = line.trim().split(":", 2);
+            assert kv.length == 2 : "When splitting line '" + line + "' expected 2 parts but not true";
+            Map<String, String> state = map.computeIfAbsent(currentInstance, ignore -> new HashMap<>());
+            state.put(kv[0], kv[1]);
+        }
+
+        return map;
+    }
+
+    /**
+     * Get the tokens assigned to the instance via config. This method does not work if the instance has learned
+     * or generated its tokens.
+     *
+     * @param instance to get tokens from
+     * @return non-empty list of tokens
+     */
+    public static List<String> getTokens(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        int numTokens = conf.getInt("num_tokens");
+        Assert.assertEquals("Only single token is supported", 1, numTokens);
+        String token = conf.getString("initial_token");
+        Assert.assertNotNull("initial_token was not found", token);
+        return Arrays.asList(token);
+    }
+
+    /**
+     * Get all data directories for the given instance.
+     *
+     * @param instance to get data directories for
+     * @return data directories
+     */
+    public static List<File> getDataDirectories(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String[] ds = (String[]) conf.get("data_file_directories");
+        List<File> files = new ArrayList<>(ds.length);
+        for (String d : ds)
+            files.add(new File(d));
+        return files;
+    }
+
+    /**
+     * Get the commit log directory for the given instance.
+     *
+     * @param instance to get the commit log directory for
+     * @return commit log directory
+     */
+    public static File getCommitLogDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("commitlog_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get the hints directory for the given instance.
+     *
+     * @param instance to get the hints directory for
+     * @return hints directory
+     */
+    public static File getHintsDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("hints_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get the saved caches directory for the given instance.
+     *
+     * @param instance to get the saved caches directory for
+     * @return saved caches directory
+     */
+    public static File getSavedCachesDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("saved_caches_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get all writable directories for the given instance.
+     *
+     * @param instance to get directories for
+     * @return all writable directories
+     */
+    public static List<File> getDirectories(IInstance instance)
+    {
+        List<File> out = new ArrayList<>();
+        out.addAll(getDataDirectories(instance));
+        out.add(getCommitLogDirectory(instance));
+        out.add(getHintsDirectory(instance));
+        out.add(getSavedCachesDirectory(instance));
+        return out;
+    }
+
+    /**
+     * Gets the name of the Partitioner for the given instance.
+     *
+     * @param instance to get partitioner from
+     * @return partitioner name
+     */
+    public static String getPartitionerName(IInstance instance)
+    {
+        return (String) instance.config().get("partitioner");
+    }
+
+    /**
+     * Changes the instance's address to the new address. This method should only be called while the instance is
+     * down, else has undefined behavior.
+     *
+     * @param instance to update address for
+     * @param address  to set
+     */
+    public static void updateAddress(IInstance instance, String address)
+    {
+        updateAddress(instance.config(), address);
+    }
+
+    /**
+     * Changes the instance's address to the new address. This method should only be called while the instance is
+     * down, else has undefined behavior.
+     *
+     * @param conf    to update address for
+     * @param address to set
+     */
+    private static void updateAddress(IInstanceConfig conf, String address)
+    {
+        for (String key : Arrays.asList("broadcast_address", "listen_address", "broadcast_rpc_address", "rpc_address"))
+            conf.set(key, address);
+
+        // InstanceConfig caches InetSocketAddress -> InetAddressAndPort
+        // this causes issues as startup now ignores config, so force reset it to pull from conf.
+        ((InstanceConfig) conf).unsetBroadcastAddressAndPort(); //TODO remove the need to null out the cache...
+        conf.networkTopology().put(conf.broadcastAddress(), NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack()));
+    }
+
+    /**
+     * Get the broadcast address host address only (ex. 127.0.0.1)
+     */
+    private static String getBroadcastAddressHostString(IInstance target)
+    {
+        return target.config().broadcastAddress().getAddress().getHostAddress();
+    }
+
+    /**
+     * Get the broadcast address in host:port format (ex. 127.0.0.1:7190)
+     */
+    public static String getBroadcastAddressHostWithPortString(IInstance target)
+    {
+        InetSocketAddress address = target.config().broadcastAddress();
+        return address.getAddress().getHostAddress() + ":" + address.getPort();
+    }
+
+    /**
+     * Get the broadcast address InetAddess string (ex. localhost/127.0.0.1 or /127.0.0.1)
+     */
+    private static String getBroadcastAddressString(IInstance target)
+    {
+        return target.config().broadcastAddress().getAddress().toString();
+    }
+
+    /**
+     * One parsed row of {@code nodetool ring} output.
+     */
+    public static final class RingInstanceDetails
+    {
+        private final String address;
+        private final String rack;
+        private final String status;
+        private final String state;
+        private final String token;
+
+        private RingInstanceDetails(String address, String rack, String status, String state, String token)
+        {
+            this.address = address;
+            this.rack = rack;
+            this.status = status;
+            this.state = state;
+            this.token = token;
+        }
+
+        public String getAddress()
+        {
+            return address;
+        }
+
+        public String getRack()
+        {
+            return rack;
+        }
+
+        public String getStatus()
+        {
+            return status;
+        }
+
+        public String getState()
+        {
+            return state;
+        }
+
+        public String getToken()
+        {
+            return token;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RingInstanceDetails that = (RingInstanceDetails) o;
+            return Objects.equals(address, that.address) &&
+                   Objects.equals(rack, that.rack) &&
+                   Objects.equals(status, that.status) &&
+                   Objects.equals(state, that.state) &&
+                   Objects.equals(token, that.token);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(address, rack, status, state, token);
+        }
+
+        @Override
+        public String toString()
+        {
+            return Arrays.asList(address, rack, status, state, token).toString();
+        }
+    }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
b/test/distributed/org/apache/cassandra/distributed/shared/Isolated.java
similarity index 82%
copy from test/distributed/org/apache/cassandra/distributed/shared/Shared.java
copy to test/distributed/org/apache/cassandra/distributed/shared/Isolated.java
index a1047b6..898631f 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/Isolated.java
@@ -24,14 +24,16 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Tells jvm-dtest that a class should be shared accross all {@link
ClassLoader}s.
+ * Tells jvm-dtest that a class should be isolated and loaded into the
instance class loader.
*
* Jvm-dtest relies on classloader isolation to run multiple cassandra
instances in the same JVM, this makes it
* so some classes do not get shared (outside a blesssed set of
classes/packages). When the default behavior
- * is not desirable, this annotation will tell jvm-dtest to share the class
accross all class loaders.
+ * is not desirable, this annotation will tell jvm-dtest to isolate the class, loading it separately in each instance class loader.
+ *
+ * This is the opposite of {@link Shared}.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
-public @interface Shared
+public @interface Isolated
{
+    // Marker annotation with no elements: only its presence on a type matters; RUNTIME retention keeps it visible via reflection.
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
b/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
index a1047b6..bb67070 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
@@ -29,6 +29,8 @@ import java.lang.annotation.Target;
* Jvm-dtest relies on classloader isolation to run multiple cassandra
instances in the same JVM, this makes it
* so some classes do not get shared (outside a blesssed set of
classes/packages). When the default behavior
* is not desirable, this annotation will tell jvm-dtest to share the class
accross all class loaders.
+ *
+ * This is the opposite of {@link Isolated}.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE })
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
b/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
new file mode 100644
index 0000000..88987c2
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/shared/WithProperties.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+
+/**
+ * Sets JVM system properties and restores their previous values (or absence) on {@link #close()}.
+ * <p>
+ * Intended for try-with-resources. Every {@code with}/{@code set}/{@code setProperty} call records the value the
+ * property had before the change. {@link #close()} undoes the changes in reverse order, so a key set several times
+ * ends up with the value it had before the first change.
+ */
+public final class WithProperties implements AutoCloseable
+{
+    /** Key and previous value of every property changed through this instance, in the order they were set. */
+    private final List<Property> properties = new ArrayList<>();
+
+    public WithProperties()
+    {
+    }
+
+    /**
+     * Creates an instance and immediately applies the given pairs; see {@link #with(String...)}.
+     */
+    public WithProperties(String... kvs)
+    {
+        with(kvs);
+    }
+
+    /**
+     * Sets every key/value pair in {@code kvs}, which alternates keys and values: {@code k1, v1, k2, v2, ...}.
+     *
+     * @throws IllegalArgumentException if an odd number of arguments is given
+     */
+    public void with(String... kvs)
+    {
+        // explicit check rather than an assert: with -ea disabled an unpaired trailing key would be silently ignored
+        if (kvs.length % 2 != 0)
+            throw new IllegalArgumentException("Input must have an even amount of inputs but given " + kvs.length);
+        for (int i = 0; i < kvs.length; i += 2)
+            with(kvs[i], kvs[i + 1]);
+    }
+
+    /** Alias of {@link #with(String, String)} mirroring {@link System#setProperty(String, String)}. */
+    public void setProperty(String key, String value)
+    {
+        with(key, value);
+    }
+
+    public void set(CassandraRelevantProperties prop, String value)
+    {
+        with(prop.getKey(), value);
+    }
+
+    /** Sets {@code prop} to the comma-joined {@code values}. */
+    public void set(CassandraRelevantProperties prop, String... values)
+    {
+        set(prop, Arrays.asList(values));
+    }
+
+    /** Sets {@code prop} to the comma-joined {@code values}. */
+    public void set(CassandraRelevantProperties prop, Collection<String> values)
+    {
+        set(prop, Joiner.on(",").join(values));
+    }
+
+    public void set(CassandraRelevantProperties prop, boolean value)
+    {
+        set(prop, Boolean.toString(value));
+    }
+
+    public void set(CassandraRelevantProperties prop, long value)
+    {
+        set(prop, Long.toString(value));
+    }
+
+    /** Sets system property {@code key} to {@code value}, remembering its previous value for {@link #close()}. */
+    public void with(String key, String value)
+    {
+        String previous = System.setProperty(key, value);
+        properties.add(new Property(key, previous));
+    }
+
+    /**
+     * Restores every property changed through this instance, then forgets them, so calling this again is a no-op.
+     */
+    @Override
+    public void close()
+    {
+        // undo in reverse order so that, for a key set more than once, the value from before the first set wins
+        Collections.reverse(properties);
+        properties.forEach(s -> {
+            if (s.value == null)
+                System.getProperties().remove(s.key); // the property did not exist before
+            else
+                System.setProperty(s.key, s.value);
+        });
+        properties.clear();
+    }
+
+    /** A property key together with the value it had before it was changed ({@code null} if it was unset). */
+    private static final class Property
+    {
+        private final String key;
+        private final String value;
+
+        private Property(String key, String value)
+        {
+            this.key = key;
+            this.value = value;
+        }
+    }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/IPMembershipTest.java
b/test/distributed/org/apache/cassandra/distributed/test/IPMembershipTest.java
new file mode 100644
index 0000000..de6eb0d
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/IPMembershipTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.tools.ToolRunner;
+import org.assertj.core.api.Assertions;
+
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.getDirectories;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.updateAddress;
+
+public class IPMembershipTest extends TestBaseImpl
+{
+    /**
+     * Port of replace_address_test.py::fail_without_replace_test to jvm-dtest.
+     * <p>
+     * Wipes a node's directories and restarts it on the same address without {@code cassandra.replace_address}.
+     * With {@code auto_bootstrap} both enabled and disabled, startup is expected to fail because a node with that
+     * address already exists.
+     */
+    @Test
+    public void sameIPFailWithoutReplace() throws IOException
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
+                                                        .set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+                                      .start())
+        {
+            IInvokableInstance nodeToReplace = cluster.get(3);
+
+            ToolRunner.invokeCassandraStress("write", "n=10000", "-schema", "replication(factor=3)", "-port", "native=9042").assertOnExitCode();
+
+            for (boolean autoBootstrap : Arrays.asList(true, false))
+            {
+                stopUnchecked(nodeToReplace);
+                getDirectories(nodeToReplace).forEach(FileUtils::deleteRecursive);
+
+                nodeToReplace.config().set("auto_bootstrap", autoBootstrap);
+
+                Assertions.assertThatThrownBy(() -> nodeToReplace.startup())
+                          .hasMessage("A node with address /127.0.0.3:7012 already exists, cancelling join. Use cassandra.replace_address if you want to replace this node.");
+            }
+        }
+    }
+
+    /**
+     * Tests the behavior if a node restarts with a different IP: the other nodes should see it join under the new
+     * address, and previously written data must still be readable.
+     */
+    @Test
+    public void startupNewIP() throws IOException, InterruptedException
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
+                                                        // disable DistributedTestSnitch as it tries to query before we setup
+                                                        .set("endpoint_snitch", "org.apache.cassandra.locator.SimpleSnitch"))
+                                      .start())
+        {
+            IInvokableInstance nodeToReplace = cluster.get(3);
+            String newAddress = "127.0.0.4";
+
+            ToolRunner.invokeCassandraStress("write", "n=10000", "-schema", "replication(factor=3)", "-port", "native=9042").assertOnExitCode();
+
+            stopUnchecked(nodeToReplace);
+
+            // change the IP of the node
+            updateAddress(nodeToReplace, newAddress);
+
+            nodeToReplace.startup();
+
+            // gossip takes some time, wait for the other nodes to see this one updated
+            awaitRingJoin(cluster.get(1), newAddress);
+            awaitRingJoin(cluster.get(2), newAddress);
+
+            Set<String> expected = ImmutableSet.of("127.0.0.1", "127.0.0.2", newAddress);
+            cluster.forEach(i -> assertRingIs(i, expected));
+
+            ToolRunner.invokeCassandraStress("read", "n=10000", "no-warmup", "-port", "native=9042").assertOnExitCode();
+        }
+    }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index d53cbd4..801df7d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -23,12 +23,14 @@ import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import com.google.common.collect.ImmutableSet;
+
import org.junit.After;
import org.junit.BeforeClass;
@@ -149,4 +151,17 @@ public class TestBaseImpl extends DistributedTestBase
throw new IllegalArgumentException("Unsupported value type (value is "
+ value + ')');
}
+
+ public static void fixDistributedSchemas(Cluster cluster)
+ {
+ // These keyspaces are under replicated by default, so must be updated
when doing a mulit-node cluster;
+ // else bootstrap will fail with 'Unable to find sufficient sources
for streaming range <range> in keyspace <name>'
+ for (String ks : Arrays.asList("system_auth", "system_traces"))
+ {
+ cluster.schemaChange("ALTER KEYSPACE " + ks + " WITH REPLICATION =
{'class': 'SimpleStrategy', 'replication_factor': " + Math.min(cluster.size(),
3) + "}");
+ }
+
+ // in real live repair is needed in this case, but in the test case it
doesn't matter if the tables loose
+ // anything, so ignoring repair to speed up the tests.
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateAbruptDownedNodeTest.java
similarity index 53%
copy from test/distributed/org/apache/cassandra/distributed/shared/Shared.java
copy to
test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateAbruptDownedNodeTest.java
index a1047b6..95fa767 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateAbruptDownedNodeTest.java
@@ -16,22 +16,24 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.shared;
+package org.apache.cassandra.distributed.test.hostreplacement;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopAbrupt;
/**
- * Tells jvm-dtest that a class should be shared accross all {@link
ClassLoader}s.
+ * If the operator attempts to assassinate the node before replacing it, this
will cause the node to fail to start
+ * as the status is non-normal.
*
- * Jvm-dtest relies on classloader isolation to run multiple cassandra
instances in the same JVM, this makes it
- * so some classes do not get shared (outside a blesssed set of
classes/packages). When the default behavior
- * is not desirable, this annotation will tell jvm-dtest to share the class
accross all class loaders.
+ * The node is removed abruptly before assassinate, leaving gossip without an
empty entry.
*/
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ ElementType.TYPE })
-public @interface Shared
+public class AssassinateAbruptDownedNodeTest extends BaseAssassinatedCase
{
+    /**
+     * Stops the node to remove with {@code ClusterUtils.stopAbrupt}. The base test calls this after cluster setup
+     * and before assassinating the node.
+     */
+ @Override
+ void consume(Cluster cluster, IInvokableInstance nodeToRemove)
+ {
+ stopAbrupt(cluster, nodeToRemove);
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateGracefullNodeTest.java
similarity index 53%
copy from test/distributed/org/apache/cassandra/distributed/shared/Shared.java
copy to
test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateGracefullNodeTest.java
index a1047b6..d88f2d9 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/Shared.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinateGracefullNodeTest.java
@@ -16,22 +16,24 @@
* limitations under the License.
*/
-package org.apache.cassandra.distributed.shared;
+package org.apache.cassandra.distributed.test.hostreplacement;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
/**
- * Tells jvm-dtest that a class should be shared accross all {@link
ClassLoader}s.
+ * If the operator attempts to assassinate the node before replacing it, this
will cause the node to fail to start
+ * as the status is non-normal.
*
- * Jvm-dtest relies on classloader isolation to run multiple cassandra
instances in the same JVM, this makes it
- * so some classes do not get shared (outside a blesssed set of
classes/packages). When the default behavior
- * is not desirable, this annotation will tell jvm-dtest to share the class
accross all class loaders.
+ * The node is removed gracefully before assassinate, leaving gossip without
an empty entry.
*/
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ ElementType.TYPE })
-public @interface Shared
+public class AssassinateGracefullNodeTest extends BaseAssassinatedCase
{
+    /**
+     * Stops the node to remove with {@code ClusterUtils.stopUnchecked}, the graceful counterpart of
+     * AssassinateAbruptDownedNodeTest. The base test calls this before assassinating the node.
+     */
+ @Override
+ void consume(Cluster cluster, IInvokableInstance nodeToRemove)
+ {
+ stopUnchecked(nodeToRemove);
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinatedEmptyNodeTest.java
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinatedEmptyNodeTest.java
new file mode 100644
index 0000000..31a732f
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/AssassinatedEmptyNodeTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.net.InetSocketAddress;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertGossipInfo;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopAll;
+
+/**
+ * If the operator attempts to assassinate the node before replacing it, this
will cause the node to fail to start
+ * as the status is non-normal.
+ *
+ * The cluster is put into the "empty" state for the node to remove.
+ */
+public class AssassinatedEmptyNodeTest extends BaseAssassinatedCase
+{
+ // empty state does not include the token metadata, so when assassinate
happens it will fail to find the token
+ @Override
+ protected String expectedMessage(IInvokableInstance nodeToRemove)
+ {
+ return "Could not find tokens for " +
nodeToRemove.config().broadcastAddress() + " to replace";
+ }
+
+ @Override
+ void consume(Cluster cluster, IInvokableInstance nodeToRemove)
+ {
+ IInvokableInstance seed = cluster.get(SEED_NUM);
+ IInvokableInstance peer = cluster.get(PEER_NUM);
+ InetSocketAddress addressToReplace = nodeToRemove.broadcastAddress();
+
+ // now stop all nodes
+ stopAll(cluster);
+
+ // with all nodes down, now start the seed (should be first node)
+ seed.startup();
+ peer.startup();
+
+ // at this point node2 should be known in gossip, but with
generation/version of 0
+ assertGossipInfo(seed, addressToReplace, 0, -1);
+ assertGossipInfo(peer, addressToReplace, 0, -1);
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/BaseAssassinatedCase.java
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/BaseAssassinatedCase.java
new file mode 100644
index 0000000..4d4768e
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/BaseAssassinatedCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingState;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitGossipStatus;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.getBroadcastAddressHostWithPortString;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.getTokens;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public abstract class BaseAssassinatedCase extends TestBaseImpl
+{
+ protected static final int SEED_NUM = 1;
+ protected static final int NODE_TO_REMOVE_NUM = 2;
+ protected static final int PEER_NUM = 3;
+
+ abstract void consume(Cluster cluster, IInvokableInstance nodeToRemove);
+
+ protected String expectedMessage(IInvokableInstance nodeToRemove)
+ {
+ return "Cannot replace token " + getTokens(nodeToRemove).get(0) + "
which does not exist!";
+ }
+
+ @Test
+ public void test() throws IOException
+ {
+ TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
+ try (Cluster cluster = Cluster.build(3)
+ .withConfig(c -> c.with(Feature.GOSSIP,
Feature.NETWORK))
+ .withTokenSupplier(node ->
even.token(node == 4 || node == 5 ? NODE_TO_REMOVE_NUM : node))
+ .start())
+ {
+ IInvokableInstance seed = cluster.get(SEED_NUM);
+ IInvokableInstance nodeToRemove = cluster.get(NODE_TO_REMOVE_NUM);
+ IInvokableInstance peer = cluster.get(PEER_NUM);
+
+ setupCluster(cluster);
+
+ consume(cluster, nodeToRemove);
+
+ assertRingState(seed, nodeToRemove, "Normal");
+
+ // assassinate the node
+ peer.nodetoolResult("assassinate",
getBroadcastAddressHostWithPortString(nodeToRemove))
+ .asserts().success();
+
+ // wait until the peer sees this assassination
+ awaitGossipStatus(seed, nodeToRemove, "LEFT");
+
+ // allow replacing nodes with the LEFT state, this should fail
since the token isn't in the ring
+ assertThatThrownBy(() ->
+ replaceHostAndStart(cluster, nodeToRemove,
properties -> {
+ // since there are downed nodes its
possible gossip has the downed node with an old schema, so need
+ // this property to allow startup
+ properties.set(BOOTSTRAP_SKIP_SCHEMA_CHECK,
true);
+ // since the bootstrap should fail because
the token, don't wait "too long" on schema as it doesn't
+ // matter for this test
+ properties.set(BOOTSTRAP_SCHEMA_DELAY_MS,
10);
+ }))
+ .hasMessage(expectedMessage(nodeToRemove));
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementAbruptDownedInstanceTest.java
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementAbruptDownedInstanceTest.java
new file mode 100644
index 0000000..11a30e5
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementAbruptDownedInstanceTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingState;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopAbrupt;
+import static
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+import static
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.validateRows;
+
+public class HostReplacementAbruptDownedInstanceTest extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(HostReplacementAbruptDownedInstanceTest.class);
+
+    /**
+     * Host replacement after an abrupt shutdown. The shutdown state is never broadcast, so the node to be replaced
+     * is still in NORMAL state on its peers when the replacement starts.
+     */
+    @Test
+    public void hostReplaceAbruptShutdown() throws IOException
+    {
+        int numStartNodes = 3;
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(numStartNodes);
+        try (Cluster cluster = Cluster.build(numStartNodes)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                      // the first node added beyond the initial ones takes node 2's token
+                                      .withTokenSupplier(node -> even.token(node == (numStartNodes + 1) ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+            IInvokableInstance peer = cluster.get(3);
+            List<IInvokableInstance> survivors = Arrays.asList(seed, peer);
+
+            setupCluster(cluster);
+
+            // snapshot the rows up front so the final state can be validated against them
+            SimpleQueryResult expectedState = nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+
+            stopAbrupt(cluster, nodeToRemove);
+
+            // the abrupt stop is not broadcast, so node 2 must still be NORMAL on every other node
+            for (IInvokableInstance survivor : survivors)
+                assertRingState(survivor, nodeToRemove, "Normal");
+
+            // TODO: queries should still work while the node is down, but this currently fails:
+            // survivors.forEach(p -> validateRows(p.coordinator(), expectedState));
+
+            // now create a new node to replace the downed one
+            long startNanos = System.nanoTime();
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, nodeToRemove, properties -> {
+                // node 2 was killed abruptly, so its gossip state may carry an old schema version and bootstrap
+                // would fail waiting for a version it never sees; log a warning instead of failing bootstrap
+                properties.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+            });
+            logger.info("Host replacement of {} with {} took {}", nodeToRemove, replacingNode, Duration.ofNanos(System.nanoTime() - startNanos));
+            for (IInvokableInstance survivor : survivors)
+                awaitRingJoin(survivor, replacingNode);
+
+            // make sure all nodes are healthy
+            awaitRingHealthy(seed);
+
+            List<IInvokableInstance> expectedRing = Arrays.asList(seed, peer, replacingNode);
+            for (IInvokableInstance instance : expectedRing)
+                assertRingIs(instance, expectedRing);
+            for (IInvokableInstance instance : expectedRing)
+                validateRows(instance.coordinator(), expectedState);
+        }
+    }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementOfDownedClusterTest.java
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementOfDownedClusterTest.java
new file mode 100644
index 0000000..477e226
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementOfDownedClusterTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.assertj.core.api.Assertions;
+
+import static
org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertGossipInfo;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertNotInRing;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.getTokenMetadataTokens;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopAll;
+import static
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.setupCluster;
+import static
org.apache.cassandra.distributed.test.hostreplacement.HostReplacementTest.validateRows;
+
+public class HostReplacementOfDownedClusterTest extends TestBaseImpl
+{
+ private static final Logger logger =
LoggerFactory.getLogger(HostReplacementOfDownedClusterTest.class);
+
+ static
+ {
+ // Gossip has a notion of quarantine, which is used to remove "fat
clients" and "gossip only members"
+ // from the ring if not updated recently (recently is defined by this
config).
+ // The reason for setting to 0 is to make sure even under such an
aggressive environment, we do NOT remove
+ // nodes from the peers table
+ GOSSIPER_QUARANTINE_DELAY.setInt(0);
+ }
+
+ /**
+ * When the full cluster crashes, make sure that we can replace a dead
node after recovery. This can happen
+ * with DC outages (assuming single DC setup) where the recovery isn't
able to recover a specific node.
+ */
+ @Test
+ public void hostReplacementOfDeadNode() throws IOException
+ {
+ // start with 2 nodes, stop both nodes, start the seed, host replace
the down node)
+ TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+ try (Cluster cluster = Cluster.build(2)
+ .withConfig(c -> c.with(Feature.GOSSIP,
Feature.NETWORK))
+ .withTokenSupplier(node ->
even.token(node == 3 ? 2 : node))
+ .start())
+ {
+ IInvokableInstance seed = cluster.get(1);
+ IInvokableInstance nodeToRemove = cluster.get(2);
+ InetSocketAddress addressToReplace =
nodeToRemove.broadcastAddress();
+
+ setupCluster(cluster);
+
+ // collect rows/tokens to detect issues later on if the state
doesn't match
+ SimpleQueryResult expectedState =
nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE +
".tbl", ConsistencyLevel.ALL);
+ List<String> beforeCrashTokens = getTokenMetadataTokens(seed);
+
+ // now stop all nodes
+ stopAll(cluster);
+
+ // with all nodes down, now start the seed (should be first node)
+ seed.startup();
+
+ // at this point node2 should be known in gossip, but with
generation/version of 0
+ assertGossipInfo(seed, addressToReplace, 0, -1);
+
+ // make sure node1 still has node2's tokens
+ List<String> currentTokens = getTokenMetadataTokens(seed);
+ Assertions.assertThat(currentTokens)
+ .as("Tokens no longer match after restarting")
+ .isEqualTo(beforeCrashTokens);
+
+ // now create a new node to replace the other node
+ IInvokableInstance replacingNode = replaceHostAndStart(cluster,
nodeToRemove);
+
+ awaitRingJoin(seed, replacingNode);
+ awaitRingJoin(replacingNode, seed);
+ assertNotInRing(seed, nodeToRemove);
+ logger.info("Current ring is {}", assertNotInRing(replacingNode,
nodeToRemove));
+
+ validateRows(seed.coordinator(), expectedState);
+ validateRows(replacingNode.coordinator(), expectedState);
+ }
+ }
+
+ /**
+ * Cluster stops completely, then start seed, then host replace node2;
after all complete start node3 to make sure
+ * it comes up correctly with the new host in the ring.
+ */
+ @Test
+ public void hostReplacementOfDeadNodeAndOtherNodeStartsAfter() throws
IOException
+ {
+ // start with 3 nodes, stop both nodes, start the seed, host replace
the down node)
+ int numStartNodes = 3;
+ TokenSupplier even =
TokenSupplier.evenlyDistributedTokens(numStartNodes);
+ try (Cluster cluster = Cluster.build(numStartNodes)
+ .withConfig(c -> c.with(Feature.GOSSIP,
Feature.NETWORK))
+ .withTokenSupplier(node ->
even.token(node == (numStartNodes + 1) ? 2 : node))
+ .start())
+ {
+ IInvokableInstance seed = cluster.get(1);
+ IInvokableInstance nodeToRemove = cluster.get(2);
+ IInvokableInstance nodeToStartAfterReplace = cluster.get(3);
+ InetSocketAddress addressToReplace =
nodeToRemove.broadcastAddress();
+
+ setupCluster(cluster);
+
+ // collect rows/tokens to detect issues later on if the state
doesn't match
+ SimpleQueryResult expectedState =
nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE +
".tbl", ConsistencyLevel.ALL);
+ List<String> beforeCrashTokens = getTokenMetadataTokens(seed);
+
+ // now stop all nodes
+ stopAll(cluster);
+
+ // with all nodes down, now start the seed (should be first node)
+ seed.startup();
+
+ // at this point node2 should be known in gossip, but with
generation/version of 0
+ assertGossipInfo(seed, addressToReplace, 0, -1);
+
+ // make sure node1 still has node2's tokens
+ List<String> currentTokens = getTokenMetadataTokens(seed);
+ Assertions.assertThat(currentTokens)
+ .as("Tokens no longer match after restarting")
+ .isEqualTo(beforeCrashTokens);
+
+ // now create a new node to replace the other node
+ IInvokableInstance replacingNode = replaceHostAndStart(cluster,
nodeToRemove);
+
+ // wait till the replacing node is in the ring
+ awaitRingJoin(seed, replacingNode);
+ awaitRingJoin(replacingNode, seed);
+
+ // we see that the replaced node is properly in the ring, now lets
add the other node back
+ nodeToStartAfterReplace.startup();
+
+ awaitRingJoin(seed, nodeToStartAfterReplace);
+ awaitRingJoin(replacingNode, nodeToStartAfterReplace);
+
+ // make sure all nodes are healthy
+ awaitRingHealthy(seed);
+
+ assertRingIs(seed, seed, replacingNode, nodeToStartAfterReplace);
+ assertRingIs(replacingNode, seed, replacingNode,
nodeToStartAfterReplace);
+ logger.info("Current ring is {}",
assertRingIs(nodeToStartAfterReplace, seed, replacingNode,
nodeToStartAfterReplace));
+
+ validateRows(seed.coordinator(), expectedState);
+ validateRows(replacingNode.coordinator(), expectedState);
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
new file mode 100644
index 0000000..3de0bf5
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/HostReplacementTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.hostreplacement;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.assertj.core.api.Assertions;
+
+import static
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.GOSSIPER_QUARANTINE_DELAY;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertInRing;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.assertRingIs;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingHealthy;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.getTokenMetadataTokens;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+
+public class HostReplacementTest extends TestBaseImpl
+{
+    private static final Logger logger = LoggerFactory.getLogger(HostReplacementTest.class);
+
+    static
+    {
+        // Gossip has a notion of quarantine, which is used to remove "fat clients" and "gossip only members"
+        // from the ring if not updated recently (recently is defined by this config).
+        // The reason for setting to 0 is to make sure even under such an aggressive environment, we do NOT remove
+        // nodes from the peers table
+        GOSSIPER_QUARANTINE_DELAY.setInt(0);
+    }
+
+    /**
+     * Attempt to do a host replacement on a down host
+     */
+    @Test
+    public void replaceDownedHost() throws IOException
+    {
+        // start with 2 nodes, stop node2, then host replace node2 while it is down
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+        try (Cluster cluster = Cluster.build(2)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                      // node 3 is the replacing node added later, so it reuses node 2's token
+                                      .withTokenSupplier(node -> even.token(node == 3 ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+
+            setupCluster(cluster);
+
+            // collect rows to detect issues later on if the state doesn't match
+            SimpleQueryResult expectedState = nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+
+            stopUnchecked(nodeToRemove);
+
+            // now create a new node to replace the other node
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, nodeToRemove, props -> {
+                // since we have a downed host there might be a schema version which is old show up but
+                // can't be fetched since the host is down...
+                props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+            });
+
+            // wait till the replacing node is in the ring
+            awaitRingJoin(seed, replacingNode);
+            awaitRingJoin(replacingNode, seed);
+
+            // make sure all nodes are healthy
+            awaitRingHealthy(seed);
+
+            assertRingIs(seed, seed, replacingNode);
+            logger.info("Current ring is {}", assertRingIs(replacingNode, seed, replacingNode));
+
+            validateRows(seed.coordinator(), expectedState);
+            validateRows(replacingNode.coordinator(), expectedState);
+        }
+    }
+
+    /**
+     * Attempt to do a host replacement on an alive host; this must be rejected and leave the ring untouched.
+     */
+    @Test
+    public void replaceAliveHost() throws IOException
+    {
+        // start with 2 nodes and, while node2 is still alive, attempt to host replace it
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(2);
+        try (Cluster cluster = Cluster.build(2)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK)
+                                                        // NOTE(review): presumably keeps the failed startup of the
+                                                        // replacing node from being reported as a shutdown so the
+                                                        // startup exception reaches the assertion below — confirm
+                                                        .set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+                                      // node 3 is the replacing node added later, so it reuses node 2's token
+                                      .withTokenSupplier(node -> even.token(node == 3 ? 2 : node))
+                                      .start())
+        {
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+
+            setupCluster(cluster);
+
+            // collect rows to detect issues later on if the state doesn't match
+            SimpleQueryResult expectedState = nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+
+            // now create a new node to replace the other node
+            Assertions.assertThatThrownBy(() -> replaceHostAndStart(cluster, nodeToRemove))
+                      .as("Startup of instance should have failed as you can not replace a alive node")
+                      .hasMessageContaining("Cannot replace a live node")
+                      .isInstanceOf(UnsupportedOperationException.class);
+
+            // make sure all nodes are healthy
+            awaitRingHealthy(seed);
+
+            assertRingIs(seed, seed, nodeToRemove);
+            logger.info("Current ring is {}", assertRingIs(nodeToRemove, seed, nodeToRemove));
+
+            validateRows(seed.coordinator(), expectedState);
+            validateRows(nodeToRemove.coordinator(), expectedState);
+        }
+    }
+
+    /**
+     * The seed goes down, then another node goes down; once the seed comes back, make sure host replacement of the
+     * still-down node works.
+     */
+    @Test
+    public void seedGoesDownBeforeDownHost() throws IOException
+    {
+        // start with 3 nodes, stop the seed then node2, restart the seed, then host replace node2 while it is down
+        TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                      // node 4 is the replacing node added later, so it reuses node 2's token
+                                      .withTokenSupplier(node -> even.token(node == 4 ? 2 : node))
+                                      .start())
+        {
+            // call early as this can't be touched on a down node
+            IInvokableInstance seed = cluster.get(1);
+            IInvokableInstance nodeToRemove = cluster.get(2);
+            IInvokableInstance nodeToStayAlive = cluster.get(3);
+
+            setupCluster(cluster);
+
+            // collect rows/tokens to detect issues later on if the state doesn't match
+            SimpleQueryResult expectedState = nodeToRemove.coordinator().executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+            List<String> beforeCrashTokens = getTokenMetadataTokens(seed);
+
+            // shutdown the seed, then the node to remove
+            stopUnchecked(seed);
+            stopUnchecked(nodeToRemove);
+
+            // restart the seed
+            seed.startup();
+
+            // make sure the node to remove is still in the ring
+            assertInRing(seed, nodeToRemove);
+
+            // make sure node1 still has node2's tokens
+            List<String> currentTokens = getTokenMetadataTokens(seed);
+            Assertions.assertThat(currentTokens)
+                      .as("Tokens no longer match after restarting")
+                      .isEqualTo(beforeCrashTokens);
+
+            // now create a new node to replace the other node
+            IInvokableInstance replacingNode = replaceHostAndStart(cluster, nodeToRemove);
+
+            List<IInvokableInstance> expectedRing = Arrays.asList(seed, replacingNode, nodeToStayAlive);
+
+            // wait till the replacing node is in the ring
+            awaitRingJoin(seed, replacingNode);
+            awaitRingJoin(replacingNode, seed);
+            awaitRingJoin(nodeToStayAlive, replacingNode);
+
+            // make sure all nodes are healthy
+            logger.info("Current ring is {}", awaitRingHealthy(seed));
+
+            expectedRing.forEach(i -> assertRingIs(i, expectedRing));
+
+            // every member of the final ring must serve the same data
+            validateRows(seed.coordinator(), expectedState);
+            validateRows(replacingNode.coordinator(), expectedState);
+            validateRows(nodeToStayAlive.coordinator(), expectedState);
+        }
+    }
+
+    /**
+     * Prepares a started cluster for a host replacement test: fixes the distributed schemas, initializes the
+     * test keyspace, writes the test rows and flushes {@code KEYSPACE} on every instance.
+     */
+    static void setupCluster(Cluster cluster)
+    {
+        fixDistributedSchemas(cluster);
+        init(cluster);
+
+        populate(cluster);
+        cluster.forEach(i -> i.flush(KEYSPACE));
+    }
+
+    /**
+     * Creates {@code KEYSPACE.tbl} (if missing) and inserts the rows {@code pk = 0..9} through node 1 at
+     * {@link ConsistencyLevel#ALL}.
+     */
+    static void populate(Cluster cluster)
+    {
+        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int PRIMARY KEY)");
+        for (int i = 0; i < 10; i++)
+        {
+            cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)",
+                                           ConsistencyLevel.ALL,
+                                           i);
+        }
+    }
+
+    /**
+     * Reads the whole {@code KEYSPACE.tbl} through {@code coordinator} at {@link ConsistencyLevel#ALL} and asserts
+     * the rows match {@code expected}.
+     */
+    static void validateRows(ICoordinator coordinator, SimpleQueryResult expected)
+    {
+        // the same expected result is reused across several calls, so rewind it before comparing
+        expected.reset();
+        SimpleQueryResult rows = coordinator.executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+        AssertUtils.assertRows(rows, expected);
+    }
+}
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java
b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 367f6a1..1b17a27 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -83,19 +83,19 @@ public class GossiperTest
public void testHaveVersion3Nodes() throws Exception
{
VersionedValue.VersionedValueFactory factory = new
VersionedValue.VersionedValueFactory(null);
- EndpointState es = new EndpointState(null);
+ EndpointState es = new EndpointState((HeartBeatState) null);
es.addApplicationState(ApplicationState.RELEASE_VERSION,
factory.releaseVersion("4.0-SNAPSHOT"));
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.1"),
es);
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.1"));
- es = new EndpointState(null);
+ es = new EndpointState((HeartBeatState) null);
es.addApplicationState(ApplicationState.RELEASE_VERSION,
factory.releaseVersion("3.11.3"));
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.2"),
es);
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.2"));
- es = new EndpointState(null);
+ es = new EndpointState((HeartBeatState) null);
es.addApplicationState(ApplicationState.RELEASE_VERSION,
factory.releaseVersion("3.0.0"));
Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.3"),
es);
Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]