Author: jbellis
Date: Tue Oct 27 14:40:18 2009
New Revision: 830212
URL: http://svn.apache.org/viewvc?rev=830212&view=rev
Log:
fix the bootstrap interaction with gossip; there were two main problems:
1) Token and bootstrap state are not guaranteed to be gossiped together. Since
we only updated TokenMetadata.bootstrapNodes when the token was updated, we
could miss the bootstrap notice entirely.
2) Deletions of state are not supported by Gossiper; there is no concept of
deletion at the protocol level, so state that is deleted locally is never
gossiped. Instead, we now gossip a MODE application state that is either
MOVING or NORMAL, corresponding to bootstrap and normal operation.
patch by jbellis; reviewed by goffinet for CASSANDRA-483
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
Tue Oct 27 14:40:18 2009
@@ -36,6 +36,8 @@
import org.apache.thrift.transport.TSocket;
import flexjson.JSONTokener;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
/**
* A class for caching the ring map at the client. For usage example, see
@@ -72,8 +74,7 @@
Map<String,String> tokenToHostMap = (Map<String,String>) new
JSONTokener(client.get_string_property(CassandraServer.TOKEN_MAP)).nextValue();
- HashMap<Token, InetAddress> tokenEndpointMap = new
HashMap<Token, InetAddress>();
- Map<InetAddress, Token> endpointTokenMap = new
HashMap<InetAddress, Token>();
+ BiMap<Token, InetAddress> tokenEndpointMap =
HashBiMap.create();
for (Map.Entry<String,String> entry :
tokenToHostMap.entrySet())
{
Token token =
StorageService.getPartitioner().getTokenFactory().fromString(entry.getKey());
@@ -81,7 +82,6 @@
try
{
tokenEndpointMap.put(token,
InetAddress.getByName(host));
- endpointTokenMap.put(InetAddress.getByName(host),
token);
}
catch (UnknownHostException e)
{
@@ -89,7 +89,7 @@
}
}
- TokenMetadata tokenMetadata = new
TokenMetadata(tokenEndpointMap, endpointTokenMap, null);
+ TokenMetadata tokenMetadata = new
TokenMetadata(tokenEndpointMap);
Class cls =
DatabaseDescriptor.getReplicaPlacementStrategyClass();
Class [] parameterTypes = new Class[] { TokenMetadata.class,
IPartitioner.class, int.class, int.class};
try
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
Tue Oct 27 14:40:18 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.io.IOError;
import org.apache.log4j.Logger;
@@ -60,20 +61,27 @@
/**
* Record token being used by another node
*/
- public static synchronized void updateToken(InetAddress ep, Token token)
throws IOException
+ public static synchronized void updateToken(InetAddress ep, Token token)
{
IPartitioner p = StorageService.getPartitioner();
ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, STATUS_CF);
cf.addColumn(new Column(ep.getAddress(),
p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
rm.add(cf);
- rm.apply();
+ try
+ {
+ rm.apply();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
/**
* This method is used to update the System Table with the new token for
this node
*/
- public static synchronized void updateToken(Token token) throws IOException
+ public static synchronized void updateToken(Token token)
{
assert metadata != null;
IPartitioner p = StorageService.getPartitioner();
@@ -81,7 +89,14 @@
cf.addColumn(new Column(SystemTable.TOKEN,
p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
rm.add(cf);
- rm.apply();
+ try
+ {
+ rm.apply();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
metadata.setToken(token);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
Tue Oct 27 14:40:18 2009
@@ -182,7 +182,7 @@
{
Token<?> t = getBootstrapTokenFrom(maxEndpoint);
logger.info("Setting token to " + t + " to assume load from "
+ maxEndpoint);
- ss.updateToken(t);
+ ss.setToken(t);
}
}
@@ -193,7 +193,7 @@
// Mark as not bootstrapping to calculate ranges correctly
for (int i=0; i< targets.size(); i++)
{
- tokenMetadata.update(tokens[i], targets.get(i), false);
+ tokenMetadata.setBootstrapping(targets.get(i), false);
}
Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget
= getRangesWithSourceTarget();
@@ -210,7 +210,7 @@
}
}
}).start();
- Gossiper.instance().addApplicationState(StorageService.BOOTSTRAP_MODE,
new ApplicationState(""));
+ Gossiper.instance().addApplicationState(StorageService.MODE, new
ApplicationState(StorageService.MODE_MOVING));
}
public static class BootstrapTokenVerbHandler implements IVerbHandler
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
Tue Oct 27 14:40:18 2009
@@ -86,11 +86,6 @@
applicationState_.put(key, appState);
}
- void deleteApplicationState(String key)
- {
- applicationState_.remove(key);
- }
-
/* getters and setters */
long getUpdateTimestamp()
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
Tue Oct 27 14:40:18 2009
@@ -492,6 +492,8 @@
synchronized EndPointState getStateForVersionBiggerThan(InetAddress
forEndpoint, int version)
{
+ if (logger_.isTraceEnabled())
+ logger_.trace("Scanning for state greater than " + version + " for
" + forEndpoint);
EndPointState epState = endPointStateMap_.get(forEndpoint);
EndPointState reqdEndPointState = null;
@@ -923,12 +925,6 @@
epState.addApplicationState(key, appState);
}
}
-
- public synchronized void deleteApplicationState(String key)
- {
- EndPointState epState = endPointStateMap_.get(localEndPoint_);
- epState.deleteApplicationState(key);
- }
public void stop()
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
Tue Oct 27 14:40:18 2009
@@ -27,63 +27,58 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.service.UnavailableException;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
public class TokenMetadata
{
/* Maintains token to endpoint map of every node in the cluster. */
- private Map<Token, InetAddress> tokenToEndPointMap;
- /* Maintains a reverse index of endpoint to token in the cluster. */
- private Map<InetAddress, Token> endPointToTokenMap;
+ private BiMap<Token, InetAddress> tokenToEndPointMap;
/* Bootstrapping nodes and their tokens */
- private Map<Token, InetAddress> bootstrapNodes;
+ private Set<InetAddress> bootstrapping;
+ private BiMap<Token, InetAddress> bootstrapTokenMap;
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
public TokenMetadata()
{
- tokenToEndPointMap = new HashMap<Token, InetAddress>();
- endPointToTokenMap = new HashMap<InetAddress, Token>();
- this.bootstrapNodes = Collections.synchronizedMap(new HashMap<Token,
InetAddress>());
+ this(null, null);
}
- public TokenMetadata(Map<Token, InetAddress> tokenToEndPointMap,
Map<InetAddress, Token> endPointToTokenMap, Map<Token, InetAddress>
bootstrapNodes)
+ public TokenMetadata(BiMap<Token, InetAddress> tokenToEndPointMap,
BiMap<Token, InetAddress> bootstrapTokenMap)
{
+ bootstrapping = new NonBlockingHashSet<InetAddress>();
+ if (tokenToEndPointMap == null)
+ tokenToEndPointMap = HashBiMap.create();
+ if (bootstrapTokenMap == null)
+ bootstrapTokenMap = HashBiMap.create();
this.tokenToEndPointMap = tokenToEndPointMap;
- this.endPointToTokenMap = endPointToTokenMap;
- this.bootstrapNodes = bootstrapNodes;
+ this.bootstrapTokenMap = bootstrapTokenMap;
}
-
- public TokenMetadata cloneMe()
- {
- return new TokenMetadata(cloneTokenEndPointMap(),
cloneEndPointTokenMap(), cloneBootstrapNodes());
- }
-
- public void update(Token token, InetAddress endpoint)
+
+ public TokenMetadata(BiMap<Token, InetAddress> tokenEndpointMap)
{
- this.update(token, endpoint, false);
+ this(tokenEndpointMap, null);
}
- /**
- * Update the two maps in an safe mode.
- */
- public void update(Token token, InetAddress endpoint, boolean
bootstrapState)
+
+ public void setBootstrapping(InetAddress endpoint, boolean isBootstrapping)
{
+ if (isBootstrapping)
+ bootstrapping.add(endpoint);
+ else
+ bootstrapping.remove(endpoint);
+
lock.writeLock().lock();
try
{
- if (bootstrapState)
- {
- bootstrapNodes.put(token, endpoint);
- this.remove(endpoint);
- }
- else
+ BiMap<Token, InetAddress> otherMap =
bootstrapping.contains(endpoint) ? tokenToEndPointMap : bootstrapTokenMap;
+ Token t = otherMap.inverse().get(endpoint);
+ if (t != null)
{
- bootstrapNodes.remove(token); // If this happened to be there
- Token oldToken = endPointToTokenMap.get(endpoint);
- if ( oldToken != null )
- tokenToEndPointMap.remove(oldToken);
- tokenToEndPointMap.put(token, endpoint);
- endPointToTokenMap.put(endpoint, token);
+ Map<Token, InetAddress> map = bootstrapping.contains(endpoint)
? bootstrapTokenMap : tokenToEndPointMap;
+ map.put(t, endpoint);
}
}
finally
@@ -91,33 +86,37 @@
lock.writeLock().unlock();
}
}
-
+
/**
- * Remove the entries in the two maps.
- * @param endpoint
- */
- public void remove(InetAddress endpoint)
+ * Update the two maps in an safe mode.
+ */
+ public void update(Token token, InetAddress endpoint)
{
+ assert token != null;
+ assert endpoint != null;
+
lock.writeLock().lock();
try
- {
- Token oldToken = endPointToTokenMap.get(endpoint);
- if ( oldToken != null )
- tokenToEndPointMap.remove(oldToken);
- endPointToTokenMap.remove(endpoint);
+ {
+ Map<Token, InetAddress> map = bootstrapping.contains(endpoint) ?
bootstrapTokenMap : tokenToEndPointMap;
+ Map<Token, InetAddress> otherMap =
bootstrapping.contains(endpoint) ? tokenToEndPointMap : bootstrapTokenMap;
+ map.put(token, endpoint);
+ otherMap.remove(token);
}
finally
{
lock.writeLock().unlock();
}
}
-
+
public Token getToken(InetAddress endpoint)
{
+ assert endpoint != null;
+
lock.readLock().lock();
try
{
- return endPointToTokenMap.get(endpoint);
+ return tokenToEndPointMap.inverse().get(endpoint);
}
finally
{
@@ -125,12 +124,14 @@
}
}
- public boolean isKnownEndPoint(InetAddress ep)
+ public boolean isKnownEndPoint(InetAddress endpoint)
{
+ assert endpoint != null;
+
lock.readLock().lock();
try
{
- return endPointToTokenMap.containsKey(ep);
+ return tokenToEndPointMap.inverse().containsKey(endpoint);
}
finally
{
@@ -156,8 +157,10 @@
}
- public InetAddress getNextEndpoint(InetAddress endPoint) throws
UnavailableException
+ public InetAddress getNextEndpoint(InetAddress endpoint) throws
UnavailableException
{
+ assert endpoint != null;
+
lock.readLock().lock();
try
{
@@ -165,7 +168,7 @@
if (tokens.isEmpty())
return null;
Collections.sort(tokens);
- int i = tokens.indexOf(endPointToTokenMap.get(endPoint)); // TODO
binary search
+ int i =
tokens.indexOf(tokenToEndPointMap.inverse().get(endpoint)); // TODO binary
search
int j = 1;
InetAddress ep;
while (!FailureDetector.instance().isAlive((ep =
tokenToEndPointMap.get(tokens.get((i + j) % tokens.size())))))
@@ -188,7 +191,7 @@
lock.readLock().lock();
try
{
- return new HashMap<Token, InetAddress>( bootstrapNodes );
+ return new HashMap<Token, InetAddress>(bootstrapTokenMap);
}
finally
{
@@ -221,7 +224,7 @@
lock.readLock().lock();
try
{
- return new HashMap<InetAddress, Token>(endPointToTokenMap);
+ return new HashMap<InetAddress,
Token>(tokenToEndPointMap.inverse());
}
finally
{
@@ -232,13 +235,13 @@
public String toString()
{
StringBuilder sb = new StringBuilder();
- Set<InetAddress> eps = endPointToTokenMap.keySet();
-
+ Set<InetAddress> eps = tokenToEndPointMap.inverse().keySet();
+
for ( InetAddress ep : eps )
{
sb.append(ep);
sb.append(":");
- sb.append(endPointToTokenMap.get(ep));
+ sb.append(tokenToEndPointMap.inverse().get(ep));
sb.append(System.getProperty("line.separator"));
}
@@ -257,4 +260,10 @@
lock.readLock().unlock();
}
}
+
+ public void clearUnsafe()
+ {
+ tokenToEndPointMap.clear();
+ bootstrapTokenMap.clear();
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Oct 27 14:40:18 2009
@@ -41,6 +41,7 @@
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
+import org.apache.commons.lang.StringUtils;
/*
* This abstraction contains the token/identifier of this node
@@ -51,9 +52,12 @@
public final class StorageService implements IEndPointStateChangeSubscriber,
StorageServiceMBean
{
private static Logger logger_ = Logger.getLogger(StorageService.class);
- private final static String NODE_ID = "NODE-IDENTIFIER";
- public final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
-
+
+ private final static String NODE_ID = "NODE-ID";
+ public final static String MODE = "MODE";
+ public final static String MODE_MOVING = "move";
+ public final static String MODE_NORMAL = "run";
+
/* All stage identifiers */
public final static String mutationStage_ = "ROW-MUTATION-STAGE";
public final static String readStage_ = "ROW-READ-STAGE";
@@ -150,39 +154,40 @@
bootstrapSet.add(s);
}
- public synchronized boolean removeBootstrapSource(InetAddress s)
+ public synchronized void removeBootstrapSource(InetAddress s)
{
bootstrapSet.remove(s);
-
if (logger_.isDebugEnabled())
logger_.debug("Removed " + s + " as a bootstrap source");
+
if (bootstrapSet.isEmpty())
{
SystemTable.setBootstrapped();
- isBootstrapMode = false;
- updateTokenMetadata(storageMetadata_.getToken(),
FBUtilities.getLocalAddress(), false);
-
+ Gossiper.instance().addApplicationState(MODE, new
ApplicationState(MODE_NORMAL));
logger_.info("Bootstrap completed! Now serving reads.");
- /* Tell others you're not bootstrapping anymore */
- Gossiper.instance().deleteApplicationState(BOOTSTRAP_MODE);
}
- return isBootstrapMode;
}
- private void updateTokenMetadata(Token token, InetAddress endpoint,
boolean isBootstraping)
+ private void updateForeignToken(Token token, InetAddress endpoint)
{
- tokenMetadata_.update(token, endpoint, isBootstraping);
- if (!isBootstraping)
- {
- try
- {
- SystemTable.updateToken(endpoint, token);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
+ tokenMetadata_.update(token, endpoint);
+ SystemTable.updateToken(endpoint, token);
+ }
+
+ /** This method updates the local token on disk and starts broacasting it
to others. */
+ public void setToken(Token token)
+ {
+ SystemTable.updateToken(token);
+ tokenMetadata_.update(token, FBUtilities.getLocalAddress());
+ }
+
+ public void setAndBroadcastToken(Token token)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Setting token to " + token + " and gossiping it");
+ setToken(token);
+ ApplicationState state = new
ApplicationState(partitioner_.getTokenFactory().toString(token));
+ Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
}
public StorageService()
@@ -256,19 +261,17 @@
if (isBootstrapMode)
{
+ logger_.info("Starting in bootstrap mode (first, sleeping to get
load information)");
+ Gossiper.instance().addApplicationState(MODE, new
ApplicationState(MODE_MOVING));
new BootStrapper(Arrays.asList(FBUtilities.getLocalAddress()),
getLocalToken()).startBootstrap(); // handles token update
}
else
{
SystemTable.setBootstrapped();
- tokenMetadata_.update(storageMetadata_.getToken(),
FBUtilities.getLocalAddress(), isBootstrapMode);
}
+ setAndBroadcastToken(storageMetadata_.getToken());
- // Gossip my token.
- // note that before we do this we've (a) finalized what the token is
actually going to be, and
- // (b) added a bootstrap state (done by startBootstrap)
- ApplicationState state = new
ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(storageMetadata_.getToken()));
- Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
+ assert tokenMetadata_.cloneTokenEndPointMap().size() > 0;
}
public boolean isBootstrapMode()
@@ -278,7 +281,7 @@
public TokenMetadata getTokenMetadata()
{
- return tokenMetadata_.cloneMe();
+ return tokenMetadata_;
}
/* TODO: used for testing */
@@ -374,12 +377,16 @@
/* node identifier for this endpoint on the identifier space */
ApplicationState nodeIdState =
epState.getApplicationState(StorageService.NODE_ID);
/* Check if this has a bootstrapping state message */
- boolean bootstrapState =
epState.getApplicationState(StorageService.BOOTSTRAP_MODE) != null;
- if (bootstrapState)
+ ApplicationState modeState =
epState.getApplicationState(StorageService.MODE);
+ if (modeState != null)
{
+ String mode = modeState.getState();
if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " is in bootstrap state.");
+ logger_.debug(endpoint + " is in " + mode + " mode");
+ boolean bootstrapState = mode.equals(MODE_MOVING);
+ tokenMetadata_.setBootstrapping(endpoint, bootstrapState);
}
+
if (nodeIdState != null)
{
Token newToken =
getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
@@ -399,7 +406,7 @@
{
if (logger_.isDebugEnabled())
logger_.debug("Relocation for endpoint " + endpoint);
- updateTokenMetadata(newToken, endpoint, bootstrapState);
+ updateForeignToken(newToken, endpoint);
}
else
{
@@ -417,7 +424,7 @@
/*
* This is a new node and we just update the token map.
*/
- updateTokenMetadata(newToken, endpoint, bootstrapState);
+ updateForeignToken(newToken, endpoint);
}
}
else
@@ -462,37 +469,6 @@
return map;
}
- /*
- * This method updates the token on disk and modifies the cached
- * StorageMetadata instance. This is only for the local endpoint.
- */
- public void updateToken(Token token) throws IOException
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Setting token to " + token);
- /* update the token on disk */
- SystemTable.updateToken(token);
- /* Update the token maps */
- tokenMetadata_.update(token, FBUtilities.getLocalAddress());
- /* Gossip this new token for the local storage instance */
- ApplicationState state = new
ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(token));
- Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
- }
-
- /*
- * This method removes the state associated with this endpoint
- * from the TokenMetadata instance.
- *
- * @param endpoint remove the token state associated with this
- * endpoint.
- */
- public void removeTokenState(InetAddress endpoint)
- {
- tokenMetadata_.remove(endpoint);
- /* Remove the state from the Gossiper */
- Gossiper.instance().removeFromMembership(endpoint);
- }
-
/**
* Deliver hints to the specified node when it has crashed
* and come back up/ marked as alive after a network partition
@@ -705,6 +681,8 @@
*/
public Range[] getAllRanges(Set<Token> tokens)
{
+ if (logger_.isDebugEnabled())
+ logger_.debug("computing ranges for " + StringUtils.join(tokens,
", "));
List<Range> ranges = new ArrayList<Range>();
List<Token> allTokens = new ArrayList<Token>(tokens);
Collections.sort(allTokens);
@@ -895,4 +873,9 @@
{
return replicationStrategy_.getResponseHandler(responseResolver,
blockFor, consistency_level);
}
+
+ public AbstractReplicationStrategy getReplicationStrategy()
+ {
+ return replicationStrategy_;
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
Tue Oct 27 14:40:18 2009
@@ -38,10 +38,8 @@
bufIn.reset(body, body.length);
try
{
- /* Deserialize to get the token for this endpoint. */
Token token = Token.serializer().deserialize(bufIn);
- logger_.info("Updating the token to [" + token + "]");
- StorageService.instance().updateToken(token);
+ StorageService.instance().setAndBroadcastToken(token);
}
catch (IOException ex)
{
Modified:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=830212&r1=830211&r2=830212&view=diff
==============================================================================
---
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
(original)
+++
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Tue Oct 27 14:40:18 2009
@@ -118,7 +118,8 @@
//Add bootstrap node id=6
Token bsToken = new BigIntegerToken(String.valueOf(25));
InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
- tmd.update(bsToken, bootstrapEndPoint, true);
+ tmd.setBootstrapping(bootstrapEndPoint, true);
+ tmd.update(bsToken, bootstrapEndPoint);
for (int i = 0; i < keyTokens.length; i++)
{