Add shutdown gossip state to prevent timeouts during rolling restarts. Patch by brandonwilliams, reviewed by Richard Low for CASSANDRA-8336
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2c62bb2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2c62bb2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2c62bb2 Branch: refs/heads/cassandra-2.1 Commit: b2c62bb20be52a698a5683ef8ffcdefe560dbc9a Parents: 53848f7 Author: Brandon Williams <[email protected]> Authored: Wed Apr 15 09:30:12 2015 -0500 Committer: Brandon Williams <[email protected]> Committed: Wed Apr 15 09:30:12 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../gms/GossipShutdownVerbHandler.java | 2 +- src/java/org/apache/cassandra/gms/Gossiper.java | 86 ++++++++++++++++++-- .../apache/cassandra/gms/HeartBeatState.java | 5 ++ .../apache/cassandra/gms/VersionedValue.java | 6 ++ .../cassandra/service/StorageService.java | 17 ++-- 6 files changed, 102 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 521668d..460b07c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.15: + * Add shutdown gossip state to prevent timeouts during rolling restarts (CASSANDRA-8336) * Fix running with java.net.preferIPv6Addresses=true (CASSANDRA-9137) * Fix failed bootstrap/replace attempts being persisted in system.peers (CASSANDRA-9180) * Flush system.IndexInfo after marking index built (CASSANDRA-9128) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java index ef71208..1691107 
100644 --- a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java @@ -34,7 +34,7 @@ public class GossipShutdownVerbHandler implements IVerbHandler logger.debug("Ignoring shutdown message from {} because gossip is disabled", message.from); return; } - FailureDetector.instance.forceConviction(message.from); + Gossiper.instance.markAsShutdown(message.from); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 962a358..090033e 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -71,6 +71,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean static final ApplicationState[] STATES = ApplicationState.values(); static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE); + static final List<String> SILENT_SHUTDOWN_STATES = new ArrayList<String>(); // own mutable copy: DEAD_STATES is a fixed-size Arrays.asList view and must not be mutated + static { + SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES); + SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING); + } private ScheduledFuture<?> scheduledGossipTask; private static final ReentrantLock taskLock = new ReentrantLock(); @@ -297,6 +302,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return 0L; } + private boolean isShutdown(InetAddress endpoint) + { + EndpointState epState = endpointStateMap.get(endpoint); + if (epState == null) + return false; + if (epState.getApplicationState(ApplicationState.STATUS) == null) + return false; + String value = epState.getApplicationState(ApplicationState.STATUS).value; + String[] pieces = 
value.split(VersionedValue.DELIMITER_STR, -1); + assert (pieces.length > 0); + String state = pieces[0]; + return state.equals(VersionedValue.SHUTDOWN); + } + /** * This method is part of IFailureDetectionEventListener interface. This is invoked * by the Failure Detector when it convicts an end point. @@ -308,7 +327,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean EndpointState epState = endpointStateMap.get(endpoint); if (epState == null) return; - if (epState.isAlive() && !isDeadState(epState)) + if (isShutdown(endpoint) && epState.isAlive()) + { + markAsShutdown(endpoint); + } + else if (epState.isAlive() && !isDeadState(epState)) { markDead(endpoint, epState); } @@ -317,6 +340,21 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } /** + * This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it + * @param endpoint endpoint that has shut itself down + */ + protected void markAsShutdown(InetAddress endpoint) + { + EndpointState epState = endpointStateMap.get(endpoint); + if (epState == null) + return; + epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true)); + epState.getHeartBeatState().forceHighestPossibleVersionUnsafe(); + markDead(endpoint, epState); + FailureDetector.instance.forceConviction(endpoint); + } + + /** * Return either: the greatest heartbeat or application state * * @param epState @@ -963,6 +1001,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean } for (IEndpointStateChangeSubscriber subscriber : subscribers) subscriber.onJoin(ep, epState); + // check this at the end so nodes will learn about the endpoint + if (isShutdown(ep)) + markAsShutdown(ep); } public boolean isDeadState(EndpointState epState) @@ -981,6 +1022,22 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return false; } + public boolean 
isSilentShutdownState(EndpointState epState) + { + if (epState.getApplicationState(ApplicationState.STATUS) == null) + return false; + String value = epState.getApplicationState(ApplicationState.STATUS).value; + String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); + assert (pieces.length > 0); + String state = pieces[0]; + for (String deadstate : SILENT_SHUTDOWN_STATES) + { + if (state.equals(deadstate)) + return true; + } + return false; + } + void applyStateLocally(Map<InetAddress, EndpointState> epStateMap) { for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet()) @@ -1264,6 +1321,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), localState); } + public void forceNewerGeneration() + { + EndpointState epstate = endpointStateMap.get(FBUtilities.getBroadcastAddress()); + epstate.getHeartBeatState().forceNewerGenerationUnsafe(); + } + /** * Add an endpoint we knew about previously, but whose state is unknown @@ -1330,13 +1393,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void stop() { - if (scheduledGossipTask != null) - scheduledGossipTask.cancel(false); - logger.info("Announcing shutdown"); - Uninterruptibles.sleepUninterruptibly(intervalInMillis * 2, TimeUnit.MILLISECONDS); - MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN); - for (InetAddress ep : liveEndpoints) - MessagingService.instance().sendOneWay(message, ep); + EndpointState mystate = endpointStateMap.get(FBUtilities.getBroadcastAddress()); + if (mystate != null && !isSilentShutdownState(mystate)) + { + logger.info("Announcing shutdown"); + addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true)); + MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN); + for (InetAddress ep : liveEndpoints) + MessagingService.instance().sendOneWay(message, ep); 
+ Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000), TimeUnit.MILLISECONDS); + } + else + logger.warn("No local state or state is in silent shutdown, not announcing shutdown"); + if (scheduledGossipTask != null) + scheduledGossipTask.cancel(false); } public boolean isEnabled() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/gms/HeartBeatState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java b/src/java/org/apache/cassandra/gms/HeartBeatState.java index c3b423c..4af5dd8 100644 --- a/src/java/org/apache/cassandra/gms/HeartBeatState.java +++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java @@ -63,6 +63,11 @@ class HeartBeatState generation += 1; } + void forceHighestPossibleVersionUnsafe() + { + version = Integer.MAX_VALUE; + } + public String toString() { return String.format("HeartBeat: generation = %d, version = %d", generation, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 565a8cb..b0918ac 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -69,6 +69,7 @@ public class VersionedValue implements Comparable<VersionedValue> public final static String REMOVED_TOKEN = "removed"; public final static String HIBERNATE = "hibernate"; + public final static String SHUTDOWN = "shutdown"; // values for ApplicationState.REMOVAL_COORDINATOR public final static String REMOVAL_COORDINATOR = "REMOVER"; @@ -207,6 +208,11 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(VersionedValue.HIBERNATE + 
VersionedValue.DELIMITER + value); } + public VersionedValue shutdown(boolean value) + { + return new VersionedValue(VersionedValue.SHUTDOWN + VersionedValue.DELIMITER + value); + } + public VersionedValue datacenter(String dcId) { return new VersionedValue(dcId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index e906f03..077413f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -37,12 +37,10 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.*; -import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.cassandra.cql3.CQL3Type; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -205,11 +203,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE SystemKeyspace.updateTokens(tokens); tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress()); Collection<Token> localTokens = getLocalTokens(); + setGossipTokens(localTokens); + setMode(Mode.NORMAL, false); + } + + public void setGossipTokens(Collection<Token> tokens) + { List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>(); - states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(localTokens))); - states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(localTokens))); + 
states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); + states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(tokens))); Gossiper.instance.addLocalApplicationStates(states); - setMode(Mode.NORMAL, false); } public StorageService() @@ -289,6 +292,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (!initialized) { logger.warn("Starting gossip by operator request"); + setGossipTokens(getLocalTokens()); + Gossiper.instance.forceNewerGeneration(); Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); initialized = true; } @@ -1346,7 +1351,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING)) handleStateBootstrap(endpoint, pieces); - else if (moveName.equals(VersionedValue.STATUS_NORMAL)) + else if (moveName.equals(VersionedValue.STATUS_NORMAL) || moveName.equals(VersionedValue.SHUTDOWN)) handleStateNormal(endpoint, pieces); else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN)) handleStateRemoving(endpoint, pieces);
