Author: brandonwilliams Date: Mon Jul 25 18:58:27 2011 New Revision: 1150847
URL: http://svn.apache.org/viewvc?rev=1150847&view=rev Log: Gossip handles dead states, token removal actually works, gossip states are held for aVeryLongTime. Patch by brandonwilliams and Paul Cannon, reviewed by Paul Cannon for CASSANDRA-2496. Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/NEWS.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1150847&r1=1150846&r2=1150847&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jul 25 18:58:27 2011 @@ -1,6 +1,7 @@ 0.8.3 * add ability to drop local reads/writes that are going to timeout (CASSANDRA-2943) + * revamp token removal process, keep gossip states for 3 days (CASSANDRA-2946) 0.8.2 Modified: cassandra/branches/cassandra-0.8/NEWS.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/NEWS.txt?rev=1150847&r1=1150846&r2=1150847&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/NEWS.txt (original) +++ cassandra/branches/cassandra-0.8/NEWS.txt Mon Jul 25 18:58:27 2011 @@ -1,3 +1,12 @@ +0.8.3 +===== + +Upgrading +--------- + - Token removal has been revamped. 
Removing tokens in a mixed cluster with + 0.8.3 will not work, so the entire cluster will need to be running 0.8.3 + first, except for the dead node. + 0.8.2 ===== Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1150847&r1=1150846&r2=1150847&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java Mon Jul 25 18:58:27 2011 @@ -29,6 +29,7 @@ public enum ApplicationState DC, RACK, RELEASE_VERSION, + REMOVAL_COORDINATOR, // pad to allow adding new states to existing cluster X1, X2, Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1150847&r1=1150846&r2=1150847&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jul 25 18:58:27 2011 @@ -27,6 +27,7 @@ import java.util.*; import java.util.Map.Entry; import java.util.concurrent.*; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.db.SystemTable; import org.apache.cassandra.net.MessageProducer; import org.apache.cassandra.config.ConfigurationException; @@ -58,6 +59,8 @@ public class Gossiper implements IFailur private static final RetryingScheduledThreadPoolExecutor executor = new RetryingScheduledThreadPoolExecutor("GossipTasks"); static final ApplicationState[] STATES = ApplicationState.values(); + static final List<String> 
DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT); + private ScheduledFuture<?> scheduledGossipTask; public final static int intervalInMillis = 1000; public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2; @@ -264,17 +267,21 @@ public class Gossiper implements IFailur } /** - * Removes the endpoint from unreachable endpoint set + * Removes the endpoint from gossip completely * * @param endpoint endpoint to be removed from the current membership. */ private void evictFromMembership(InetAddress endpoint) { unreachableEndpoints.remove(endpoint); + endpointStateMap.remove(endpoint); + justRemovedEndpoints.put(endpoint, System.currentTimeMillis()); + if (logger.isDebugEnabled()) + logger.debug("evicting " + endpoint + " from gossip"); } /** - * Removes the endpoint completely from Gossip + * Removes the endpoint from Gossip but retains endpoint state */ public void removeEndpoint(InetAddress endpoint) { @@ -288,6 +295,8 @@ public class Gossiper implements IFailur FailureDetector.instance.remove(endpoint); versions.remove(endpoint); justRemovedEndpoints.put(endpoint, System.currentTimeMillis()); + if (logger.isDebugEnabled()) + logger.debug("removing endpoint " + endpoint); } /** @@ -328,6 +337,67 @@ public class Gossiper implements IFailur } } + /** + * This method will begin removing an existing endpoint from the cluster by spoofing its state + * This should never be called unless this coordinator has had 'removetoken' invoked + * + * @param endpoint - the endpoint being removed + * @param token - the token being removed + * @param mytoken - my own token for replication coordination + */ + public void advertiseRemoving(InetAddress endpoint, Token token, Token mytoken) + { + EndpointState epState = endpointStateMap.get(endpoint); + // remember this node's generation + int generation = epState.getHeartBeatState().getGeneration(); + logger.info("Removing token: " + token); + 
logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change"); + try + { + Thread.sleep(StorageService.RING_DELAY); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + // make sure it did not change + epState = endpointStateMap.get(endpoint); + if (epState.getHeartBeatState().getGeneration() != generation) + throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it"); + // update the other node's generation to mimic it as if it had changed it itself + logger.info("Advertising removal for " + endpoint); + epState.updateTimestamp(); // make sure we don't evict it too soon + epState.getHeartBeatState().forceNewerGenerationUnsafe(); + epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(token)); + epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(mytoken)); + endpointStateMap.put(endpoint, epState); + } + + /** + * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN + * This should only be called after advertiseRemoving + * @param endpoint + * @param token + */ + public void advertiseTokenRemoved(InetAddress endpoint, Token token) + { + EndpointState epState = endpointStateMap.get(endpoint); + epState.updateTimestamp(); // make sure we don't evict it too soon + epState.getHeartBeatState().forceNewerGenerationUnsafe(); + epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token)); + logger.info("Completing removal of " + endpoint); + endpointStateMap.put(endpoint, epState); + // ensure at least one gossip round occurs before returning + try + { + Thread.sleep(intervalInMillis * 2); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + } + public boolean isKnownEndpoint(InetAddress endpoint) { return 
endpointStateMap.containsKey(endpoint); @@ -456,23 +526,18 @@ public class Gossiper implements IFailur { long duration = now - epState.getUpdateTimestamp(); + if (StorageService.instance.getTokenMetadata().isMember(endpoint)) + epState.setHasToken(true); // check if this is a fat client. fat clients are removed automatically from // gosip after FatClientTimeout - if (!epState.hasToken() && !epState.isAlive() && (duration > FatClientTimeout)) + if (!epState.hasToken() && !epState.isAlive() && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout)) { - if (StorageService.instance.getTokenMetadata().isMember(endpoint)) - epState.setHasToken(true); - else - { - if (!justRemovedEndpoints.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client - { - logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip"); - removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state - } - } + logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip"); + removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay + evictFromMembership(endpoint); // can get rid of the state immediately } - if ( !epState.isAlive() && (duration > aVeryLongTime) ) + if ( !epState.isAlive() && (duration > aVeryLongTime) && (!StorageService.instance.getTokenMetadata().isMember(endpoint))) { evictFromMembership(endpoint); } @@ -488,7 +553,6 @@ public class Gossiper implements IFailur if (logger.isDebugEnabled()) logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over"); justRemovedEndpoints.remove(entry.getKey()); - endpointStateMap.remove(entry.getKey()); } } } @@ -585,6 +649,7 @@ public class Gossiper implements IFailur int remoteGeneration = remoteEndpointState.getHeartBeatState().getGeneration(); if ( remoteGeneration > 
localGeneration ) { + localEndpointState.updateTimestamp(); fd.report(endpoint); return; } @@ -595,6 +660,7 @@ public class Gossiper implements IFailur int remoteVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion(); if ( remoteVersion > localVersion ) { + localEndpointState.updateTimestamp(); fd.report(endpoint); } } @@ -607,6 +673,7 @@ public class Gossiper implements IFailur if (logger.isTraceEnabled()) logger.trace("marking as alive {}", addr); localState.markAlive(); + localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime liveEndpoints.add(addr); unreachableEndpoints.remove(addr); logger.info("InetAddress {} is now UP", addr); @@ -638,10 +705,13 @@ public class Gossiper implements IFailur */ private void handleMajorStateChange(InetAddress ep, EndpointState epState) { - if (endpointStateMap.get(ep) != null) - logger.info("Node {} has restarted, now UP again", ep); - else - logger.info("Node {} is now part of the cluster", ep); + if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value)) + { + if (endpointStateMap.get(ep) != null) + logger.info("Node {} has restarted, now UP again", ep); + else + logger.info("Node {} is now part of the cluster", ep); + } if (logger.isTraceEnabled()) logger.trace("Adding endpoint state for " + ep); endpointStateMap.put(ep, epState); @@ -651,11 +721,31 @@ public class Gossiper implements IFailur for (IEndpointStateChangeSubscriber subscriber : subscribers) subscriber.onDead(ep, epState); } - markAlive(ep, epState); + if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value)) + markAlive(ep, epState); + else + { + logger.debug("Not marking " + ep + " alive due to dead state"); + epState.markDead(); + epState.setHasToken(true); // fat clients won't have a dead state + } for 
(IEndpointStateChangeSubscriber subscriber : subscribers) subscriber.onJoin(ep, epState); } + private Boolean isDeadState(String value) + { + String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); + assert (pieces.length > 0); + String state = pieces[0]; + for (String deadstate : DEAD_STATES) + { + if (state.equals(deadstate)) + return true; + } + return false; + } + void applyStateLocally(Map<InetAddress, EndpointState> epStateMap) { for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet()) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java?rev=1150847&r1=1150846&r2=1150847&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java Mon Jul 25 18:58:27 2011 @@ -71,6 +71,11 @@ class HeartBeatState { return version; } + + void forceNewerGenerationUnsafe() + { + generation += 1; + } } class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState> Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1150847&r1=1150846&r2=1150847&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java Mon Jul 25 18:58:27 2011 @@ -49,7 +49,7 @@ public class VersionedValue implements C public final static char DELIMITER = ','; public final static String DELIMITER_STR = new String(new 
char[] { DELIMITER }); - // values for State.STATUS + // values for ApplicationState.STATUS public final static String STATUS_BOOTSTRAPPING = "BOOT"; public final static String STATUS_NORMAL = "NORMAL"; public final static String STATUS_LEAVING = "LEAVING"; @@ -59,6 +59,9 @@ public class VersionedValue implements C public final static String REMOVING_TOKEN = "removing"; public final static String REMOVED_TOKEN = "removed"; + // values for ApplicationState.REMOVAL_COORDINATOR + public final static String REMOVAL_COORDINATOR = "REMOVER"; + public final int version; public final String value; @@ -129,20 +132,19 @@ public class VersionedValue implements C return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); } - public VersionedValue removingNonlocal(Token localToken, Token token) + public VersionedValue removingNonlocal(Token token) + { + return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + } + + public VersionedValue removedNonlocal(Token token) { - return new VersionedValue(VersionedValue.STATUS_NORMAL - + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken) - + VersionedValue.DELIMITER + VersionedValue.REMOVING_TOKEN - + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); } - public VersionedValue removedNonlocal(Token localToken, Token token) + public VersionedValue removalCoordinator(Token token) { - return new VersionedValue(VersionedValue.STATUS_NORMAL - + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken) - + VersionedValue.DELIMITER + VersionedValue.REMOVED_TOKEN - + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new 
VersionedValue(VersionedValue.REMOVAL_COORDINATOR + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); } public VersionedValue datacenter(String dcId) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java?rev=1150847&r1=1150846&r2=1150847&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java Mon Jul 25 18:58:27 2011 @@ -456,6 +456,10 @@ public final class MessagingService impl public void receive(Message message, String id) { + if (logger_.isTraceEnabled()) + logger_.trace(FBUtilities.getLocalAddress() + " received " + message.getVerb() + + " from " + id + "@" + message.getFrom()); + message = SinkManager.processServerMessage(message, id); if (message == null) return; Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1150847&r1=1150846&r2=1150847&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Mon Jul 25 18:58:27 2011 @@ -623,29 +623,35 @@ public class StorageService implements I } /* - * onChange only ever sees one ApplicationState piece change at a time, so we perform a kind of state machine here. - * We are concerned with two events: knowing the token associated with an endpoint, and knowing its operation mode. 
- * Nodes can start in either bootstrap or normal mode, and from bootstrap mode can change mode to normal. - * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode - * should instead be part of the token ring. + * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the + * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update + * from somewhere else. + * + * onChange only ever sees one ApplicationState piece change at a time (even if many ApplicationState updates were + * received at the same time), so we perform a kind of state machine here. We are concerned with two events: knowing + * the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or + * normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have + * pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring. * - * Normal MOVE_STATE progression of a node should be like this: - * STATE_BOOTSTRAPPING,token + * Normal progression of ApplicationState.STATUS values for a node should be like this: + * STATUS_BOOTSTRAPPING,token * if bootstrapping. stays this way until all files are received. - * STATE_NORMAL,token + * STATUS_NORMAL,token * ready to serve reads and writes. - * STATE_NORMAL,token,REMOVE_TOKEN,token - * specialized normal state in which this node acts as a proxy to tell the cluster about a dead node whose - * token is being removed. this value becomes the permanent state of this node (unless it coordinates another - * removetoken in the future). - * STATE_LEAVING,token - * get ready to leave the cluster as part of a decommission or move - * STATE_LEFT,token - * set after decommission or move is completed. 
- * STATE_MOVE,token - * set if node if currently moving to a new token in the ring - * - * Note: Any time a node state changes from STATE_NORMAL, it will not be visible to new nodes. So it follows that + * STATUS_LEAVING,token + * get ready to leave the cluster as part of a decommission + * STATUS_LEFT,token + * set after decommission is completed. + * + * Other STATUS values that may be seen (possibly anywhere in the normal progression): + * STATUS_MOVING,newtoken + * set if node is currently moving to a new token in the ring + * REMOVING_TOKEN,deadtoken + * set if the node is dead and is being removed by its REMOVAL_COORDINATOR + * REMOVED_TOKEN,deadtoken + * set if the node is dead and has been removed by its REMOVAL_COORDINATOR + * + * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that * you should never bootstrap a new node during a removetoken, decommission or move. */ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) @@ -666,6 +672,8 @@ public class StorageService implements I handleStateBootstrap(endpoint, pieces); else if (moveName.equals(VersionedValue.STATUS_NORMAL)) handleStateNormal(endpoint, pieces); + else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN)) + handleStateRemoving(endpoint, pieces); else if (moveName.equals(VersionedValue.STATUS_LEAVING)) handleStateLeaving(endpoint, pieces); else if (moveName.equals(VersionedValue.STATUS_LEFT)) @@ -732,7 +740,7 @@ public class StorageService implements I * in reads. 
* * @param endpoint node - * @param pieces STATE_NORMAL,token[,other_state,token] + * @param pieces STATE_NORMAL,token */ private void handleStateNormal(InetAddress endpoint, String[] pieces) { @@ -774,12 +782,6 @@ public class StorageService implements I endpoint, currentOwner, token, endpoint)); } - if (pieces.length > 2) - { - assert pieces.length == 4; - handleStateRemoving(endpoint, getPartitioner().getTokenFactory().fromString(pieces[3]), pieces[2]); - } - if (tokenMetadata_.isMoving(endpoint)) // if endpoint was moving to a new token tokenMetadata_.removeFromMoving(endpoint); @@ -861,37 +863,50 @@ public class StorageService implements I * Handle notification that a node being actively removed from the ring via 'removetoken' * * @param endpoint node - * @param state either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored) + * @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored) */ - private void handleStateRemoving(InetAddress endpoint, Token removeToken, String state) + private void handleStateRemoving(InetAddress endpoint, String[] pieces) { - InetAddress removeEndpoint = tokenMetadata_.getEndpoint(removeToken); - - if (removeEndpoint == null) - return; - - if (removeEndpoint.equals(FBUtilities.getLocalAddress())) - { - logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?"); - return; - } + String state = pieces[0]; + assert (pieces.length > 0); - if (VersionedValue.REMOVED_TOKEN.equals(state)) + if (endpoint.equals(FBUtilities.getLocalAddress())) { - excise(removeToken, removeEndpoint); + logger_.info("Received removeToken gossip about myself. 
Is this node rejoining after an explicit removetoken?"); + try + { + drain(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + return; } - else if (VersionedValue.REMOVING_TOKEN.equals(state)) + if (tokenMetadata_.isMember(endpoint)) { - if (logger_.isDebugEnabled()) - logger_.debug("Token " + removeToken + " removed manually (endpoint was " + removeEndpoint + ")"); + Token removeToken = tokenMetadata_.getToken(endpoint); - // Note that the endpoint is being removed - tokenMetadata_.addLeavingEndpoint(removeEndpoint); - calculatePendingRanges(); + if (VersionedValue.REMOVED_TOKEN.equals(state)) + { + excise(removeToken, endpoint); + } + else if (VersionedValue.REMOVING_TOKEN.equals(state)) + { + if (logger_.isDebugEnabled()) + logger_.debug("Token " + removeToken + " removed manually (endpoint was " + endpoint + ")"); - // grab any data we are now responsible for and notify responsible node - restoreReplicaCount(removeEndpoint, endpoint); - } + // Note that the endpoint is being removed + tokenMetadata_.addLeavingEndpoint(endpoint); + calculatePendingRanges(); + + // find the endpoint coordinating this removal that we need to notify when we're done + String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1); + Token coordtoken = getPartitioner().getTokenFactory().fromString(coordinator[1]); + // grab any data we are now responsible for and notify responsible node + restoreReplicaCount(endpoint, tokenMetadata_.getEndpoint(coordtoken)); + } + } // not a member, nothing to do } private void excise(Token token, InetAddress endpoint) @@ -1060,6 +1075,8 @@ public class StorageService implements I // notify the remote token Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote)); IFailureDetector failureDetector = FailureDetector.instance; + if 
(logger_.isDebugEnabled()) + logger_.debug("Notifying " + remote.toString() + " of replication completion\n"); while (failureDetector.isAlive(remote)) { IAsyncResult iar = MessagingService.instance().sendRR(msg, remote); @@ -2003,9 +2020,14 @@ public class StorageService implements I */ public void forceRemoveCompletion() { - if (!replicatingNodes.isEmpty()) + if (!replicatingNodes.isEmpty() || !tokenMetadata_.getLeavingEndpoints().isEmpty()) { logger_.warn("Removal not confirmed for for " + StringUtils.join(this.replicatingNodes, ",")); + for (InetAddress endpoint : tokenMetadata_.getLeavingEndpoints()) + { + Gossiper.instance.advertiseTokenRemoved(endpoint, tokenMetadata_.getToken(endpoint)); + tokenMetadata_.removeEndpoint(endpoint); + } replicatingNodes.clear(); } else @@ -2069,9 +2091,9 @@ public class StorageService implements I tokenMetadata_.addLeavingEndpoint(endpoint); calculatePendingRanges(); - // bundle two states together. include this nodes state to keep the status quo, - // but indicate the leaving token so that it can be dealt with. 
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removingNonlocal(localToken, token)); + // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us + // we add our own token so other nodes can let us know when they're done + Gossiper.instance.advertiseRemoving(endpoint, token, localToken); // kick off streaming commands restoreReplicaCount(endpoint, myAddress); @@ -2091,8 +2113,8 @@ public class StorageService implements I excise(token, endpoint); - // indicate the token has left - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removedNonlocal(localToken, token)); + // gossiper will indicate the token has left + Gossiper.instance.advertiseTokenRemoved(endpoint, token); replicatingNodes.clear(); removingNode = null; @@ -2100,8 +2122,18 @@ public class StorageService implements I public void confirmReplication(InetAddress node) { - assert !replicatingNodes.isEmpty(); - replicatingNodes.remove(node); + // replicatingNodes can be empty in the case where this node used to be a removal coordinator, + // but restarted before all 'replication finished' messages arrived. In that case, we'll + // still go ahead and acknowledge it. + if (!replicatingNodes.isEmpty()) + { + replicatingNodes.remove(node); + } + else + { + logger_.info("Received unexpected REPLICATION_FINISHED message from " + node + + ". Was this node recently a removal coordinator?"); + } } public boolean isClientMode()