Updated Branches: refs/heads/trunk bfe2dfb50 -> dfd05673c
removeNode (by ID) instead of token Patch by eevans; reviewed by Brandon Williams for CASSANDRA-4120 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfd05673 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfd05673 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfd05673 Branch: refs/heads/trunk Commit: dfd05673cd59dcd18ffea6c7f26dec4f880d5085 Parents: 0a5267e Author: Eric Evans <[email protected]> Authored: Wed May 2 18:48:54 2012 -0500 Committer: Eric Evans <[email protected]> Committed: Wed May 2 18:48:54 2012 -0500 ---------------------------------------------------------------------- NEWS.txt | 6 +++ src/java/org/apache/cassandra/gms/Gossiper.java | 18 +++++----- .../org/apache/cassandra/gms/VersionedValue.java | 22 +++++++---- .../apache/cassandra/locator/TokenMetadata.java | 6 +++ .../apache/cassandra/service/StorageService.java | 29 ++++++++------- .../cassandra/service/StorageServiceMBean.java | 2 +- src/java/org/apache/cassandra/tools/NodeCmd.java | 8 +++-- src/java/org/apache/cassandra/tools/NodeProbe.java | 4 +- .../org/apache/cassandra/service/RemoveTest.java | 25 +++++-------- 9 files changed, 69 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 50a0d3c..7a8b9be 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -18,6 +18,12 @@ Upgrading snapshots and then truncates the hints column family as part of starting up 1.2 for the first time. Additionally, upgraded nodes will not store new hints destined for older (pre-1.2) nodes. + - The `nodetool removetoken` command (and corresponding JMX operation) + have been renamed to `nodetool removenode`. This function is + incompatible with the earlier `nodetool removetoken`, and attempts to + remove nodes in this way with a mixed 1.1 (or lower) / 1.2 cluster, + is not supported. + 1.1.1 ===== http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 2a948a6..2feecfa 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -382,15 +382,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * This should never be called unless this coordinator has had 'removetoken' invoked * * @param endpoint - the endpoint being removed - * @param token - the token being removed - * @param mytoken - my own token for replication coordination + * @param hostId - the ID of the host being removed + * @param localHostId - my own host ID for replication coordination */ - public void advertiseRemoving(InetAddress endpoint, Token token, Token mytoken) + public void advertiseRemoving(InetAddress endpoint, UUID hostId, UUID localHostId) { EndpointState epState = endpointStateMap.get(endpoint); // remember this node's generation int generation = epState.getHeartBeatState().getGeneration(); - logger.info("Removing token: " + token); + logger.info("Removing host: {}", hostId); logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change"); try { @@ -408,8 +408,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean logger.info("Advertising removal for " + endpoint); epState.updateTimestamp(); // make sure we don't evict it too soon epState.getHeartBeatState().forceNewerGenerationUnsafe(); - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(token)); - epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(mytoken)); + epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(hostId)); + epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(localHostId)); endpointStateMap.put(endpoint, epState); } @@ -417,14 +417,14 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN * This should only be called after advertiseRemoving * @param endpoint - * @param token + * @param hostId */ - public void advertiseTokenRemoved(InetAddress endpoint, Token token) + public void advertiseTokenRemoved(InetAddress endpoint, UUID hostId) { EndpointState epState = endpointStateMap.get(endpoint); epState.updateTimestamp(); // make sure we don't evict it too soon epState.getHeartBeatState().forceNewerGenerationUnsafe(); - epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token,computeExpireTime())); + epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(hostId,computeExpireTime())); logger.info("Completing removal of " + endpoint); endpointStateMap.put(endpoint, epState); // ensure at least one gossip round occurs before returning http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 25225c5..10dbb6a 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -144,20 +144,19 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); } - public VersionedValue removingNonlocal(Token token) + public VersionedValue removingNonlocal(UUID hostId) { - return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new VersionedValue(versionString(VersionedValue.REMOVING_TOKEN, hostId.toString())); } - public VersionedValue removedNonlocal(Token token, long expireTime) + public VersionedValue removedNonlocal(UUID hostId, long expireTime) { - return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER - + partitioner.getTokenFactory().toString(token) + VersionedValue.DELIMITER + expireTime); + return new VersionedValue(versionString(VersionedValue.REMOVED_TOKEN, hostId.toString(), Long.toString(expireTime))); } - public VersionedValue removalCoordinator(Token token) + public VersionedValue removalCoordinator(UUID hostId) { - return new VersionedValue(VersionedValue.REMOVAL_COORDINATOR + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token)); + return new VersionedValue(versionString(VersionedValue.REMOVAL_COORDINATOR, hostId.toString())); } public VersionedValue hibernate(boolean value) @@ -205,11 +204,18 @@ public class VersionedValue implements Comparable<VersionedValue> if (version < MessagingService.VERSION_12) { String[] pieces = value.value.split(DELIMITER_STR, -1); - if ((pieces[0] == STATUS_NORMAL) || pieces[0] == STATUS_BOOTSTRAPPING) + String type = pieces[0]; + + if ((type == STATUS_NORMAL) || type == STATUS_BOOTSTRAPPING) { assert pieces.length >= 3; outValue = versionString(pieces[0], pieces[2]); } + + if ((type == REMOVAL_COORDINATOR) || (type == REMOVING_TOKEN) || (type == REMOVED_TOKEN)) + throw new RuntimeException(String.format("Unable to serialize %s(%s...) for nodes older than 1.2", + VersionedValue.class.getName(), + type)); } dos.writeUTF(outValue); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/src/java/org/apache/cassandra/locator/TokenMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 1cb2a61..b78a748 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -214,6 +214,12 @@ public class TokenMetadata return endpointToHostIdMap.get(endpoint); } + /** Return the end-point for a unique host ID */ + public InetAddress getEndpointForHostId(UUID hostId) + { + return endpointToHostIdMap.inverse().get(hostId); + } + /** @return a copy of the endpoint-to-id map for read-only operations */ public Map<InetAddress, UUID> getEndpointToHostIdMapForReading() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 64ec4b6..7d651d4 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1314,9 +1314,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe // find the endpoint coordinating this removal that we need to notify when we're done String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1); - Token coordtoken = getPartitioner().getTokenFactory().fromString(coordinator[1]); + UUID hostId = UUID.fromString(coordinator[1]); // grab any data we are now responsible for and notify responsible node - restoreReplicaCount(endpoint, tokenMetadata.getEndpoint(coordtoken)); + restoreReplicaCount(endpoint, tokenMetadata.getEndpointForHostId(hostId)); } } // not a member, nothing to do } @@ -2558,8 +2558,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe logger.warn("Removal not confirmed for for " + StringUtils.join(this.replicatingNodes, ",")); for (InetAddress endpoint : tokenMetadata.getLeavingEndpoints()) { + UUID hostId = tokenMetadata.getHostId(endpoint); + Gossiper.instance.advertiseTokenRemoved(endpoint, hostId); Token token = tokenMetadata.getToken(endpoint); - Gossiper.instance.advertiseTokenRemoved(endpoint, token); excise(token, endpoint); } replicatingNodes.clear(); @@ -2578,23 +2579,25 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe * restore the replica count, finally forceRemoveCompleteion should be * called to forcibly remove the node without regard to replica count. * - * @param tokenString token for the node + * @param hostIdString token for the node */ - public void removeToken(String tokenString) + public void removeNode(String hostIdString) { InetAddress myAddress = FBUtilities.getBroadcastAddress(); - Token localToken = tokenMetadata.getToken(myAddress); - Token token = getPartitioner().getTokenFactory().fromString(tokenString); - InetAddress endpoint = tokenMetadata.getEndpoint(token); + UUID localHostId = tokenMetadata.getHostId(myAddress); + UUID hostId = UUID.fromString(hostIdString); + InetAddress endpoint = tokenMetadata.getEndpointForHostId(hostId); if (endpoint == null) - throw new UnsupportedOperationException("Token not found."); + throw new UnsupportedOperationException("Host ID not found."); + + Token token = tokenMetadata.getToken(endpoint); if (endpoint.equals(myAddress)) - throw new UnsupportedOperationException("Cannot remove node's own token"); + throw new UnsupportedOperationException("Cannot remove self"); if (Gossiper.instance.getLiveMembers().contains(endpoint)) - throw new UnsupportedOperationException("Node " + endpoint + " is alive and owns this token. Use decommission command to remove it from the ring"); + throw new UnsupportedOperationException("Node " + endpoint + " is alive and owns this ID. Use decommission command to remove it from the ring"); // A leaving endpoint that is dead is already being removed. if (tokenMetadata.isLeaving(endpoint)) @@ -2628,7 +2631,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe calculatePendingRanges(); // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us // we add our own token so other nodes to let us know when they're done - Gossiper.instance.advertiseRemoving(endpoint, token, localToken); + Gossiper.instance.advertiseRemoving(endpoint, hostId, localHostId); // kick off streaming commands restoreReplicaCount(endpoint, myAddress); @@ -2649,7 +2652,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe excise(token, endpoint); // gossiper will indicate the token has left - Gossiper.instance.advertiseTokenRemoved(endpoint, token); + Gossiper.instance.advertiseTokenRemoved(endpoint, hostId); replicatingNodes.clear(); removingNode = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 2af4215..e9a84a7 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -286,7 +286,7 @@ public interface StorageServiceMBean * removeToken removes token (and all data associated with * enpoint that had it) from the ring */ - public void removeToken(String token); + public void removeNode(String token); /** * Get the status of a token removal. http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java index 4f2fd8b..2a2bfc1 100644 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@ -105,6 +105,7 @@ public class NodeCmd REBUILD, REFRESH, REMOVETOKEN, + REMOVENODE, REPAIR, RING, SCRUB, @@ -158,7 +159,7 @@ public class NodeCmd // One arg addCmdHelp(header, "netstats [host]", "Print network information on provided host (connecting node by default)"); addCmdHelp(header, "move <new token>", "Move node on the token ring to a new token"); - addCmdHelp(header, "removetoken status|force|<token>", "Show status of current token removal, force completion of pending removal or remove providen token"); + addCmdHelp(header, "removenode status|force|<ID>", "Show status of current node removal, force completion of pending removal or remove provided ID"); addCmdHelp(header, "setcompactionthroughput <value_in_mb>", "Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling."); addCmdHelp(header, "setstreamthroughput <value_in_mb>", "Set the MB/s throughput cap for streaming in the system, or 0 to disable throttling."); addCmdHelp(header, "describering [keyspace]", "Shows the token ranges info of a given keyspace."); @@ -787,11 +788,12 @@ public class NodeCmd probe.rebuild(arguments.length == 1 ? arguments[0] : null); break; + case REMOVENODE : case REMOVETOKEN : - if (arguments.length != 1) { badUse("Missing an argument for removetoken (either status, force, or a token)"); } + if (arguments.length != 1) { badUse("Missing an argument for removenode (either status, force, or an ID)"); } else if (arguments[0].equals("status")) { nodeCmd.printRemovalStatus(System.out); } else if (arguments[0].equals("force")) { nodeCmd.printRemovalStatus(System.out); probe.forceRemoveCompletion(); } - else { probe.removeToken(arguments[0]); } + else { probe.removeNode(arguments[0]); } break; case INVALIDATEKEYCACHE : http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index e342cf2..a325f91 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -397,9 +397,9 @@ public class NodeProbe ssProxy.move(newToken); } - public void removeToken(String token) + public void removeNode(String token) { - ssProxy.removeToken(token); + ssProxy.removeNode(token); } public String getRemovalStatus() http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd05673/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index e56e291..e8ccfba 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -60,7 +60,7 @@ public class RemoveTest List<InetAddress> hosts = new ArrayList<InetAddress>(); List<UUID> hostIds = new ArrayList<UUID>(); InetAddress removalhost; - Token removaltoken; + UUID removalId; @BeforeClass public static void setupClass() throws IOException @@ -92,8 +92,8 @@ public class RemoveTest } removalhost = hosts.get(5); hosts.remove(removalhost); - removaltoken = endpointTokens.get(5); - endpointTokens.remove(removaltoken); + removalId = hostIds.get(5); + hostIds.remove(removalId); } @After @@ -105,27 +105,22 @@ public class RemoveTest } @Test(expected = UnsupportedOperationException.class) - public void testBadToken() + public void testBadHostId() { - final String token = StorageService.getPartitioner().getTokenFactory().toString(keyTokens.get(2)); - ss.removeToken(token); + ss.removeNode("ffffffff-aaaa-aaaa-aaaa-ffffffffffff"); } @Test(expected = UnsupportedOperationException.class) - public void testLocalToken() + public void testLocalHostId() { - //first token should be localhost - final String token = StorageService.getPartitioner().getTokenFactory().toString(endpointTokens.get(0)); - ss.removeToken(token); + //first ID should be localhost + ss.removeNode(hostIds.get(0).toString()); } @Test - public void testRemoveToken() throws InterruptedException + public void testRemoveHostId() throws InterruptedException { - IPartitioner partitioner = StorageService.getPartitioner(); - - final String token = partitioner.getTokenFactory().toString(removaltoken); ReplicationSink rSink = new ReplicationSink(); SinkManager.add(rSink); @@ -137,7 +132,7 @@ public class RemoveTest { try { - ss.removeToken(token); + ss.removeNode(removalId.toString()); } catch (Exception e) {
