Author: gdusbabek
Date: Tue Sep 28 05:50:26 2010
New Revision: 1002025
URL: http://svn.apache.org/viewvc?rev=1002025&view=rev
Log:
modify removetoken so that the coordinator relies on replicating nodes for
updates. patch by Nick Bailey, reviewed by Gary Dusbabek. CASSANDRA-1216
Added:
cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
cassandra/trunk/test/unit/org/apache/cassandra/Util.java
cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Sep 28 05:50:26 2010
@@ -876,4 +876,21 @@ public class Gossiper implements IFailur
gossipTimer_.cancel();
gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
}
+
+ /**
+ * This should *only* be used for testing purposes.
+ */
+ public void initializeNodeUnsafe(InetAddress addr, int generationNbr) {
+ /* initialize the heartbeat state for this localEndpoint */
+ EndpointState localState = endpointStateMap_.get(addr);
+ if ( localState == null )
+ {
+ HeartBeatState hbState = new HeartBeatState(generationNbr);
+ localState = new EndpointState(hbState);
+ localState.isAlive(true);
+ localState.isAGossiper(true);
+ endpointStateMap_.put(addr, localState);
+ }
+ }
+
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Tue Sep 28 05:50:26 2010
@@ -54,7 +54,8 @@ public class VersionedValue implements C
public final static String STATUS_LEAVING = "LEAVING";
public final static String STATUS_LEFT = "LEFT";
- public final static String REMOVE_TOKEN = "remove";
+ public final static String REMOVING_TOKEN = "removing";
+ public final static String REMOVED_TOKEN = "removed";
public final int version;
public final String value;
@@ -115,11 +116,19 @@ public class VersionedValue implements C
return new VersionedValue(VersionedValue.STATUS_LEFT +
VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
- public VersionedValue removeNonlocal(Token localToken, Token token)
+ public VersionedValue removingNonlocal(Token localToken, Token token)
{
return new VersionedValue(VersionedValue.STATUS_NORMAL
+ VersionedValue.DELIMITER +
partitioner.getTokenFactory().toString(localToken)
- + VersionedValue.DELIMITER +
VersionedValue.REMOVE_TOKEN
+ + VersionedValue.DELIMITER +
VersionedValue.REMOVING_TOKEN
+ + VersionedValue.DELIMITER +
partitioner.getTokenFactory().toString(token));
+ }
+
+ public VersionedValue removedNonlocal(Token localToken, Token token)
+ {
+ return new VersionedValue(VersionedValue.STATUS_NORMAL
+ + VersionedValue.DELIMITER +
partitioner.getTokenFactory().toString(localToken)
+ + VersionedValue.DELIMITER +
VersionedValue.REMOVED_TOKEN
+ VersionedValue.DELIMITER +
partitioner.getTokenFactory().toString(token));
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Sep 28 05:50:26 2010
@@ -298,7 +298,7 @@ public class MessagingService implements
}
// message sinks are a testing hook
- Message processedMessage =
SinkManager.processClientMessageSink(message);
+ Message processedMessage =
SinkManager.processClientMessageSink(message, to);
if (processedMessage == null)
{
return;
@@ -378,7 +378,7 @@ public class MessagingService implements
public static void receive(Message message)
{
- message = SinkManager.processServerMessageSink(message);
+ message = SinkManager.processServerMessageSink(message, null);
Runnable runnable = new MessageDeliveryTask(message);
ExecutorService stage =
StageManager.getStage(message.getMessageType());
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java Tue Sep 28 05:50:26 2010
@@ -23,5 +23,5 @@ import org.apache.cassandra.net.Message;
public interface IMessageSink
{
- public Message handleMessage(Message message);
+ public Message handleMessage(Message message, InetAddress to);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java Tue Sep 28 05:50:26 2010
@@ -42,13 +42,13 @@ public class SinkManager
messageSinks_.clear();
}
- public static Message processClientMessageSink(Message message)
+ public static Message processClientMessageSink(Message message,
InetAddress to)
{
ListIterator<IMessageSink> li = messageSinks_.listIterator();
while ( li.hasNext() )
{
IMessageSink ms = li.next();
- message = ms.handleMessage(message);
+ message = ms.handleMessage(message, to);
if ( message == null )
{
return null;
@@ -57,13 +57,13 @@ public class SinkManager
return message;
}
- public static Message processServerMessageSink(Message message)
+ public static Message processServerMessageSink(Message message,
InetAddress to)
{
ListIterator<IMessageSink> li =
messageSinks_.listIterator(messageSinks_.size());
while ( li.hasPrevious() )
{
IMessageSink ms = li.previous();
- message = ms.handleMessage(message);
+ message = ms.handleMessage(message, to);
if ( message == null )
{
return null;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep 28 05:50:26 2010
@@ -52,7 +52,10 @@ import org.apache.cassandra.io.DeletionS
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ResponseVerbHandler;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
@@ -103,6 +106,8 @@ public class StorageService implements I
TRUNCATE,
SCHEMA_CHECK,
INDEX_SCAN,
+ REPLICATION_FINISHED,
+ ;
// remember to add new verbs at the end, since we serialize by ordinal
}
public static final Verb[] VERBS = Verb.values();
@@ -128,11 +133,12 @@ public class StorageService implements I
put(Verb.TRUNCATE, Stage.MUTATION);
put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
put(Verb.INDEX_SCAN, Stage.READ);
+ put(Verb.REPLICATION_FINISHED, Stage.MISC);
}};
private static IPartitioner partitioner_ =
DatabaseDescriptor.getPartitioner();
- public static final VersionedValue.VersionedValueFactory valueFactory =
new VersionedValue.VersionedValueFactory(partitioner_);
+ public static VersionedValue.VersionedValueFactory valueFactory = new
VersionedValue.VersionedValueFactory(partitioner_);
public static final StorageService instance = new StorageService();
@@ -165,6 +171,10 @@ public class StorageService implements I
/* We use this interface to determine where replicas need to be placed */
private Map<String, AbstractReplicationStrategy> replicationStrategies;
+ private Set<InetAddress> replicatingNodes;
+ private InetAddress removingNode;
+ private CountDownLatch replicateLatch;
+
/* Are we starting this node in bootstrap mode? */
private boolean isBootstrapMode;
/* when intialized as a client, we shouldn't write to the system table. */
@@ -215,6 +225,7 @@ public class StorageService implements I
MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN,
new BootStrapper.BootstrapTokenVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST,
new StreamRequestVerbHandler() );
MessagingService.instance.registerVerbHandlers(Verb.STREAM_REPLY, new
StreamReplyVerbHandler());
+
MessagingService.instance.registerVerbHandlers(Verb.REPLICATION_FINISHED, new
ReplicationFinishedVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new
ResponseVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new
TreeRequestVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new
AntiEntropyService.TreeResponseVerbHandler());
@@ -637,31 +648,11 @@ public class StorageService implements I
tokenMetadata_.updateNormalToken(token, endpoint);
else
logger_.info("Will not change my token ownership to " + endpoint);
-
- if (pieces.length > 2)
- {
- if (VersionedValue.REMOVE_TOKEN.equals(pieces[2]))
- {
- // remove token was called on a dead node.
- Token tokenThatLeft =
getPartitioner().getTokenFactory().fromString(pieces[3]);
- InetAddress endpointThatLeft =
tokenMetadata_.getEndpoint(tokenThatLeft);
- // let's make sure that we're not removing ourselves. This can
happen when a node
- // enters ring as a replacement for a removed node.
removeToken for the old node is
- // still in gossip, so we will see it.
- if (FBUtilities.getLocalAddress().equals(endpointThatLeft))
- {
- logger_.info("Received removeToken gossip about myself. Is
this node a replacement for a removed one?");
- return;
- }
- logger_.debug("Token " + tokenThatLeft + " removed manually
(endpoint was " + ((endpointThatLeft == null) ? "unknown" : endpointThatLeft) +
")");
- if (endpointThatLeft != null)
- {
- removeEndpointLocally(endpointThatLeft);
- }
- tokenMetadata_.removeBootstrapToken(tokenThatLeft);
- }
+
+ if(pieces.length > 2) {
+ handleStateRemoving(endpoint, pieces);
}
-
+
calculatePendingRanges();
if (!isClientMode)
SystemTable.updateToken(endpoint, token);
@@ -734,15 +725,45 @@ public class StorageService implements I
}
/**
- * endpoint was completely removed from ring (as a result of removetoken
command). Remove it
- * from token metadata and gossip and restore replica count. Also delete
any hints for it.
+ * Handle node being actively removed from the ring.
+ *
+ * @param endpoint node
+ * @param moveValue (token to notify of removal)<Delimiter>(token to
remove)
*/
- private void removeEndpointLocally(InetAddress endpoint)
+ private void handleStateRemoving(InetAddress endpoint, String[] pieces)
{
- restoreReplicaCount(endpoint);
- Gossiper.instance.removeEndpoint(endpoint);
- // gossiper onRemove will take care of TokenMetadata
- HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+ assert pieces.length == 4;
+ Token removeToken =
getPartitioner().getTokenFactory().fromString(pieces[3]);
+ InetAddress removeEndpoint = tokenMetadata_.getEndpoint(removeToken);
+
+ if (removeEndpoint == null)
+ return;
+
+ if (removeEndpoint.equals(FBUtilities.getLocalAddress()))
+ {
+ logger_.info("Received removeToken gossip about myself. Is this
node a replacement for a removed one?");
+ return;
+ }
+
+ if (VersionedValue.REMOVED_TOKEN.equals(pieces[2]))
+ {
+ Gossiper.instance.removeEndpoint(removeEndpoint);
+ tokenMetadata_.removeEndpoint(removeEndpoint);
+ HintedHandOffManager.deleteHintsForEndPoint(removeEndpoint);
+ tokenMetadata_.removeBootstrapToken(removeToken);
+ }
+ else if (VersionedValue.REMOVING_TOKEN.equals(pieces[2]))
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Token " + removeToken + " removed manually
(endpoint was " + removeEndpoint + ")");
+
+ // Note that the endpoint is being removed
+ tokenMetadata_.addLeavingEndpoint(removeEndpoint);
+ calculatePendingRanges();
+
+ // grab any data we are now responsible for and notify responsible
node
+ restoreReplicaCount(removeEndpoint, endpoint);
+ }
}
/**
@@ -832,71 +853,142 @@ public class StorageService implements I
}
/**
- * Called when an endpoint is removed from the ring without proper
- * STATE_LEAVING -> STATE_LEFT sequence. This function checks
+ * Determines the endpoints that are going to become responsible for data
due to
+ * a node leaving the cluster.
+ *
+ * @param endpoint the node that is leaving the cluster
+ * @return A set of endpoints
+ */
+ private Set<InetAddress> getNewEndpoints(InetAddress endpoint)
+ {
+ Set<InetAddress> newEndpoints = new HashSet<InetAddress>();
+
+ for (String table : DatabaseDescriptor.getNonSystemTables())
+ {
+ // get all ranges that change ownership (that is, a node needs
+ // to take responsibility for new range)
+ Multimap<Range, InetAddress> changedRanges =
getChangedRangesForLeaving(table, endpoint);
+ newEndpoints.addAll(changedRanges.values());
+ }
+ return newEndpoints;
+ }
+
+ /**
+ * Finds living endpoints responsible for the given ranges
+ *
+ * @param table the table ranges belong to
+ * @param ranges the ranges to find sources for
+ * @return multimap of addresses to ranges the address is responsible for
+ */
+ private Multimap<InetAddress, Range> getNewSourceRanges(String table,
Set<Range> ranges)
+ {
+ InetAddress myAddress = FBUtilities.getLocalAddress();
+ Multimap<Range, InetAddress> rangeAddresses =
getReplicationStrategy(table).getRangeAddresses(tokenMetadata_);
+ Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
+ IFailureDetector failureDetector = FailureDetector.instance;
+
+ // find alive sources for our new ranges
+ for (Range range : ranges)
+ {
+ Collection<InetAddress> possibleRanges = rangeAddresses.get(range);
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ List<InetAddress> sources =
snitch.getSortedListByProximity(myAddress, possibleRanges);
+
+ assert (!sources.contains(myAddress));
+
+ for (InetAddress source : sources)
+ {
+ if (failureDetector.isAlive(source))
+ {
+ sourceRanges.put(source, range);
+ break;
+ }
+ }
+ }
+ return sourceRanges;
+ }
+
+ /**
+ * Sends a notification to a node indicating we have finished replicating
data.
+ *
+ * @param local the local address
+ * @param remote node to send notification to
+ */
+ private void sendReplicationNotification(InetAddress local, InetAddress
remote)
+ {
+ // notify the remote token
+ Message msg = new Message(local,
StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
+ IFailureDetector failureDetector = FailureDetector.instance;
+ while (failureDetector.isAlive(remote))
+ {
+ IAsyncResult iar = MessagingService.instance.sendRR(msg, remote);
+ try
+ {
+ iar.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
+ return; // done
+ }
+ catch(TimeoutException e)
+ {
+ // try again
+ }
+ }
+ }
+
+ /**
+ * Called when an endpoint is removed from the ring. This function checks
* whether this node becomes responsible for new ranges as a
* consequence and streams data if needed.
*
* This is rather ineffective, but it does not matter so much
* since this is called very seldom
*
- * @param endpoint node that has left
+ * @param endpoint the node that left
*/
- private void restoreReplicaCount(InetAddress endpoint)
+ private void restoreReplicaCount(InetAddress endpoint, final InetAddress
notifyEndpoint)
{
- InetAddress myAddress = FBUtilities.getLocalAddress();
+ final Multimap<InetAddress, String> fetchSources =
HashMultimap.create();
+ Multimap<String, Map.Entry<InetAddress, Collection<Range>>>
rangesToFetch = HashMultimap.create();
+
+ final InetAddress myAddress = FBUtilities.getLocalAddress();
for (String table : DatabaseDescriptor.getNonSystemTables())
{
- // get all ranges that change ownership (that is, a node needs
- // to take responsibility for new range)
- Multimap<Range, InetAddress> changedRanges =
getChangedRangesForLeaving(table, endpoint);
-
- // check if any of these ranges are coming our way
+ Multimap<Range, InetAddress> changedRanges =
getChangedRangesForLeaving(table, endpoint);
Set<Range> myNewRanges = new HashSet<Range>();
for (Map.Entry<Range, InetAddress> entry : changedRanges.entries())
{
if (entry.getValue().equals(myAddress))
myNewRanges.add(entry.getKey());
}
-
- if (!myNewRanges.isEmpty())
+ Multimap<InetAddress, Range> sourceRanges =
getNewSourceRanges(table, myNewRanges);
+ for (Map.Entry<InetAddress, Collection<Range>> entry :
sourceRanges.asMap().entrySet())
{
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " was removed, my added ranges: "
+ StringUtils.join(myNewRanges, ", "));
-
- Multimap<Range, InetAddress> rangeAddresses =
getReplicationStrategy(table).getRangeAddresses(tokenMetadata_);
- Multimap<InetAddress, Range> sourceRanges =
HashMultimap.create();
- IFailureDetector failureDetector = FailureDetector.instance;
+ fetchSources.put(entry.getKey(), table);
+ rangesToFetch.put(table, entry);
+ }
+ }
- // find alive sources for our new ranges
- for (Range myNewRange : myNewRanges)
+ for (final String table : rangesToFetch.keySet())
+ {
+ for (Map.Entry<InetAddress, Collection<Range>> entry :
rangesToFetch.get(table))
+ {
+ final InetAddress source = entry.getKey();
+ Collection<Range> ranges = entry.getValue();
+ final Runnable callback = new Runnable()
{
- List<InetAddress> sources =
DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(myAddress,
rangeAddresses.get(myNewRange));
-
- assert (!sources.contains(myAddress));
-
- for (InetAddress source : sources)
+ public void run()
{
- if (source.equals(endpoint))
- continue;
-
- if (failureDetector.isAlive(source))
+ synchronized (fetchSources)
{
- sourceRanges.put(source, myNewRange);
- break;
+ fetchSources.remove(source, table);
+ if (fetchSources.isEmpty())
+ sendReplicationNotification(myAddress,
notifyEndpoint);
}
}
- }
-
- // Finally we have a list of addresses and ranges to
- // stream. Proceed to stream
- for (Map.Entry<InetAddress, Collection<Range>> entry :
sourceRanges.asMap().entrySet())
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Requesting from " + entry.getKey() + "
ranges " + StringUtils.join(entry.getValue(), ", "));
- StreamIn.requestRanges(entry.getKey(), table,
entry.getValue());
- }
+ };
+ if (logger_.isDebugEnabled())
+ logger_.debug("Requesting from " + source + " ranges " +
StringUtils.join(ranges, ", "));
+ StreamIn.requestRanges(source, table, ranges, callback);
}
}
}
@@ -1583,31 +1675,102 @@ public class StorageService implements I
unbootstrap(finishMoving);
}
+ /**
+ * Get the status of a token removal.
+ */
+ public String getRemovalStatus()
+ {
+ if (removingNode == null) {
+ return "No token removals in process.";
+ }
+ return String.format("Removing token (%s). Waiting for replication
confirmation from [%s].",
+ tokenMetadata_.getToken(removingNode),
+ StringUtils.join(replicatingNodes, ","));
+ }
+
+ /**
+ * Force a remove operation to complete. This may be necessary if a remove
operation
+ * blocks forever due to node/stream failure.
+ */
+ public void finishRemoval()
+ {
+ while (replicateLatch != null && replicateLatch.getCount() > 0)
+ {
+ replicateLatch.countDown();
+ }
+ }
+
+ /**
+ * Remove a node that has died.
+ *
+ * @param tokenString token for the node
+ */
public void removeToken(String tokenString)
{
+ InetAddress myAddress = FBUtilities.getLocalAddress();
+ Token localToken = tokenMetadata_.getToken(myAddress);
Token token = partitioner_.getTokenFactory().fromString(tokenString);
-
- // Here we could refuse the operation from continuing if we
- // cannot find the endpoint for this token from metadata, but
- // that would prevent this command from being issued by a node
- // that has never seen the failed node.
InetAddress endpoint = tokenMetadata_.getEndpoint(token);
- if (endpoint != null)
- {
- if (endpoint.equals(FBUtilities.getLocalAddress()))
- throw new UnsupportedOperationException("Cannot remove node's
own token");
- // Let's make sure however that we're not removing a live
- // token (member)
- if (Gossiper.instance.getLiveMembers().contains(endpoint))
- throw new UnsupportedOperationException("Node " + endpoint + "
is alive and owns this token. Use decommission command to remove it from the
ring");
+ if (endpoint == null)
+ throw new UnsupportedOperationException("Token not found.");
- removeEndpointLocally(endpoint);
- calculatePendingRanges();
+ if (endpoint.equals(myAddress))
+ throw new UnsupportedOperationException("Cannot remove node's own
token");
+
+ if (Gossiper.instance.getLiveMembers().contains(endpoint))
+ throw new UnsupportedOperationException("Node " + endpoint + " is
alive and owns this token. Use decommission command to remove it from the
ring");
+
+ // A leaving endpoint that is dead is already being removed.
+ if (tokenMetadata_.isLeaving(endpoint))
+ throw new UnsupportedOperationException("Node " + endpoint + " is
already being removed.");
+
+ if (replicatingNodes != null || replicateLatch != null)
+ throw new UnsupportedOperationException("This node is already
processing a removal. Wait for it to complete.");
+
+ // Find the endpoints that are going to become responsible for data
+ replicatingNodes =
Collections.synchronizedSet(getNewEndpoints(endpoint));
+ replicateLatch = new CountDownLatch(replicatingNodes.size());
+ removingNode = endpoint;
+
+ tokenMetadata_.addLeavingEndpoint(endpoint);
+ calculatePendingRanges();
+ // bundle two states together. include this nodes state to keep the
status quo,
+ // but indicate the leaving token so that it can be dealt with.
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
valueFactory.removingNonlocal(localToken, token));
+
+ restoreReplicaCount(endpoint, myAddress);
+
+ try
+ {
+ replicateLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ logger_.error("Interrupted while waiting for replication
confirmation.");
}
- // bundle two states together. include this nodes state to keep the
status quo, but indicate the leaving token so that it can be dealt with.
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
valueFactory.removeNonlocal(getLocalToken(), token));
+ Gossiper.instance.removeEndpoint(endpoint);
+ tokenMetadata_.removeBootstrapToken(token);
+ tokenMetadata_.removeEndpoint(endpoint);
+ HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+
+ // indicate the token has left
+ calculatePendingRanges();
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
valueFactory.removedNonlocal(localToken, token));
+
+ if(!replicatingNodes.isEmpty())
+ logger_.error("Failed to recieve removal confirmation for " +
StringUtils.join(replicatingNodes, ","));
+
+ replicatingNodes = null;
+ removingNode = null;
+ replicateLatch = null;
+ }
+
+ public void confirmReplication(InetAddress node)
+ {
+ if(replicatingNodes != null && replicatingNodes.remove(node))
+ replicateLatch.countDown();
}
public boolean isClientMode()
@@ -1848,6 +2011,7 @@ public class StorageService implements I
{
IPartitioner oldPartitioner = partitioner_;
partitioner_ = newPartitioner;
+ valueFactory = new VersionedValue.VersionedValueFactory(partitioner_);
return oldPartitioner;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Tue Sep 28 05:50:26 2010
@@ -210,6 +210,16 @@ public interface StorageServiceMBean
*/
public void removeToken(String token);
+ /**
+ * Get the status of a token removal.
+ */
+ public String getRemovalStatus();
+
+ /**
+ * Force a remove operation to finish.
+ */
+ public void finishRemoval();
+
/** set the logging level at runtime */
public void setLog4jLevel(String classQualifier, String level);
Added: cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1002025&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java Tue Sep 28 05:50:26 2010
@@ -0,0 +1,44 @@
+package org.apache.cassandra.streaming;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ReplicationFinishedVerbHandler implements IVerbHandler
+{
+ private static Logger logger =
LoggerFactory.getLogger(ReplicationFinishedVerbHandler.class);
+
+ public void doVerb(Message msg)
+ {
+ StorageService.instance.confirmReplication(msg.getFrom());
+ Message response = msg.getReply(FBUtilities.getLocalAddress(), new
byte[]{});
+ if (logger.isDebugEnabled())
+ logger.debug("Replying to " + msg.getMessageId() + "@" +
msg.getFrom());
+ MessagingService.instance.sendOneWay(response, msg.getFrom());
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java Tue Sep 28 05:50:26 2010
@@ -99,4 +99,4 @@ public class StreamHeader
return new StreamHeader(table, sessionId, file, pendingFiles);
}
}
-}
+}
\ No newline at end of file
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Tue Sep 28 05:50:26 2010
@@ -68,7 +68,6 @@ public class StreamInSession
public static StreamInSession get(InetAddress host, long sessionId)
{
Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host,
sessionId);
-
StreamInSession session = sessions.get(context);
if (session == null)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Sep 28 05:50:26 2010
@@ -77,7 +77,7 @@ public class NodeCmd {
HelpFormatter hf = new HelpFormatter();
String header = String.format(
"%nAvailable commands: ring, info, version, cleanup, compact,
cfstats, snapshot [snapshotname], clearsnapshot, " +
- "tpstats, flush, drain, repair, decommission, move,
loadbalance, removetoken, " +
+ "tpstats, flush, drain, repair, decommission, move,
loadbalance, removetoken [status|force]|[token], " +
"setcachecapacity [keyspace] [cfname] [keycachecapacity]
[rowcachecapacity], " +
"getcompactionthreshold [keyspace] [cfname],
setcompactionthreshold [cfname] [minthreshold] [maxthreshold], " +
"streams [host]");
@@ -356,6 +356,11 @@ public class NodeCmd {
}
}
+ public void printRemovalStatus(PrintStream outs)
+ {
+ outs.println("RemovalStatus: " + probe.getRemovalStatus());
+ }
+
public static void main(String[] args) throws IOException,
InterruptedException, ParseException
{
CommandLineParser parser = new PosixParser();
@@ -458,9 +463,21 @@ public class NodeCmd {
{
if (arguments.length <= 1)
{
- System.err.println("missing token argument");
+ System.err.println("Missing an argument.");
+ printUsage();
}
- probe.removeToken(arguments[1]);
+ else if (arguments[1].equals("status"))
+ {
+ nodeCmd.printRemovalStatus(System.out);
+ }
+ else if (arguments[1].equals("force"))
+ {
+ nodeCmd.printRemovalStatus(System.out);
+ probe.finishRemoval();
+ }
+ else
+ probe.removeToken(arguments[1]);
+
}
else if (cmdName.equals("snapshot"))
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Sep 28 05:50:26 2010
@@ -296,6 +296,24 @@ public class NodeProbe
{
ssProxy.removeToken(token);
}
+
+    /**
+     * Returns the removal status string reported by ssProxy.
+     */
+    public String getRemovalStatus()
+    {
+        final String status = ssProxy.getRemovalStatus();
+        return status;
+    }
+
+    /**
+     * Delegates to ssProxy.finishRemoval(); NodeCmd calls this for
+     * "removetoken force" after printing the current removal status.
+     */
+    public void finishRemoval()
+    {
+        ssProxy.finishRemoval();
+    }
public Iterator<Map.Entry<String, IExecutorMBean>>
getThreadPoolMBeanProxies()
{
Modified: cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/Util.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/Util.java Tue Sep 28 05:50:26 2010
@@ -20,20 +20,30 @@ package org.apache.cassandra;
*
*/
-
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
import java.util.concurrent.ExecutionException;
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.BigIntegerToken;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageService;
import static com.google.common.base.Charsets.UTF_8;
@@ -110,4 +120,39 @@ public class Util
{
return ColumnFamilyStore.removeDeleted(cf.cloneMe(), gcBefore);
}
+
+    /**
+     * Creates an initial set of nodes and tokens and adds each node to the given StorageService as 'normal'.
+     *
+     * For each i in [0, howMany), endpoint token 10*i is appended to {@code endpointTokens} and key
+     * token 10*i+5 to {@code keyTokens}. Then, for every entry i of {@code endpointTokens},
+     * ss.onChange is invoked with a 'normal' STATUS value for that token from address
+     * 127.0.0.(i+1), and that address is appended to {@code hosts}. Callers are expected to pass
+     * empty lists. Finally asserts that each of those addresses is a token metadata member.
+     *
+     * @throws UnknownHostException propagated from InetAddress.getByName
+     */
+    public static void createInitialRing(StorageService ss, IPartitioner partitioner, List<Token> endpointTokens,
+                                         List<Token> keyTokens, List<InetAddress> hosts, int howMany)
+            throws UnknownHostException
+    {
+        for (int n = 0; n < howMany; n++)
+        {
+            int base = 10 * n;
+            endpointTokens.add(new BigIntegerToken(Integer.toString(base)));
+            keyTokens.add(new BigIntegerToken(Integer.toString(base + 5)));
+        }
+
+        for (int n = 0; n < endpointTokens.size(); n++)
+        {
+            InetAddress endpoint = InetAddress.getByName("127.0.0." + (n + 1));
+            VersionedValue normalState = new VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(n));
+            ss.onChange(endpoint, ApplicationState.STATUS, normalState);
+            hosts.add(endpoint);
+        }
+
+        // every endpoint registered above must now be a token metadata member
+        for (int n = 0; n < endpointTokens.size(); n++)
+            assertTrue(ss.getTokenMetadata().isMember(hosts.get(n)));
+    }
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Tue Sep 28 05:50:26 2010
@@ -30,6 +30,7 @@ import static org.junit.Assert.*;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.ApplicationState;
@@ -62,7 +63,7 @@ public class MoveTest extends CleanupHel
ArrayList<Token> keyTokens = new ArrayList<Token>();
List<InetAddress> hosts = new ArrayList<InetAddress>();
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts,
RING_SIZE);
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens,
hosts, RING_SIZE);
Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token,
List<InetAddress>>();
for (String table : DatabaseDescriptor.getNonSystemTables())
@@ -134,7 +135,7 @@ public class MoveTest extends CleanupHel
List<InetAddress> hosts = new ArrayList<InetAddress>();
// create a ring or 10 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts,
RING_SIZE);
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens,
hosts, RING_SIZE);
// nodes 6, 8 and 9 leave
final int[] LEAVING = new int[] {6, 8, 9};
@@ -421,7 +422,7 @@ public class MoveTest extends CleanupHel
List<InetAddress> hosts = new ArrayList<InetAddress>();
// create a ring or 5 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts,
7);
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens,
hosts, 7);
// node 2 leaves
ss.onChange(hosts.get(2), ApplicationState.STATUS,
valueFactory.leaving(endpointTokens.get(2)));
@@ -488,7 +489,7 @@ public class MoveTest extends CleanupHel
List<InetAddress> hosts = new ArrayList<InetAddress>();
// create a ring or 5 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts,
6);
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens,
hosts, 6);
// node 2 leaves
ss.onChange(hosts.get(2), ApplicationState.STATUS,
valueFactory.leaving(endpointTokens.get(2)));
@@ -530,7 +531,7 @@ public class MoveTest extends CleanupHel
List<InetAddress> hosts = new ArrayList<InetAddress>();
// create a ring or 5 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts,
6);
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens,
hosts, 6);
// node 2 leaves with _different_ token
ss.onChange(hosts.get(2), ApplicationState.STATUS,
valueFactory.leaving(keyTokens.get(0)));
@@ -578,7 +579,7 @@ public class MoveTest extends CleanupHel
List<InetAddress> hosts = new ArrayList<InetAddress>();
// create a ring of 6 nodes
- createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts,
7);
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens,
hosts, 7);
// node hosts.get(2) goes jumps to left
ss.onChange(hosts.get(2), ApplicationState.STATUS,
valueFactory.left(endpointTokens.get(2)));
@@ -602,31 +603,6 @@ public class MoveTest extends CleanupHel
ss.setPartitionerUnsafe(oldPartitioner);
}
- /**
- * Creates initial set of nodes and tokens. Nodes are added to
StorageService as 'normal'
- */
- private void createInitialRing(StorageService ss, IPartitioner
partitioner, List<Token> endpointTokens,
- List<Token> keyTokens, List<InetAddress>
hosts, int howMany)
- throws UnknownHostException
- {
- for (int i=0; i<howMany; i++)
- {
- endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
- keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
- }
-
- for (int i=0; i<endpointTokens.size(); i++)
- {
- InetAddress ep = InetAddress.getByName("127.0.0." +
String.valueOf(i + 1));
- ss.onChange(ep, ApplicationState.STATUS, new
VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(i)));
- hosts.add(ep);
- }
-
- // check that all nodes are in token metadata
- for (int i=0; i<endpointTokens.size(); ++i)
- assertTrue(ss.getTokenMetadata().isMember(hosts.get(i)));
- }
-
private static Collection<InetAddress> makeAddrs(String... hosts) throws
UnknownHostException
{
ArrayList<InetAddress> addrs = new
ArrayList<InetAddress>(hosts.length);
Added: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1002025&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Tue Sep 28 05:50:26 2010
@@ -0,0 +1,241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.sink.IMessageSink;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.streaming.StreamUtil;
+import org.apache.cassandra.streaming.StreamRequestVerbHandler;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class RemoveTest extends CleanupHelper
+{
+    // number of nodes in the ring built by setup()
+    static final int RING_SIZE = 6;
+    // upper bound on how long testRemoveToken waits for the background removal to finish
+    static final long REMOVAL_TIMEOUT_MS = 10000;
+
+    StorageService ss = StorageService.instance;
+    TokenMetadata tmd = ss.getTokenMetadata();
+    IPartitioner oldPartitioner;
+    ArrayList<Token> endpointTokens;
+    ArrayList<Token> keyTokens;
+    List<InetAddress> hosts;
+
+    @Before
+    public void setup() throws IOException
+    {
+        tmd.clearUnsafe();
+        IPartitioner partitioner = new RandomPartitioner();
+
+        oldPartitioner = ss.setPartitionerUnsafe(partitioner);
+
+        endpointTokens = new ArrayList<Token>();
+        keyTokens = new ArrayList<Token>();
+        hosts = new ArrayList<InetAddress>();
+
+        // create a ring of RING_SIZE nodes
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
+
+        MessagingService.instance.listen(FBUtilities.getLocalAddress());
+        Gossiper.instance.start(FBUtilities.getLocalAddress(), 1);
+        for (InetAddress host : hosts)
+            Gossiper.instance.initializeNodeUnsafe(host, 1);
+    }
+
+    @After
+    public void tearDown()
+    {
+        SinkManager.clearSinks();
+        // setup() starts the gossiper before every test; stop it so its timer is not left running
+        Gossiper.instance.stop();
+        MessagingService.instance.shutdown();
+        ss.setPartitionerUnsafe(oldPartitioner);
+    }
+
+    @Test(expected=UnsupportedOperationException.class)
+    public void testBadToken()
+    {
+        final String token = ss.getPartitioner().getTokenFactory().toString(keyTokens.get(2));
+        ss.removeToken(token);
+    }
+
+    @Test(expected=UnsupportedOperationException.class)
+    public void testLocalToken()
+    {
+        //first token should be localhost
+        final String token = ss.getPartitioner().getTokenFactory().toString(endpointTokens.get(0));
+        ss.removeToken(token);
+    }
+
+    @Test
+    public void testRemoveToken() throws InterruptedException
+    {
+        IPartitioner partitioner = ss.getPartitioner();
+
+        final String token = partitioner.getTokenFactory().toString(endpointTokens.get(5));
+        ReplicationSink rSink = new ReplicationSink();
+        SinkManager.addMessageSink(rSink);
+
+        // start removal in background and send replication confirmations
+        final AtomicBoolean success = new AtomicBoolean(false);
+        Thread remover = new Thread()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    ss.removeToken(token);
+                }
+                catch (Exception e)
+                {
+                    System.err.println(e);
+                    e.printStackTrace();
+                    return;
+                }
+                success.set(true);
+            }
+        };
+        remover.start();
+
+        Thread.sleep(1000); // make sure removal is waiting for confirmation
+
+        assertTrue(tmd.isLeaving(hosts.get(5)));
+        assertEquals(1, tmd.getLeavingEndpoints().size());
+
+        for (InetAddress host : hosts)
+        {
+            Message msg = new Message(host, StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
+            MessagingService.instance.sendRR(msg, FBUtilities.getLocalAddress());
+        }
+
+        // bounded wait: a lost confirmation must fail the test instead of hanging the build
+        remover.join(REMOVAL_TIMEOUT_MS);
+        assertFalse("removeToken did not complete within " + REMOVAL_TIMEOUT_MS + " ms", remover.isAlive());
+
+        assertTrue(success.get());
+        assertTrue(tmd.getLeavingEndpoints().isEmpty());
+    }
+
+    @Test
+    public void testStartRemoving()
+    {
+        IPartitioner partitioner = ss.getPartitioner();
+        VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+
+        NotificationSink nSink = new NotificationSink();
+        ReplicationSink rSink = new ReplicationSink();
+        SinkManager.addMessageSink(nSink);
+        SinkManager.addMessageSink(rSink);
+
+        assertEquals(0, tmd.getLeavingEndpoints().size());
+
+        ss.onChange(hosts.get(1),
+                    ApplicationState.STATUS,
+                    valueFactory.removingNonlocal(endpointTokens.get(1), endpointTokens.get(5)));
+
+        assertEquals(1, nSink.callCount);
+        assertTrue(tmd.isLeaving(hosts.get(5)));
+        assertEquals(1, tmd.getLeavingEndpoints().size());
+    }
+
+    @Test
+    public void testFinishRemoving()
+    {
+        IPartitioner partitioner = ss.getPartitioner();
+        VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+
+        assertEquals(0, tmd.getLeavingEndpoints().size());
+
+        ss.onChange(hosts.get(1),
+                    ApplicationState.STATUS,
+                    valueFactory.removedNonlocal(endpointTokens.get(1), endpointTokens.get(5)));
+
+        assertFalse(Gossiper.instance.getLiveMembers().contains(hosts.get(5)));
+        assertFalse(tmd.isMember(hosts.get(5)));
+    }
+
+    /**
+     * Intercepts STREAM_REQUEST messages, completes them locally via StreamUtil and returns null for
+     * them (assumed to drop the message; see SinkManager). Every other message is returned unchanged.
+     */
+    static class ReplicationSink implements IMessageSink
+    {
+        public Message handleMessage(Message msg, InetAddress to)
+        {
+            if (!msg.getVerb().equals(StorageService.Verb.STREAM_REQUEST))
+                return msg;
+
+            StreamUtil.finishStreamRequest(msg, to);
+
+            return null;
+        }
+    }
+
+    /**
+     * Counts REPLICATION_FINISHED messages and answers each with a reply sent one-way to the local
+     * address, returning null for the original. Every other message is returned unchanged.
+     */
+    static class NotificationSink implements IMessageSink
+    {
+        public int callCount = 0;
+
+        public Message handleMessage(Message msg, InetAddress to)
+        {
+            if (msg.getVerb().equals(StorageService.Verb.REPLICATION_FINISHED))
+            {
+                callCount++;
+                assertEquals(Stage.MISC, msg.getMessageType());
+                // simulate a response from remote server
+                Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{});
+                MessagingService.instance.sendOneWay(response, FBUtilities.getLocalAddress());
+                return null;
+            }
+            else
+            {
+                return msg;
+            }
+        }
+    }
+}
Added: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java?rev=1002025&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java Tue Sep 28 05:50:26 2010
@@ -0,0 +1,59 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.net.InetAddress;
+
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+public class StreamUtil
+{
+
+    /**
+     * Takes a stream request message and creates an empty status response for it: the request is
+     * deserialized and closeIfFinished() is called on the StreamInSession obtained for {@code to}
+     * and the request's session id. Exists here because StreamRequestMessage is package
+     * protected.
+     *
+     * @throws RuntimeException wrapping any failure to deserialize the request or close the session,
+     *         so the failure reaches the caller instead of only being printed
+     */
+    public static void finishStreamRequest(Message msg, InetAddress to)
+    {
+        byte[] body = msg.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+
+        try
+        {
+            StreamRequestMessage srm = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn));
+            StreamInSession session = StreamInSession.get(to, srm.sessionId);
+            session.closeIfFinished();
+        }
+        catch (Exception e)
+        {
+            // propagate instead of swallowing, so the calling test sees the failure
+            throw new RuntimeException("Unable to finish stream request sent to " + to, e);
+        }
+    }
+}