Author: jbellis
Date: Mon Dec 21 17:14:00 2009
New Revision: 892885
URL: http://svn.apache.org/viewvc?rev=892885&view=rev
Log:
Use one state for all movement related gossip and fix some bugs related to
pending ranges on the leaving node itself. patch by Jaakko Laine; reviewed by
jbellis for CASSANDRA-572
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=892885&r1=892884&r2=892885&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
(original)
+++
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/locator/TokenMetadata.java
Mon Dec 21 17:14:00 2009
@@ -47,7 +47,7 @@
// for any nodes that boot simultaneously between same two nodes. For this we cannot simply make pending ranges a multimap,
// since that would make us unable to notice the real problem of two nodes trying to boot using the same token.
// In order to do this properly, we need to know what tokens are booting at any time.
- private Map<Token, InetAddress> bootstrapTokens;
+ private BiMap<Token, InetAddress> bootstrapTokens;
// we will need to know at all times what nodes are leaving and calculate ranges accordingly.
// An anonymous pending ranges list is not enough, as that does not tell which node is leaving
@@ -71,7 +71,7 @@
if (tokenToEndPointMap == null)
tokenToEndPointMap = HashBiMap.create();
this.tokenToEndPointMap = tokenToEndPointMap;
- bootstrapTokens = new HashMap<Token, InetAddress>();
+ bootstrapTokens = HashBiMap.create();
leavingEndPoints = new HashSet<InetAddress>();
pendingRanges = HashMultimap.create();
sortedTokens = sortTokens();
@@ -103,13 +103,13 @@
lock.writeLock().lock();
try
{
- bootstrapTokens.remove(token);
-
+ bootstrapTokens.inverse().remove(endpoint);
tokenToEndPointMap.inverse().remove(endpoint);
if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
{
sortedTokens = sortTokens();
}
+ leavingEndPoints.remove(endpoint);
}
finally
{
@@ -125,9 +125,17 @@
lock.writeLock().lock();
try
{
- InetAddress oldEndPoint = bootstrapTokens.get(token);
+ InetAddress oldEndPoint = null;
+
+ oldEndPoint = bootstrapTokens.get(token);
+ if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
+ throw new RuntimeException("Bootstrap Token collision between
" + oldEndPoint + " and " + endpoint + " (token " + token);
+
+ oldEndPoint = tokenToEndPointMap.get(token);
if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
throw new RuntimeException("Bootstrap Token collision between
" + oldEndPoint + " and " + endpoint + " (token " + token);
+
+ bootstrapTokens.inverse().remove(endpoint);
bootstrapTokens.put(token, endpoint);
}
finally
@@ -136,6 +144,21 @@
}
}
+ public void removeBootstrapToken(Token token)
+ {
+ assert token != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ bootstrapTokens.remove(token);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public void addLeavingEndPoint(InetAddress endpoint)
{
assert endpoint != null;
@@ -151,13 +174,28 @@
}
}
+ public void removeLeavingEndPoint(InetAddress endpoint)
+ {
+ assert endpoint != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ leavingEndPoints.remove(endpoint);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public void removeEndpoint(InetAddress endpoint)
{
assert tokenToEndPointMap.containsValue(endpoint);
lock.writeLock().lock();
try
{
- bootstrapTokens.remove(getToken(endpoint));
+ bootstrapTokens.inverse().remove(endpoint);
tokenToEndPointMap.inverse().remove(endpoint);
leavingEndPoints.remove(endpoint);
sortedTokens = sortTokens();
@@ -199,6 +237,21 @@
}
}
+ public boolean isLeaving(InetAddress endpoint)
+ {
+ assert endpoint != null;
+
+ lock.readLock().lock();
+ try
+ {
+ return leavingEndPoints.contains(endpoint);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
public InetAddress getFirstEndpoint()
{
assert tokenToEndPointMap.size() > 0;
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java?rev=892885&r1=892884&r2=892885&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/service/StorageService.java
Mon Dec 21 17:14:00 2009
@@ -62,15 +62,18 @@
{
private static Logger logger_ = Logger.getLogger(StorageService.class);
- // these aren't in an enum since other gossip users can create states ad-hoc too (e.g. load broadcasting)
+ public final static String MOVE_STATE = "MOVE";
+
+ // this must be a char that cannot be present in any token
+ public final static char Delimiter = ',';
+
+ public final static String STATE_BOOTSTRAPPING = "BOOT";
public final static String STATE_NORMAL = "NORMAL";
- public final static String STATE_BOOTSTRAPPING = "BOOTSTRAPPING";
public final static String STATE_LEAVING = "LEAVING";
public final static String STATE_LEFT = "LEFT";
- private final static char StateDelimiter = ',';
- private final static String REMOVE_TOKEN = "remove";
- private final static String LEFT_NORMALLY = "left";
+ public final static String REMOVE_TOKEN = "remove";
+ public final static String LEFT_NORMALLY = "left";
/* All verb handler identifiers */
public final static String mutationVerbHandler_ =
"ROW-MUTATION-VERB-HANDLER";
@@ -183,7 +186,7 @@
isBootstrapMode = false;
SystemTable.setBootstrapped(true);
setToken(getLocalToken());
- Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL,
new ApplicationState(partitioner_.getTokenFactory().toString(getLocalToken())));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new
ApplicationState(STATE_NORMAL + Delimiter +
partitioner_.getTokenFactory().toString(getLocalToken())));
logger_.info("Bootstrap/move completed! Now serving reads.");
}
@@ -317,7 +320,7 @@
SystemTable.setBootstrapped(true);
Token token = storageMetadata_.getToken();
tokenMetadata_.updateNormalToken(token,
FBUtilities.getLocalAddress());
-
Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new
ApplicationState(partitioner_.getTokenFactory().toString(token)));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new
ApplicationState(STATE_NORMAL + Delimiter +
partitioner_.getTokenFactory().toString(token)));
}
assert tokenMetadata_.sortedTokens().size() > 0;
@@ -327,7 +330,7 @@
{
isBootstrapMode = true;
SystemTable.updateToken(token); // DON'T use setToken, that makes us
part of the ring locally which is incorrect until we are done bootstrapping
-
Gossiper.instance().addApplicationState(StorageService.STATE_BOOTSTRAPPING, new
ApplicationState(partitioner_.getTokenFactory().toString(token)));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new
ApplicationState(STATE_BOOTSTRAPPING + Delimiter +
partitioner_.getTokenFactory().toString(token)));
logger_.info("bootstrap sleeping " + Streaming.RING_DELAY);
try
{
@@ -410,67 +413,164 @@
* A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
* should instead be part of the token ring.
*/
- public void onChange(InetAddress endpoint, String stateName,
ApplicationState state)
+ public void onChange(InetAddress endpoint, String apStateName,
ApplicationState apState)
{
- if (STATE_BOOTSTRAPPING.equals(stateName))
- {
- Token token =
getPartitioner().getTokenFactory().fromString(state.getValue());
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " state bootstrapping, token " +
token);
- tokenMetadata_.addBootstrapToken(token, endpoint);
- calculatePendingRanges();
+ if (!MOVE_STATE.equals(apStateName))
+ return;
+
+ String apStateValue = apState.getValue();
+ int index = apStateValue.indexOf(Delimiter);
+ assert (index != -1);
+
+ String moveName = apStateValue.substring(0, index);
+ String moveValue = apStateValue.substring(index+1);
+
+ if (moveName.equals(STATE_BOOTSTRAPPING))
+ handleStateBootstrap(endpoint, moveValue);
+ else if (moveName.equals(STATE_NORMAL))
+ handleStateNormal(endpoint, moveValue);
+ else if (moveName.equals(STATE_LEAVING))
+ handleStateLeaving(endpoint, moveValue);
+ else if (moveName.equals(STATE_LEFT))
+ handleStateLeft(endpoint, moveValue);
+ }
+
+ /**
+ * Handle node bootstrap
+ *
+ * @param endPoint bootstrapping node
+ * @param moveValue bootstrap token as string
+ */
+ private void handleStateBootstrap(InetAddress endPoint, String moveValue)
+ {
+ Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endPoint + " state bootstrapping, token "
+ token);
+
+ // if this node is present in token metadata, either we have missed intermediate states
+ // or the node had crashed. Print warning if needed, clear obsolete stuff and
+ // continue.
+ if (tokenMetadata_.isMember(endPoint))
+ {
+ // If isLeaving is false, we have missed both LEAVING and LEFT. However, if
+ // isLeaving is true, we have only missed LEFT. Waiting time between completing
+ // leave operation and rebootstrapping is relatively short, so the latter is quite
+ // common (not enough time for gossip to spread). Therefore we report only the
+ // former in the log.
+ if (!tokenMetadata_.isLeaving(endPoint))
+ logger_.info("Node " + endPoint + " state jump to bootstrap");
+ tokenMetadata_.removeEndpoint(endPoint);
}
- else if (STATE_NORMAL.equals(stateName))
+
+ tokenMetadata_.addBootstrapToken(token, endPoint);
+ calculatePendingRanges();
+ }
+
+ /**
+ * Handle node move to normal state. That is, node is entering token ring and participating
+ * in reads.
+ *
+ * @param endPoint node
+ * @param moveValue token as string
+ */
+ private void handleStateNormal(InetAddress endPoint, String moveValue)
+ {
+ Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endPoint + " state normal, token " +
token);
+
+ if (tokenMetadata_.isMember(endPoint))
+ logger_.info("Node " + endPoint + " state jump to normal");
+
+ tokenMetadata_.updateNormalToken(token, endPoint);
+ calculatePendingRanges();
+ if (!isClientMode)
+ SystemTable.updateToken(endPoint, token);
+ }
+
+ /**
+ * Handle node preparing to leave the ring
+ *
+ * @param endPoint node
+ * @param moveValue token as string
+ */
+ private void handleStateLeaving(InetAddress endPoint, String moveValue)
+ {
+ Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endPoint + " state leaving, token " +
token);
+
+ // If the node is previously unknown or tokens do not match, update tokenmetadata to
+ // have this node as 'normal' (it must have been using this token before the
+ // leave). This way we'll get pending ranges right.
+ if (!tokenMetadata_.isMember(endPoint))
{
- Token token =
getPartitioner().getTokenFactory().fromString(state.getValue());
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " state normal, token " + token);
- tokenMetadata_.updateNormalToken(token, endpoint);
- calculatePendingRanges();
- if (!isClientMode)
- SystemTable.updateToken(endpoint, token);
+ logger_.info("Node " + endPoint + " state jump to leaving");
+ tokenMetadata_.updateNormalToken(token, endPoint);
}
- else if (STATE_LEAVING.equals(stateName))
+ else if (!tokenMetadata_.getToken(endPoint).equals(token))
{
- Token token =
getPartitioner().getTokenFactory().fromString(state.getValue());
- assert tokenMetadata_.getToken(endpoint).equals(token);
- tokenMetadata_.addLeavingEndPoint(endpoint);
- calculatePendingRanges();
+ logger_.warn("Node " + endPoint + " 'leaving' token mismatch. Long
network partition?");
+ tokenMetadata_.updateNormalToken(token, endPoint);
}
- else if (STATE_LEFT.equals(stateName))
+
+ // at this point the endpoint is certainly a member with this token, so let's proceed
+ // normally
+ tokenMetadata_.addLeavingEndPoint(endPoint);
+ calculatePendingRanges();
+ }
+
+ /**
+ * Handle node leaving the ring. This can be either because the node was removed manually by
+ * removetoken command or because of decommission or loadbalance
+ *
+ * @param endPoint If reason for leaving is decommission or loadbalance (LEFT_NORMALLY),
+ * endPoint is the leaving node. If reason manual removetoken (REMOVE_TOKEN), endPoint
+ * parameter is ignored and the operation is based on the token inside moveValue.
+ * @param moveValue (REMOVE_TOKEN|LEFT_NORMALLY)<Delimiter><token>
+ */
+ private void handleStateLeft(InetAddress endPoint, String moveValue)
+ {
+ int index = moveValue.indexOf(Delimiter);
+ assert (index != -1);
+ String typeOfState = moveValue.substring(0, index);
+ Token token =
getPartitioner().getTokenFactory().fromString(moveValue.substring(index + 1));
+
+ // endPoint itself is leaving
+ if (typeOfState.equals(LEFT_NORMALLY))
{
- // STATE_LEFT state is of form
(REMOVE_TOKEN|LEFT_NORMALLY)<StateDelimiter><token>
- String stateValue = state.getValue();
- int index = stateValue.indexOf(StateDelimiter);
- assert (index != -1);
- String typeOfState = stateValue.substring(0, index);
- Token token =
getPartitioner().getTokenFactory().fromString(stateValue.substring(index + 1));
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endPoint + " state left, token " +
token);
- if (typeOfState.equals(LEFT_NORMALLY))
+ // If the node is member, remove all references to it. If not, call
+ // removeBootstrapToken just in case it is there (very unlikely chain of events)
+ if (tokenMetadata_.isMember(endPoint))
{
- if (tokenMetadata_.isMember(endpoint))
- {
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " state left, token " +
token);
- assert tokenMetadata_.getToken(endpoint).equals(token);
- tokenMetadata_.removeEndpoint(endpoint);
- calculatePendingRanges();
- }
+ if (!tokenMetadata_.getToken(endPoint).equals(token))
+ logger_.warn("Node " + endPoint + " 'left' token mismatch.
Long network partition?");
+ tokenMetadata_.removeEndpoint(endPoint);
}
- else
+ }
+ else
+ {
+ // if we're here, endPoint is not leaving but broadcasting remove token command
+ assert (typeOfState.equals(REMOVE_TOKEN));
+ InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Token " + token + " removed manually (endpoint
was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
+ if (endPointThatLeft != null)
{
- assert (typeOfState.equals(REMOVE_TOKEN));
- InetAddress endPointThatLeft =
tokenMetadata_.getEndPoint(token);
- if (logger_.isDebugEnabled())
- logger_.debug("Token " + token + " removed manually
(endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) +
")");
- if (endPointThatLeft != null)
- {
- restoreReplicaCount(endPointThatLeft);
- tokenMetadata_.removeEndpoint(endPointThatLeft);
- calculatePendingRanges();
- }
+ restoreReplicaCount(endPointThatLeft);
+ tokenMetadata_.removeEndpoint(endPointThatLeft);
}
}
+
+ // remove token from bootstrap tokens just in case it is still there
+ tokenMetadata_.removeBootstrapToken(token);
+ calculatePendingRanges();
}
/**
@@ -554,7 +654,7 @@
tm.setPendingRanges(pendingRanges);
if (logger_.isDebugEnabled())
- logger_.debug("Pending ranges:\n" + tm.printPendingRanges());
+ logger_.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ?
"<empty>" : tm.printPendingRanges()));
}
/**
@@ -639,7 +739,11 @@
currentReplicaEndpoints.put(range,
replicationStrategy_.getNaturalEndpoints(range.right(), tokenMetadata_));
TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
- temp.removeEndpoint(endpoint);
+
+ // endpoint might or might not be 'leaving'. If it was not leaving (that is, removetoken
+ // command was used), it is still present in temp and must be removed.
+ if (temp.isMember(endpoint))
+ temp.removeEndpoint(endpoint);
Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
@@ -653,7 +757,10 @@
ArrayList<InetAddress> newReplicaEndpoints =
replicationStrategy_.getNaturalEndpoints(range.right(), temp);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger_.isDebugEnabled())
- logger_.debug("Range " + range + " will be responsibility of "
+ StringUtils.join(newReplicaEndpoints, ", "));
+ if (newReplicaEndpoints.isEmpty())
+ logger_.debug("Range " + range + " already in all
replicas");
+ else
+ logger_.debug("Range " + range + " will be responsibility
of " + StringUtils.join(newReplicaEndpoints, ", "));
changedRanges.putAll(range, newReplicaEndpoints);
}
@@ -1152,17 +1259,27 @@
return tokens;
}
+ /**
+ * Broadcast leaving status and update local tokenMetadata_ accordingly
+ */
+ private void startLeaving()
+ {
+ Gossiper.instance().addApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEAVING + Delimiter + getLocalToken().toString()));
+ tokenMetadata_.addLeavingEndPoint(FBUtilities.getLocalAddress());
+ calculatePendingRanges();
+ }
+
public void decommission() throws InterruptedException
{
if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
throw new UnsupportedOperationException("local node is not a
member of the token ring yet");
- if (tokenMetadata_.sortedTokens().size() < 2)
- throw new UnsupportedOperationException("no other nodes in the
ring; decommission would be pointless");
+ if (tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2)
+ throw new UnsupportedOperationException("no other normal nodes in
the ring; decommission would be pointless");
if
(tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
throw new UnsupportedOperationException("data is currently moving
to this node; unable to leave the ring");
logger_.info("DECOMMISSIONING");
- Gossiper.instance().addApplicationState(STATE_LEAVING, new
ApplicationState(getLocalToken().toString()));
+ startLeaving();
logger_.info("decommission sleeping " + Streaming.RING_DELAY);
Thread.sleep(Streaming.RING_DELAY);
@@ -1183,10 +1300,11 @@
{
SystemTable.setBootstrapped(false);
tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
+ calculatePendingRanges();
if (logger_.isDebugEnabled())
logger_.debug("");
- Gossiper.instance().addApplicationState(STATE_LEFT, new
ApplicationState(LEFT_NORMALLY + StateDelimiter + getLocalToken().toString()));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEFT + Delimiter + LEFT_NORMALLY + Delimiter +
getLocalToken().toString()));
try
{
Thread.sleep(2 * Gossiper.intervalInMillis_);
@@ -1260,7 +1378,7 @@
throw new UnsupportedOperationException("data is currently moving
to this node; unable to leave the ring");
logger_.info("starting move. leaving token " + getLocalToken());
- Gossiper.instance().addApplicationState(STATE_LEAVING, new
ApplicationState(getLocalToken().toString()));
+ startLeaving();
logger_.info("move sleeping " + Streaming.RING_DELAY);
Thread.sleep(Streaming.RING_DELAY);
@@ -1303,6 +1421,7 @@
restoreReplicaCount(endPoint);
tokenMetadata_.removeEndpoint(endPoint);
+ calculatePendingRanges();
}
// This is not the cleanest way as we're adding STATE_LEFT for
@@ -1312,7 +1431,7 @@
// not good. REMOVE_TOKEN|LEFT_NORMALLY is used to distinguish
// between removetoken command and normal state left, so it is
// not so bad.
- Gossiper.instance().addApplicationState(STATE_LEFT, new
ApplicationState(REMOVE_TOKEN + StateDelimiter + token.toString()));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new
ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter +
token.toString()));
}
public WriteResponseHandler getWriteResponseHandler(int blockFor, int
consistency_level)
Modified:
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=892885&r1=892884&r2=892885&view=diff
==============================================================================
---
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
(original)
+++
incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Mon Dec 21 17:14:00 2009
@@ -62,7 +62,7 @@
Range range3 = ss.getPrimaryRangeForEndPoint(three);
Token fakeToken =
((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(),
range3.right());
assert range3.contains(fakeToken);
- ss.onChange(myEndpoint, StorageService.STATE_BOOTSTRAPPING, new
ApplicationState(ss.getPartitioner().getTokenFactory().toString(fakeToken)));
+ ss.onChange(myEndpoint, StorageService.MOVE_STATE, new
ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter
+ ss.getPartitioner().getTokenFactory().toString(fakeToken)));
tmd = ss.getTokenMetadata();
InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);