Author: gdusbabek
Date: Mon Sep 13 16:11:01 2010
New Revision: 996590
URL: http://svn.apache.org/viewvc?rev=996590&view=rev
Log:
broadcast removetoken using MOVE,NORMAL to preserve the coordinator state.
patch by gdusbabek, reviewed by jbellis. CASSANDRA-1467
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=996590&r1=996589&r2=996590&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Mon Sep 13 16:11:01 2010
@@ -93,6 +93,7 @@ public class StorageService implements I
// this must be a char that cannot be present in any token
public final static char Delimiter = ',';
+ private final static String DelimiterStr = new String(new char[]
{Delimiter});
public final static String STATE_BOOTSTRAPPING = "BOOT";
public final static String STATE_NORMAL = "NORMAL";
@@ -100,7 +101,6 @@ public class StorageService implements I
public final static String STATE_LEFT = "LEFT";
public final static String REMOVE_TOKEN = "remove";
- public final static String LEFT_NORMALLY = "left";
/* All verb handler identifiers */
public enum Verb
@@ -588,6 +588,23 @@ public class StorageService implements I
* Nodes can start in either bootstrap or normal mode, and from bootstrap
mode can change mode to normal.
* A node in bootstrap mode needs to have pendingranges set in
TokenMetadata; a node in normal mode
* should instead be part of the token ring.
+ *
+ * Normal state progression of a node should be like this:
+ * STATE_BOOTSTRAPPING,token
+ * if bootstrapping. stays this way until all files are received.
+ * STATE_NORMAL,token
+ * ready to serve reads and writes.
+ * STATE_NORMAL,token,REMOVE_TOKEN,token
+ * specialized normal state in which this node acts as a proxy to tell
the cluster about a dead node whose
+ * token is being removed. this value becomes the permanent state of
this node (unless it coordinates another
+ * removetoken in the future).
+ * STATE_LEAVING,token
+ * get ready to leave the cluster as part of a decommission or move
+ * STATE_LEFT,token
+ * set after decommission or move is completed.
+ *
+ * Note: Any time a node state changes from STATE_NORMAL, it will not be
visible to new nodes. So it follows that
+ * you should never bootstrap a new node during a removetoken,
decommission or move.
*/
public void onChange(InetAddress endpoint, String apStateName,
ApplicationState apState)
{
@@ -595,31 +612,31 @@ public class StorageService implements I
return;
String apStateValue = apState.getValue();
- int index = apStateValue.indexOf(Delimiter);
- assert (index != -1);
+ String[] pieces = apStateValue.split(DelimiterStr, -1);
+ assert (pieces.length > 0);
- String moveName = apStateValue.substring(0, index);
- String moveValue = apStateValue.substring(index+1);
+ String moveName = pieces[0];
if (moveName.equals(STATE_BOOTSTRAPPING))
- handleStateBootstrap(endpoint, moveValue);
+ handleStateBootstrap(endpoint, pieces);
else if (moveName.equals(STATE_NORMAL))
- handleStateNormal(endpoint, moveValue);
+ handleStateNormal(endpoint, pieces);
else if (moveName.equals(STATE_LEAVING))
- handleStateLeaving(endpoint, moveValue);
+ handleStateLeaving(endpoint, pieces);
else if (moveName.equals(STATE_LEFT))
- handleStateLeft(endpoint, moveValue);
+ handleStateLeft(endpoint, pieces);
}
/**
* Handle node bootstrap
*
* @param endpoint bootstrapping node
- * @param moveValue bootstrap token as string
+ * @param pieces STATE_BOOTSTRAPPING,bootstrap token as string
*/
- private void handleStateBootstrap(InetAddress endpoint, String moveValue)
+ private void handleStateBootstrap(InetAddress endpoint, String[] pieces)
{
- Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+ assert pieces.length == 2;
+ Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
if (logger_.isDebugEnabled())
logger_.debug("Node " + endpoint + " state bootstrapping, token "
+ token);
@@ -648,11 +665,12 @@ public class StorageService implements I
* in reads.
*
* @param endpoint node
- * @param moveValue token as string
+ * @param pieces STATE_NORMAL,token[,other_state,token]
*/
- private void handleStateNormal(InetAddress endpoint, String moveValue)
+ private void handleStateNormal(InetAddress endpoint, String[] pieces)
{
- Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+ assert pieces.length >= 2;
+ Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
if (logger_.isDebugEnabled())
logger_.debug("Node " + endpoint + " state normal, token " +
token);
@@ -666,6 +684,31 @@ public class StorageService implements I
tokenMetadata_.updateNormalToken(token, endpoint);
else
logger_.info("Will not change my token ownership to " + endpoint);
+
+ if (pieces.length > 2)
+ {
+ if (REMOVE_TOKEN.equals(pieces[2]))
+ {
+ // remove token was called on a dead node.
+ Token tokenThatLeft =
getPartitioner().getTokenFactory().fromString(pieces[3]);
+ InetAddress endpointThatLeft =
tokenMetadata_.getEndpoint(tokenThatLeft);
+ // let's make sure that we're not removing ourselves. This can
happen when a node
+ // enters ring as a replacement for a removed node.
removeToken for the old node is
+ // still in gossip, so we will see it.
+ if (FBUtilities.getLocalAddress().equals(endpointThatLeft))
+ {
+ logger_.info("Received removeToken gossip about myself. Is
this node a replacement for a removed one?");
+ return;
+ }
+ logger_.debug("Token " + tokenThatLeft + " removed manually
(endpoint was " + ((endpointThatLeft == null) ? "unknown" : endpointThatLeft) +
")");
+ if (endpointThatLeft != null)
+ {
+ removeEndpointLocally(endpointThatLeft);
+ }
+ tokenMetadata_.removeBootstrapToken(tokenThatLeft);
+ }
+ }
+
calculatePendingRanges();
if (!isClientMode)
SystemTable.updateToken(endpoint, token);
@@ -675,10 +718,12 @@ public class StorageService implements I
* Handle node preparing to leave the ring
*
* @param endpoint node
- * @param moveValue token as string
+ * @param pieces STATE_LEAVING,token
*/
- private void handleStateLeaving(InetAddress endpoint, String moveValue)
+ private void handleStateLeaving(InetAddress endpoint, String[] pieces)
{
+ assert pieces.length == 2;
+ String moveValue = pieces[1];
Token token = getPartitioner().getTokenFactory().fromString(moveValue);
if (logger_.isDebugEnabled())
@@ -705,56 +750,29 @@ public class StorageService implements I
}
/**
- * Handle node leaving the ring. This can be either because the node was
removed manually by
- * removetoken command or because of decommission or loadbalance
+ * Handle node leaving the ring. This can be because of either
decommission or loadbalance
*
- * @param endpoint If reason for leaving is decommission or loadbalance
(LEFT_NORMALLY),
- * endpoint is the leaving node. If reason manual removetoken
(REMOVE_TOKEN), endpoint
- * parameter is ignored and the operation is based on the token inside
moveValue.
- * @param moveValue (REMOVE_TOKEN|LEFT_NORMALLY)<Delimiter><token>
- */
- private void handleStateLeft(InetAddress endpoint, String moveValue)
- {
- int index = moveValue.indexOf(Delimiter);
- assert (index != -1);
- String typeOfState = moveValue.substring(0, index);
- Token token =
getPartitioner().getTokenFactory().fromString(moveValue.substring(index + 1));
+ * @param endpoint If the reason for leaving is decommission or loadbalance,
+ * endpoint is the leaving node.
+ * @param pieces STATE_LEFT,token
+ */
+ private void handleStateLeft(InetAddress endpoint, String[] pieces)
+ {
+ assert pieces.length == 2;
+ Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
// endpoint itself is leaving
- if (typeOfState.equals(LEFT_NORMALLY))
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Node " + endpoint + " state left, token " +
token);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endpoint + " state left, token " + token);
- // If the node is member, remove all references to it. If not, call
- // removeBootstrapToken just in case it is there (very unlikely
chain of events)
- if (tokenMetadata_.isMember(endpoint))
- {
- if (!tokenMetadata_.getToken(endpoint).equals(token))
- logger_.warn("Node " + endpoint + " 'left' token mismatch.
Long network partition?");
- tokenMetadata_.removeEndpoint(endpoint);
- HintedHandOffManager.deleteHintsForEndPoint(endpoint);
- }
- }
- else
+ // If the node is member, remove all references to it. If not, call
+ // removeBootstrapToken just in case it is there (very unlikely chain
of events)
+ if (tokenMetadata_.isMember(endpoint))
{
- // if we're here, endpoint is not leaving but broadcasting remove
token command
- assert (typeOfState.equals(REMOVE_TOKEN));
- InetAddress endpointThatLeft = tokenMetadata_.getEndpoint(token);
- // let's make sure that we're not removing ourselves. This can
happen when a node
- // enters ring as a replacement for a removed node. removeToken
for the old node is
- // still in gossip, so we will see it.
- if (FBUtilities.getLocalAddress().equals(endpointThatLeft))
- {
- logger_.info("Received removeToken gossip about myself. Is
this node a replacement for a removed one?");
- return;
- }
- if (logger_.isDebugEnabled())
- logger_.debug("Token " + token + " removed manually (endpoint
was " + ((endpointThatLeft == null) ? "unknown" : endpointThatLeft) + ")");
- if (endpointThatLeft != null)
- {
- removeEndpointLocally(endpointThatLeft);
- }
+ if (!tokenMetadata_.getToken(endpoint).equals(token))
+ logger_.warn("Node " + endpoint + " 'left' token mismatch.
Long network partition?");
+ tokenMetadata_.removeEndpoint(endpoint);
+ HintedHandOffManager.deleteHintsForEndPoint(endpoint);
}
// remove token from bootstrap tokens just in case it is still there
@@ -1470,7 +1488,7 @@ public class StorageService implements I
tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
calculatePendingRanges();
- Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEFT + Delimiter + LEFT_NORMALLY + Delimiter +
partitioner_.getTokenFactory().toString(getLocalToken())));
+ Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEFT + Delimiter +
partitioner_.getTokenFactory().toString(getLocalToken())));
try
{
Thread.sleep(2 * Gossiper.intervalInMillis_);
@@ -1607,14 +1625,8 @@ public class StorageService implements I
calculatePendingRanges();
}
- // This is not the cleanest way as we're adding STATE_LEFT for
- // a foreign token to our own EP state. Another way would be
- // to add new AP state for this command, but that would again
- // increase the amount of data to be gossiped in the cluster -
- // not good. REMOVE_TOKEN|LEFT_NORMALLY is used to distinguish
- // between ``removetoken command and normal state left, so it is
- // not so bad.
- Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter +
partitioner_.getTokenFactory().toString(token)));
+ // bundle two states together. include this node's state to keep the
status quo, but indicate the leaving token so that it can be dealt with.
+ Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_NORMAL + Delimiter +
partitioner_.getTokenFactory().toString(getLocalToken()) + Delimiter +
REMOVE_TOKEN + Delimiter + partitioner_.getTokenFactory().toString(token)));
}
public boolean isClientMode()
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=996590&r1=996589&r2=996590&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Mon
Sep 13 16:11:01 2010
@@ -294,8 +294,8 @@ public class MoveTest extends CleanupHel
// Now finish node 6 and node 9 leaving, as well as boot1 (after this
node 8 is still
// leaving and boot2 in progress
- ss.onChange(hosts.get(LEAVING[0]), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
StorageService.LEFT_NORMALLY + StorageService.Delimiter +
partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING[0]))));
- ss.onChange(hosts.get(LEAVING[2]), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
StorageService.LEFT_NORMALLY + StorageService.Delimiter +
partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING[2]))));
+ ss.onChange(hosts.get(LEAVING[0]), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING[0]))));
+ ss.onChange(hosts.get(LEAVING[2]), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
partitioner.getTokenFactory().toString(endpointTokens.get(LEAVING[2]))));
ss.onChange(boot1, StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(5))));
// adjust precalcuated results. this changes what the epected
endpoints are.
@@ -499,7 +499,7 @@ public class MoveTest extends CleanupHel
// node 3 goes through leave and left and then jumps to normal
ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEAVING + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(2))));
- ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
StorageService.LEFT_NORMALLY + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(2))));
+ ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(2))));
ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_NORMAL + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(4))));
assertTrue(tmd.getBootstrapTokens().isEmpty());
@@ -548,7 +548,7 @@ public class MoveTest extends CleanupHel
assertTrue(tmd.getBootstrapTokens().isEmpty());
// go to state left
- ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
StorageService.LEFT_NORMALLY + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(1))));
+ ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(1))));
assertFalse(tmd.isMember(hosts.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -574,7 +574,7 @@ public class MoveTest extends CleanupHel
createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts,
7);
// node hosts.get(2) goes jumps to left
- ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
StorageService.LEFT_NORMALLY + StorageService.Delimiter +
partitioner.getTokenFactory().toString(endpointTokens.get(2))));
+ ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
partitioner.getTokenFactory().toString(endpointTokens.get(2))));
assertFalse(tmd.isMember(hosts.get(2)));
@@ -586,7 +586,7 @@ public class MoveTest extends CleanupHel
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
// and then directly to 'left'
- ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
StorageService.LEFT_NORMALLY + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(1))));
+ ss.onChange(hosts.get(2), StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_LEFT + StorageService.Delimiter +
partitioner.getTokenFactory().toString(keyTokens.get(1))));
assertTrue(tmd.getBootstrapTokens().size() == 0);
assertFalse(tmd.isMember(hosts.get(2)));