Author: gdusbabek
Date: Tue Sep 28 05:50:26 2010
New Revision: 1002025

URL: http://svn.apache.org/viewvc?rev=1002025&view=rev
Log:
Modify removetoken so that the coordinator relies on the replicating nodes to
confirm when they have finished re-replicating the removed token's data.
Patch by Nick Bailey, reviewed by Gary Dusbabek. CASSANDRA-1216

Added:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
    cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/trunk/test/unit/org/apache/cassandra/Util.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Sep 28 05:50:26 2010
@@ -876,4 +876,21 @@ public class Gossiper implements IFailur
         gossipTimer_.cancel();
         gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
     }
+
+    /**
+     * This should *only* be used for testing purposes.
+     */
+    public void initializeNodeUnsafe(InetAddress addr, int generationNbr) {
+        /* initialize the heartbeat state for this localEndpoint */
+        EndpointState localState = endpointStateMap_.get(addr);
+        if ( localState == null )
+        {
+            HeartBeatState hbState = new HeartBeatState(generationNbr);
+            localState = new EndpointState(hbState);
+            localState.isAlive(true);
+            localState.isAGossiper(true);
+            endpointStateMap_.put(addr, localState);
+        }
+    }
+
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Tue Sep 28 05:50:26 2010
@@ -54,7 +54,8 @@ public class VersionedValue implements C
     public final static String STATUS_LEAVING = "LEAVING";
     public final static String STATUS_LEFT = "LEFT";
 
-    public final static String REMOVE_TOKEN = "remove";
+    public final static String REMOVING_TOKEN = "removing";
+    public final static String REMOVED_TOKEN = "removed";
 
     public final int version;
     public final String value;
@@ -115,11 +116,19 @@ public class VersionedValue implements C
             return new VersionedValue(VersionedValue.STATUS_LEFT + 
VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
-        public VersionedValue removeNonlocal(Token localToken, Token token)
+        public VersionedValue removingNonlocal(Token localToken, Token token)
         {
             return new VersionedValue(VersionedValue.STATUS_NORMAL
                                         + VersionedValue.DELIMITER + 
partitioner.getTokenFactory().toString(localToken)
-                                        + VersionedValue.DELIMITER + 
VersionedValue.REMOVE_TOKEN
+                                        + VersionedValue.DELIMITER + 
VersionedValue.REMOVING_TOKEN
+                                        + VersionedValue.DELIMITER + 
partitioner.getTokenFactory().toString(token));
+        }
+
+        public VersionedValue removedNonlocal(Token localToken, Token token)
+        {
+            return new VersionedValue(VersionedValue.STATUS_NORMAL
+                                        + VersionedValue.DELIMITER + 
partitioner.getTokenFactory().toString(localToken)
+                                        + VersionedValue.DELIMITER + 
VersionedValue.REMOVED_TOKEN
                                         + VersionedValue.DELIMITER + 
partitioner.getTokenFactory().toString(token));
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Sep 28 05:50:26 2010
@@ -298,7 +298,7 @@ public class MessagingService implements
         }
 
         // message sinks are a testing hook
-        Message processedMessage = 
SinkManager.processClientMessageSink(message);
+        Message processedMessage = 
SinkManager.processClientMessageSink(message, to);
         if (processedMessage == null)
         {
             return;
@@ -378,7 +378,7 @@ public class MessagingService implements
 
     public static void receive(Message message)
     {
-        message = SinkManager.processServerMessageSink(message);
+        message = SinkManager.processServerMessageSink(message, null);
 
         Runnable runnable = new MessageDeliveryTask(message);
         ExecutorService stage = 
StageManager.getStage(message.getMessageType());

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java Tue Sep 28 05:50:26 2010
@@ -23,5 +23,5 @@ import org.apache.cassandra.net.Message;
 
 public interface IMessageSink
 {
-    public Message handleMessage(Message message);    
+    public Message handleMessage(Message message, InetAddress to);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java Tue Sep 28 05:50:26 2010
@@ -42,13 +42,13 @@ public class SinkManager
         messageSinks_.clear();
     }
 
-    public static Message processClientMessageSink(Message message)
+    public static Message processClientMessageSink(Message message, 
InetAddress to)
     {
         ListIterator<IMessageSink> li = messageSinks_.listIterator();
         while ( li.hasNext() )
         {
             IMessageSink ms = li.next();
-            message = ms.handleMessage(message);
+            message = ms.handleMessage(message, to);
             if ( message == null )
             {
                 return null;
@@ -57,13 +57,13 @@ public class SinkManager
         return message;
     }
 
-    public static Message processServerMessageSink(Message message)
+    public static Message processServerMessageSink(Message message, 
InetAddress to)
     {
         ListIterator<IMessageSink> li = 
messageSinks_.listIterator(messageSinks_.size());
         while ( li.hasPrevious() )
         {
             IMessageSink ms = li.previous();
-            message = ms.handleMessage(message);
+            message = ms.handleMessage(message, to);
             if ( message == null )
             {
                 return null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep 28 05:50:26 2010
@@ -52,7 +52,10 @@ import org.apache.cassandra.io.DeletionS
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ResponseVerbHandler;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
@@ -103,6 +106,8 @@ public class StorageService implements I
         TRUNCATE,
         SCHEMA_CHECK,
         INDEX_SCAN,
+        REPLICATION_FINISHED,
+        ;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
     public static final Verb[] VERBS = Verb.values();
@@ -128,11 +133,12 @@ public class StorageService implements I
         put(Verb.TRUNCATE, Stage.MUTATION);
         put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
         put(Verb.INDEX_SCAN, Stage.READ);
+        put(Verb.REPLICATION_FINISHED, Stage.MISC);
     }};
 
 
     private static IPartitioner partitioner_ = 
DatabaseDescriptor.getPartitioner();
-    public static final VersionedValue.VersionedValueFactory valueFactory = 
new VersionedValue.VersionedValueFactory(partitioner_);
+    public static VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner_);
 
     public static final StorageService instance = new StorageService();
 
@@ -165,6 +171,10 @@ public class StorageService implements I
     /* We use this interface to determine where replicas need to be placed */
     private Map<String, AbstractReplicationStrategy> replicationStrategies;
 
+    private Set<InetAddress> replicatingNodes;
+    private InetAddress removingNode;
+    private CountDownLatch replicateLatch;
+
     /* Are we starting this node in bootstrap mode? */
     private boolean isBootstrapMode;
     /* when intialized as a client, we shouldn't write to the system table. */
@@ -215,6 +225,7 @@ public class StorageService implements I
         MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, 
new BootStrapper.BootstrapTokenVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST, 
new StreamRequestVerbHandler() );
         MessagingService.instance.registerVerbHandlers(Verb.STREAM_REPLY, new 
StreamReplyVerbHandler());
+        
MessagingService.instance.registerVerbHandlers(Verb.REPLICATION_FINISHED, new 
ReplicationFinishedVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new 
ResponseVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new 
TreeRequestVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new 
AntiEntropyService.TreeResponseVerbHandler());
@@ -637,31 +648,11 @@ public class StorageService implements I
             tokenMetadata_.updateNormalToken(token, endpoint);
         else
             logger_.info("Will not change my token ownership to " + endpoint);
-        
-        if (pieces.length > 2)
-        {
-            if (VersionedValue.REMOVE_TOKEN.equals(pieces[2]))
-            { 
-                // remove token was called on a dead node.
-                Token tokenThatLeft = 
getPartitioner().getTokenFactory().fromString(pieces[3]);
-                InetAddress endpointThatLeft = 
tokenMetadata_.getEndpoint(tokenThatLeft);
-                // let's make sure that we're not removing ourselves. This can 
happen when a node
-                // enters ring as a replacement for a removed node. 
removeToken for the old node is
-                // still in gossip, so we will see it.
-                if (FBUtilities.getLocalAddress().equals(endpointThatLeft))
-                {
-                    logger_.info("Received removeToken gossip about myself. Is 
this node a replacement for a removed one?");
-                    return;
-                }
-                logger_.debug("Token " + tokenThatLeft + " removed manually 
(endpoint was " + ((endpointThatLeft == null) ? "unknown" : endpointThatLeft) + 
")");
-                if (endpointThatLeft != null)
-                {
-                    removeEndpointLocally(endpointThatLeft);
-                }
-                tokenMetadata_.removeBootstrapToken(tokenThatLeft);
-            }
+
+        if(pieces.length > 2) {
+            handleStateRemoving(endpoint, pieces);
         }
-        
+
         calculatePendingRanges();
         if (!isClientMode)
             SystemTable.updateToken(endpoint, token);
@@ -734,15 +725,45 @@ public class StorageService implements I
     }
 
     /**
-     * endpoint was completely removed from ring (as a result of removetoken 
command). Remove it
-     * from token metadata and gossip and restore replica count.  Also delete 
any hints for it.
+     * Handle node being actively removed from the ring.
+     *
+     * @param endpoint node
+     * @param moveValue (token to notify of removal)<Delimiter>(token to 
remove)
      */
-    private void removeEndpointLocally(InetAddress endpoint)
+    private void handleStateRemoving(InetAddress endpoint, String[] pieces)
     {
-        restoreReplicaCount(endpoint);
-        Gossiper.instance.removeEndpoint(endpoint);
-        // gossiper onRemove will take care of TokenMetadata
-        HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+        assert pieces.length == 4;
+        Token removeToken = 
getPartitioner().getTokenFactory().fromString(pieces[3]);
+        InetAddress removeEndpoint = tokenMetadata_.getEndpoint(removeToken);
+        
+        if (removeEndpoint == null)
+            return;
+        
+        if (removeEndpoint.equals(FBUtilities.getLocalAddress()))
+        {
+            logger_.info("Received removeToken gossip about myself. Is this 
node a replacement for a removed one?");
+            return;
+        }
+
+        if (VersionedValue.REMOVED_TOKEN.equals(pieces[2]))
+        {
+            Gossiper.instance.removeEndpoint(removeEndpoint);
+            tokenMetadata_.removeEndpoint(removeEndpoint);
+            HintedHandOffManager.deleteHintsForEndPoint(removeEndpoint);
+            tokenMetadata_.removeBootstrapToken(removeToken);
+        }
+        else if (VersionedValue.REMOVING_TOKEN.equals(pieces[2]))
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("Token " + removeToken + " removed manually 
(endpoint was " + removeEndpoint + ")");
+
+            // Note that the endpoint is being removed
+            tokenMetadata_.addLeavingEndpoint(removeEndpoint);
+            calculatePendingRanges();
+
+            // grab any data we are now responsible for and notify responsible 
node
+            restoreReplicaCount(removeEndpoint, endpoint);
+        }
     }
 
     /**
@@ -832,71 +853,142 @@ public class StorageService implements I
     }
 
     /**
-     * Called when an endpoint is removed from the ring without proper
-     * STATE_LEAVING -> STATE_LEFT sequence. This function checks
+     * Determines the endpoints that are going to become responsible for data 
due to
+     * a node leaving the cluster.
+     *
+     * @param endpoint the node that is leaving the cluster
+     * @return A set of endpoints
+     */
+    private Set<InetAddress> getNewEndpoints(InetAddress endpoint)
+    {
+        Set<InetAddress> newEndpoints = new HashSet<InetAddress>();
+
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+        {
+            // get all ranges that change ownership (that is, a node needs
+            // to take responsibility for new range)
+            Multimap<Range, InetAddress> changedRanges = 
getChangedRangesForLeaving(table, endpoint);
+            newEndpoints.addAll(changedRanges.values());
+        }
+        return newEndpoints;
+    }
+
+    /**
+     * Finds living endpoints responsible for the given ranges
+     *
+     * @param table the table ranges belong to
+     * @param ranges the ranges to find sources for
+     * @return multimap of addresses to ranges the address is responsible for
+     */
+    private Multimap<InetAddress, Range> getNewSourceRanges(String table, 
Set<Range> ranges) 
+    {
+        InetAddress myAddress = FBUtilities.getLocalAddress();
+        Multimap<Range, InetAddress> rangeAddresses = 
getReplicationStrategy(table).getRangeAddresses(tokenMetadata_);
+        Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
+        IFailureDetector failureDetector = FailureDetector.instance;
+
+        // find alive sources for our new ranges
+        for (Range range : ranges)
+        {
+            Collection<InetAddress> possibleRanges = rangeAddresses.get(range);
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            List<InetAddress> sources = 
snitch.getSortedListByProximity(myAddress, possibleRanges);
+
+            assert (!sources.contains(myAddress));
+
+            for (InetAddress source : sources)
+            {
+                if (failureDetector.isAlive(source))
+                {
+                    sourceRanges.put(source, range);
+                    break;
+                }
+            } 
+        }
+        return sourceRanges;
+    }
+
+    /**
+     * Sends a notification to a node indicating we have finished replicating 
data.
+     * 
+     * @param local the local address
+     * @param remote node to send notification to
+     */
+    private void sendReplicationNotification(InetAddress local, InetAddress 
remote)
+    {
+        // notify the remote token
+        Message msg = new Message(local, 
StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
+        IFailureDetector failureDetector = FailureDetector.instance;
+        while (failureDetector.isAlive(remote))
+        {
+            IAsyncResult iar = MessagingService.instance.sendRR(msg, remote);
+            try 
+            {
+                iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+                return; // done
+            }
+            catch(TimeoutException e)
+            {
+                // try again
+            }
+        }
+    }
+
+    /**
+     * Called when an endpoint is removed from the ring. This function checks
      * whether this node becomes responsible for new ranges as a
      * consequence and streams data if needed.
      *
      * This is rather ineffective, but it does not matter so much
      * since this is called very seldom
      *
-     * @param endpoint node that has left
+     * @param endpoint the node that left
      */
-    private void restoreReplicaCount(InetAddress endpoint)
+    private void restoreReplicaCount(InetAddress endpoint, final InetAddress 
notifyEndpoint)
     {
-        InetAddress myAddress = FBUtilities.getLocalAddress();
+        final Multimap<InetAddress, String> fetchSources = 
HashMultimap.create();
+        Multimap<String, Map.Entry<InetAddress, Collection<Range>>> 
rangesToFetch = HashMultimap.create();
+
+        final InetAddress myAddress = FBUtilities.getLocalAddress();
 
         for (String table : DatabaseDescriptor.getNonSystemTables())
         {
-            // get all ranges that change ownership (that is, a node needs
-            // to take responsibility for new range)
-            Multimap<Range, InetAddress> changedRanges = 
getChangedRangesForLeaving(table, endpoint);
-
-            // check if any of these ranges are coming our way
+            Multimap<Range, InetAddress> changedRanges = 
getChangedRangesForLeaving(table, endpoint); 
             Set<Range> myNewRanges = new HashSet<Range>();
             for (Map.Entry<Range, InetAddress> entry : changedRanges.entries())
             {
                 if (entry.getValue().equals(myAddress))
                     myNewRanges.add(entry.getKey());
             }
-
-            if (!myNewRanges.isEmpty())
+            Multimap<InetAddress, Range> sourceRanges = 
getNewSourceRanges(table, myNewRanges);
+            for (Map.Entry<InetAddress, Collection<Range>> entry : 
sourceRanges.asMap().entrySet())
             {
-                if (logger_.isDebugEnabled())
-                    logger_.debug(endpoint + " was removed, my added ranges: " 
+ StringUtils.join(myNewRanges, ", "));
-
-                Multimap<Range, InetAddress> rangeAddresses = 
getReplicationStrategy(table).getRangeAddresses(tokenMetadata_);
-                Multimap<InetAddress, Range> sourceRanges = 
HashMultimap.create();
-                IFailureDetector failureDetector = FailureDetector.instance;
+                fetchSources.put(entry.getKey(), table);
+                rangesToFetch.put(table, entry);
+            }
+        }
 
-                // find alive sources for our new ranges
-                for (Range myNewRange : myNewRanges)
+        for (final String table : rangesToFetch.keySet())
+        {
+            for (Map.Entry<InetAddress, Collection<Range>> entry : 
rangesToFetch.get(table))
+            {
+                final InetAddress source = entry.getKey();
+                Collection<Range> ranges = entry.getValue();
+                final Runnable callback = new Runnable()
                 {
-                    List<InetAddress> sources = 
DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(myAddress, 
rangeAddresses.get(myNewRange));
-
-                    assert (!sources.contains(myAddress));
-
-                    for (InetAddress source : sources)
+                    public void run()
                     {
-                        if (source.equals(endpoint))
-                            continue;
-
-                        if (failureDetector.isAlive(source))
+                        synchronized (fetchSources)
                         {
-                            sourceRanges.put(source, myNewRange);
-                            break;
+                            fetchSources.remove(source, table);
+                            if (fetchSources.isEmpty())
+                                sendReplicationNotification(myAddress, 
notifyEndpoint);
                         }
                     }
-                }
-
-                // Finally we have a list of addresses and ranges to
-                // stream. Proceed to stream
-                for (Map.Entry<InetAddress, Collection<Range>> entry : 
sourceRanges.asMap().entrySet())
-                {
-                    if (logger_.isDebugEnabled())
-                        logger_.debug("Requesting from " + entry.getKey() + " 
ranges " + StringUtils.join(entry.getValue(), ", "));
-                    StreamIn.requestRanges(entry.getKey(), table, 
entry.getValue());
-                }
+                };
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Requesting from " + source + " ranges " + 
StringUtils.join(ranges, ", "));
+                StreamIn.requestRanges(source, table, ranges, callback);
             }
         }
     }
@@ -1583,31 +1675,102 @@ public class StorageService implements I
         unbootstrap(finishMoving);
     }
 
+    /**
+     * Get the status of a token removal.
+     */
+    public String getRemovalStatus()
+    {
+        if (removingNode == null) {
+            return "No token removals in process.";
+        }
+        return String.format("Removing token (%s). Waiting for replication 
confirmation from [%s].",
+                             tokenMetadata_.getToken(removingNode),
+                             StringUtils.join(replicatingNodes, ","));
+    }
+
+    /**
+     * Force a remove operation to complete. This may be necessary if a remove 
operation
+     * blocks forever due to node/stream failure.
+     */
+    public void finishRemoval()
+    {
+        while (replicateLatch != null && replicateLatch.getCount() > 0)
+        {
+            replicateLatch.countDown();
+        }
+    }
+
+    /**
+     * Remove a node that has died.
+     *
+     * @param tokenString token for the node
+     */
     public void removeToken(String tokenString)
     {
+        InetAddress myAddress = FBUtilities.getLocalAddress();
+        Token localToken = tokenMetadata_.getToken(myAddress);
         Token token = partitioner_.getTokenFactory().fromString(tokenString);
-
-        // Here we could refuse the operation from continuing if we
-        // cannot find the endpoint for this token from metadata, but
-        // that would prevent this command from being issued by a node
-        // that has never seen the failed node.
         InetAddress endpoint = tokenMetadata_.getEndpoint(token);
-        if (endpoint != null)
-        {
-            if (endpoint.equals(FBUtilities.getLocalAddress()))
-                throw new UnsupportedOperationException("Cannot remove node's 
own token");
 
-            // Let's make sure however that we're not removing a live
-            // token (member)
-            if (Gossiper.instance.getLiveMembers().contains(endpoint))
-                throw new UnsupportedOperationException("Node " + endpoint + " 
is alive and owns this token. Use decommission command to remove it from the 
ring");
+        if (endpoint == null)
+            throw new UnsupportedOperationException("Token not found.");
 
-            removeEndpointLocally(endpoint);
-            calculatePendingRanges();
+        if (endpoint.equals(myAddress))
+             throw new UnsupportedOperationException("Cannot remove node's own 
token");
+
+        if (Gossiper.instance.getLiveMembers().contains(endpoint))
+            throw new UnsupportedOperationException("Node " + endpoint + " is 
alive and owns this token. Use decommission command to remove it from the 
ring");
+
+        // A leaving endpoint that is dead is already being removed.
+        if (tokenMetadata_.isLeaving(endpoint)) 
+            throw new UnsupportedOperationException("Node " + endpoint + " is 
already being removed.");
+
+        if (replicatingNodes != null || replicateLatch != null)
+            throw new UnsupportedOperationException("This node is already 
processing a removal. Wait for it to complete.");
+
+        // Find the endpoints that are going to become responsible for data
+        replicatingNodes = 
Collections.synchronizedSet(getNewEndpoints(endpoint));
+        replicateLatch = new CountDownLatch(replicatingNodes.size());
+        removingNode = endpoint;
+
+        tokenMetadata_.addLeavingEndpoint(endpoint);
+        calculatePendingRanges();
+        // bundle two states together. include this nodes state to keep the 
status quo, 
+        // but indicate the leaving token so that it can be dealt with.
+        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, 
valueFactory.removingNonlocal(localToken, token));
+
+        restoreReplicaCount(endpoint, myAddress);
+
+        try
+        {
+            replicateLatch.await();
+        }
+        catch (InterruptedException e)
+        {
+            logger_.error("Interrupted while waiting for replication 
confirmation.");
         }
 
-        // bundle two states together. include this nodes state to keep the 
status quo, but indicate the leaving token so that it can be dealt with.
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, 
valueFactory.removeNonlocal(getLocalToken(), token));
+        Gossiper.instance.removeEndpoint(endpoint);
+        tokenMetadata_.removeBootstrapToken(token);
+        tokenMetadata_.removeEndpoint(endpoint);
+        HintedHandOffManager.deleteHintsForEndPoint(endpoint);
+
+        // indicate the token has left
+        calculatePendingRanges();
+        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, 
valueFactory.removedNonlocal(localToken, token));
+
+        if(!replicatingNodes.isEmpty())
+            logger_.error("Failed to recieve removal confirmation for " + 
StringUtils.join(replicatingNodes, ","));
+
+        replicatingNodes = null;
+        removingNode = null;
+        replicateLatch = null;
+    }
+
+    public void confirmReplication(InetAddress node)
+    {
+        if(replicatingNodes != null && replicatingNodes.remove(node))
+            replicateLatch.countDown();
     }
 
     public boolean isClientMode()
@@ -1848,6 +2011,7 @@ public class StorageService implements I
     {
         IPartitioner oldPartitioner = partitioner_;
         partitioner_ = newPartitioner;
+        valueFactory = new VersionedValue.VersionedValueFactory(partitioner_);
         return oldPartitioner;
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Tue Sep 28 05:50:26 2010
@@ -210,6 +210,16 @@ public interface StorageServiceMBean
      */
     public void removeToken(String token);
 
+    /**
+     * Get the status of a token removal.
+     */
+    public String getRemovalStatus();
+
+    /**
+     * Force a remove operation to finish.
+     */
+    public void finishRemoval();
+
     /** set the logging level at runtime */
     public void setLog4jLevel(String classQualifier, String level);
 

Added: cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1002025&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java Tue Sep 28 05:50:26 2010
@@ -0,0 +1,44 @@
+package org.apache.cassandra.streaming;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Verb handler for "replication finished" notifications. It is presumably registered
+ * for {@code StorageService.Verb.REPLICATION_FINISHED}; the registration is not in
+ * this file. It records that the sending node has finished replicating and sends an
+ * empty reply back to the sender.
+ */
+public class ReplicationFinishedVerbHandler implements IVerbHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(ReplicationFinishedVerbHandler.class);
+
+    /**
+     * Confirms replication for the sender of {@code msg}, then acknowledges the message.
+     *
+     * @param msg the incoming notification; only its sender and message id are used,
+     *            and the body is ignored
+     */
+    public void doVerb(Message msg)
+    {
+        StorageService.instance.confirmReplication(msg.getFrom());
+        // the reply carries no payload; it only acknowledges the notification to its sender
+        Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{});
+        if (logger.isDebugEnabled())
+            logger.debug("Replying to " + msg.getMessageId() + "@" + msg.getFrom());
+        MessagingService.instance.sendOneWay(response, msg.getFrom());
+    }
+}
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamHeader.java 
Tue Sep 28 05:50:26 2010
@@ -99,4 +99,4 @@ public class StreamHeader
             return new StreamHeader(table, sessionId, file, pendingFiles);
         }
     }
-}
+}
\ No newline at end of file

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java 
Tue Sep 28 05:50:26 2010
@@ -68,7 +68,6 @@ public class StreamInSession
     public static StreamInSession get(InetAddress host, long sessionId)
     {
         Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, 
sessionId);
-
         StreamInSession session = sessions.get(context);
         if (session == null)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Tue Sep 28 
05:50:26 2010
@@ -77,7 +77,7 @@ public class NodeCmd {
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
                 "%nAvailable commands: ring, info, version, cleanup, compact, 
cfstats, snapshot [snapshotname], clearsnapshot, " +
-                "tpstats, flush, drain, repair, decommission, move, 
loadbalance, removetoken, " +
+                "tpstats, flush, drain, repair, decommission, move, 
loadbalance, removetoken [status|force]|[token], " +
                 "setcachecapacity [keyspace] [cfname] [keycachecapacity] 
[rowcachecapacity], " +
                 "getcompactionthreshold [keyspace] [cfname], 
setcompactionthreshold [cfname] [minthreshold] [maxthreshold], " +
                 "streams [host]");
@@ -356,6 +356,11 @@ public class NodeCmd {
         }
     }
     
+    /**
+     * Prints the node's token-removal status as a single line: "RemovalStatus: "
+     * followed by the value returned by {@code probe.getRemovalStatus()}.
+     *
+     * @param outs stream to print the status line to
+     */
+    public void printRemovalStatus(PrintStream outs)
+    {
+        outs.println("RemovalStatus: " + probe.getRemovalStatus());
+    }
+
     public static void main(String[] args) throws IOException, 
InterruptedException, ParseException
     {
         CommandLineParser parser = new PosixParser();
@@ -458,9 +463,21 @@ public class NodeCmd {
         {
             if (arguments.length <= 1)
             {
-                System.err.println("missing token argument");
+                System.err.println("Missing an argument.");
+                printUsage();
             }
-            probe.removeToken(arguments[1]);
+            else if (arguments[1].equals("status"))
+            {
+                nodeCmd.printRemovalStatus(System.out);
+            }
+            else if (arguments[1].equals("force"))
+            {
+                nodeCmd.printRemovalStatus(System.out);
+                probe.finishRemoval();
+            }
+            else
+                probe.removeToken(arguments[1]);
+
         }
         else if (cmdName.equals("snapshot"))
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Tue Sep 
28 05:50:26 2010
@@ -296,6 +296,16 @@ public class NodeProbe
     {
         ssProxy.removeToken(token);
     }
+
+    /**
+     * Fetches the token-removal status through the StorageService MBean proxy ({@code ssProxy}).
+     *
+     * @return the status string exactly as returned by the proxy
+     */
+    public String getRemovalStatus()
+    {
+        return ssProxy.getRemovalStatus();
+    }
+
+    /**
+     * Asks the node, through the StorageService MBean proxy ({@code ssProxy}), to force a
+     * token removal to finish. This is what {@code nodetool removetoken force} calls.
+     */
+    public void finishRemoval()
+    {
+        ssProxy.finishRemoval();
+    }
   
     public Iterator<Map.Entry<String, IExecutorMBean>> 
getThreadPoolMBeanProxies()
     {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/Util.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/Util.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/Util.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/Util.java Tue Sep 28 
05:50:26 2010
@@ -20,20 +20,30 @@ package org.apache.cassandra;
  * 
  */
 
-
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ExecutionException;
 
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.BigIntegerToken;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 
 import static com.google.common.base.Charsets.UTF_8;
@@ -110,4 +120,29 @@ public class Util
     {
         return ColumnFamilyStore.removeDeleted(cf.cloneMe(), gcBefore);
     }
+
+    /**
+     * Creates initial set of nodes and tokens. Nodes are added to StorageService as 'normal'.
+     * <p>
+     * Node {@code i} (0-based) gets endpoint token {@code 10 * i} and address
+     * {@code 127.0.0.(i + 1)}. A key token {@code 10 * i + 5}, lying between node
+     * {@code i}'s token and the next node's, is also generated for each node.
+     *
+     * @param ss             the StorageService the nodes are announced to via onChange
+     * @param partitioner    partitioner used to build the gossiped STATUS values
+     * @param endpointTokens out: receives the {@code howMany} endpoint tokens
+     * @param keyTokens      out: receives the {@code howMany} key tokens
+     * @param hosts          out: receives one address per endpoint token
+     * @param howMany        number of nodes to create
+     * @throws UnknownHostException if a generated loopback address cannot be resolved
+     */
+    public static void createInitialRing(StorageService ss, IPartitioner partitioner, List<Token> endpointTokens,
+                                         List<Token> keyTokens, List<InetAddress> hosts, int howMany)
+        throws UnknownHostException
+    {
+        for (int i=0; i<howMany; i++)
+        {
+            endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+            keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
+        }
+
+        // the factory depends only on the partitioner, so build it once instead of once per node
+        VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+        for (int i=0; i<endpointTokens.size(); i++)
+        {
+            InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
+            ss.onChange(ep, ApplicationState.STATUS, valueFactory.normal(endpointTokens.get(i)));
+            hosts.add(ep);
+        }
+
+        // check that all nodes are in token metadata
+        for (int i=0; i<endpointTokens.size(); ++i)
+            assertTrue(ss.getTokenMetadata().isMember(hosts.get(i)));
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=1002025&r1=1002024&r2=1002025&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Tue 
Sep 28 05:50:26 2010
@@ -30,6 +30,7 @@ import static org.junit.Assert.*;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.ApplicationState;
@@ -62,7 +63,7 @@ public class MoveTest extends CleanupHel
         ArrayList<Token> keyTokens = new ArrayList<Token>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 
RING_SIZE);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, RING_SIZE);
 
         Map<Token, List<InetAddress>> expectedEndpoints = new HashMap<Token, 
List<InetAddress>>();
         for (String table : DatabaseDescriptor.getNonSystemTables())
@@ -134,7 +135,7 @@ public class MoveTest extends CleanupHel
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         // create a ring or 10 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 
RING_SIZE);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, RING_SIZE);
 
         // nodes 6, 8 and 9 leave
         final int[] LEAVING = new int[] {6, 8, 9};
@@ -421,7 +422,7 @@ public class MoveTest extends CleanupHel
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         // create a ring or 5 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 
7);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 7);
 
         // node 2 leaves
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.leaving(endpointTokens.get(2)));
@@ -488,7 +489,7 @@ public class MoveTest extends CleanupHel
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         // create a ring or 5 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 
6);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 6);
 
         // node 2 leaves
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.leaving(endpointTokens.get(2)));
@@ -530,7 +531,7 @@ public class MoveTest extends CleanupHel
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         // create a ring or 5 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 
6);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 6);
 
         // node 2 leaves with _different_ token
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.leaving(keyTokens.get(0)));
@@ -578,7 +579,7 @@ public class MoveTest extends CleanupHel
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         // create a ring of 6 nodes
-        createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, 
7);
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 7);
 
         // node hosts.get(2) goes jumps to left
         ss.onChange(hosts.get(2), ApplicationState.STATUS, 
valueFactory.left(endpointTokens.get(2)));
@@ -602,31 +603,6 @@ public class MoveTest extends CleanupHel
         ss.setPartitionerUnsafe(oldPartitioner);
     }
 
-    /**
-     * Creates initial set of nodes and tokens. Nodes are added to 
StorageService as 'normal'
-     */
-    private void createInitialRing(StorageService ss, IPartitioner 
partitioner, List<Token> endpointTokens,
-                                   List<Token> keyTokens, List<InetAddress> 
hosts, int howMany)
-        throws UnknownHostException
-    {
-        for (int i=0; i<howMany; i++)
-        {
-            endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
-            keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
-        }
-
-        for (int i=0; i<endpointTokens.size(); i++)
-        {
-            InetAddress ep = InetAddress.getByName("127.0.0." + 
String.valueOf(i + 1));
-            ss.onChange(ep, ApplicationState.STATUS, new 
VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(i)));
-            hosts.add(ep);
-        }
-
-        // check that all nodes are in token metadata
-        for (int i=0; i<endpointTokens.size(); ++i)
-            assertTrue(ss.getTokenMetadata().isMember(hosts.get(i)));
-    }
-
     private static Collection<InetAddress> makeAddrs(String... hosts) throws 
UnknownHostException
     {
         ArrayList<InetAddress> addrs = new 
ArrayList<InetAddress>(hosts.length);

Added: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1002025&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java 
(added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Tue 
Sep 28 05:50:26 2010
@@ -0,0 +1,217 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.sink.IMessageSink;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.streaming.StreamUtil;
+import org.apache.cassandra.streaming.StreamRequestVerbHandler;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Tests for token removal ({@code StorageService.removeToken}). Also covers how
+ * {@code StorageService.onChange} reacts when another node gossips a "removing" or
+ * "removed" STATUS for some node's token.
+ */
+public class RemoveTest extends CleanupHelper
+{
+    StorageService ss = StorageService.instance;
+    TokenMetadata tmd = ss.getTokenMetadata();
+    IPartitioner oldPartitioner;
+    ArrayList<Token> endpointTokens;
+    ArrayList<Token> keyTokens;
+    List<InetAddress> hosts;
+
+    @Before
+    public void setup() throws IOException {
+        tmd.clearUnsafe();
+        IPartitioner partitioner = new RandomPartitioner();
+
+        // remember the previous partitioner so tearDown() can restore it
+        oldPartitioner = ss.setPartitionerUnsafe(partitioner);
+
+        endpointTokens = new ArrayList<Token>();
+        keyTokens = new ArrayList<Token>();
+        hosts = new ArrayList<InetAddress>();
+
+        // create a ring of 6 nodes (127.0.0.1 .. 127.0.0.6)
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, 
hosts, 6);
+
+        // start local messaging and gossip, then register every ring member with the Gossiper
+        MessagingService.instance.listen(FBUtilities.getLocalAddress());
+        Gossiper.instance.start(FBUtilities.getLocalAddress(), 1);
+        for(int i = 0; i < 6; i++) {
+            Gossiper.instance.initializeNodeUnsafe(hosts.get(i), 1);
+        }
+    }
+
+    @After
+    public void tearDown() {
+        SinkManager.clearSinks();
+        MessagingService.instance.shutdown();
+        ss.setPartitionerUnsafe(oldPartitioner);
+    }
+
+    /** A key token (10 * i + 5) is not owned by any endpoint, so removing it must be rejected. */
+    @Test(expected=UnsupportedOperationException.class)
+    public void testBadToken() {
+        final String token = 
ss.getPartitioner().getTokenFactory().toString(keyTokens.get(2));
+        ss.removeToken(token);
+
+    }
+
+    /** Removing the local node's own token must be rejected. */
+    @Test(expected=UnsupportedOperationException.class)
+    public void testLocalToken() {
+        //first token should be localhost
+        final String token = 
ss.getPartitioner().getTokenFactory().toString(endpointTokens.get(0));
+        ss.removeToken(token);
+    }
+
+    /**
+     * Removes hosts.get(5)'s token in a background thread. While removal is waiting, the node
+     * must be the only leaving endpoint. After REPLICATION_FINISHED is delivered on behalf of
+     * every host, removal must complete without error and leave no leaving endpoints.
+     */
+    @Test
+    public void testRemoveToken() throws InterruptedException
+    {
+        IPartitioner partitioner = ss.getPartitioner();
+
+        final String token = 
partitioner.getTokenFactory().toString(endpointTokens.get(5));
+        ReplicationSink rSink = new ReplicationSink();
+        SinkManager.addMessageSink(rSink);
+
+        // start removal in background and send replication confirmations
+        final AtomicBoolean success = new AtomicBoolean(false);
+        Thread remover =  new Thread() 
+        {
+            public void run() 
+            {
+                try 
+                {
+                    ss.removeToken(token);
+                }
+                catch (Exception e )
+                {
+                    System.err.println(e);
+                    e.printStackTrace();
+                    return;
+                }
+                success.set(true);
+            }
+        };
+        remover.start();
+
+        // NOTE(review): a fixed sleep is timing-dependent. If removeToken has not yet set up
+        // the nodes it waits on when the confirmations below arrive, confirmReplication
+        // ignores them, and remover.join() could then block. Consider waiting on an explicit
+        // signal, with a timeout, instead.
+        Thread.sleep(1000); // make sure removal is waiting for confirmation
+
+        assertTrue(tmd.isLeaving(hosts.get(5)));
+        assertEquals(1, tmd.getLeavingEndpoints().size());
+
+        // deliver REPLICATION_FINISHED to the local node on behalf of each host
+        // (the Message constructor's first argument is presumably the sender, i.e. getFrom())
+        for(InetAddress host : hosts) {
+            Message msg = new Message(host, 
StorageService.Verb.REPLICATION_FINISHED, new byte[0]);
+            MessagingService.instance.sendRR(msg, 
FBUtilities.getLocalAddress());
+        }
+
+        remover.join();
+
+        assertTrue(success.get());
+        assertTrue(tmd.getLeavingEndpoints().isEmpty());
+    }
+
+    /**
+     * hosts.get(1) gossips a "removing" status for hosts.get(5)'s token. The local node must
+     * send exactly one REPLICATION_FINISHED message, and hosts.get(5) must become the only
+     * leaving endpoint.
+     */
+    @Test
+    public void testStartRemoving()
+    {
+        IPartitioner partitioner = ss.getPartitioner();
+        VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner);
+
+        NotificationSink nSink = new NotificationSink();
+        ReplicationSink rSink = new ReplicationSink();
+        SinkManager.addMessageSink(nSink);
+        SinkManager.addMessageSink(rSink);
+
+        assertEquals(0, tmd.getLeavingEndpoints().size());
+
+        // presumably (token of the announcing node, token being removed) — TODO confirm
+        ss.onChange(hosts.get(1),
+                    ApplicationState.STATUS,
+                    valueFactory.removingNonlocal(endpointTokens.get(1), 
endpointTokens.get(5)));
+
+        assertEquals(1, nSink.callCount);
+        assertTrue(tmd.isLeaving(hosts.get(5)));
+        assertEquals(1, tmd.getLeavingEndpoints().size());
+    }
+
+    /**
+     * hosts.get(1) gossips a "removed" status for hosts.get(5)'s token. hosts.get(5) must then
+     * be neither a live gossip member nor a token-metadata member.
+     */
+    @Test
+    public void testFinishRemoving()
+    {
+        IPartitioner partitioner = ss.getPartitioner();
+        VersionedValue.VersionedValueFactory valueFactory = new 
VersionedValue.VersionedValueFactory(partitioner);
+
+        assertEquals(0, tmd.getLeavingEndpoints().size());
+
+        ss.onChange(hosts.get(1),
+                    ApplicationState.STATUS,
+                    valueFactory.removedNonlocal(endpointTokens.get(1), 
endpointTokens.get(5)));
+
+        assertFalse(Gossiper.instance.getLiveMembers().contains(hosts.get(5)));
+        assertFalse(tmd.isMember(hosts.get(5)));
+    }
+
+    /**
+     * Intercepts outgoing STREAM_REQUEST messages and completes them locally through
+     * StreamUtil instead of sending them. All other messages pass through unchanged.
+     * Returning null presumably tells SinkManager to drop the message; TODO confirm.
+     */
+    class ReplicationSink implements IMessageSink {
+
+        public Message handleMessage(Message msg, InetAddress to) {
+            if(!msg.getVerb().equals(StorageService.Verb.STREAM_REQUEST))
+                return msg;
+
+            StreamUtil.finishStreamRequest(msg, to);
+
+            return null;
+        }
+    }
+
+    /**
+     * Counts outgoing REPLICATION_FINISHED messages and checks each uses the MISC stage.
+     * Instead of sending the message, it delivers a simulated reply to the local node.
+     * All other messages pass through unchanged.
+     */
+    class NotificationSink implements IMessageSink {
+        public int callCount = 0;
+
+        public Message handleMessage(Message msg, InetAddress to) {
+            if(msg.getVerb().equals(StorageService.Verb.REPLICATION_FINISHED))
+            {
+                callCount++;
+                assertEquals(Stage.MISC, msg.getMessageType());
+                // simulate a response from remote server
+                Message response = msg.getReply(FBUtilities.getLocalAddress(), 
new byte[]{});
+                MessagingService.instance.sendOneWay(response, 
FBUtilities.getLocalAddress());
+                return null;
+            }
+            else
+            {
+                return msg;
+            }
+        }
+    }
+}

Added: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java?rev=1002025&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java 
(added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamUtil.java 
Tue Sep 28 05:50:26 2010
@@ -0,0 +1,54 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.net.InetAddress;
+
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+
+public class StreamUtil
+{
+
+    /**
+     * Takes an stream request message and creates an empty status response. 
Exists here because StreamRequestMessage
+     * is package protected.
+     */
+    static public void finishStreamRequest(Message msg, InetAddress to) 
+    {
+        byte[] body = msg.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+
+        try
+        {
+            StreamRequestMessage srm = 
StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn));
+            StreamInSession session = StreamInSession.get(to, srm.sessionId);
+            session.closeIfFinished();
+        }
+        catch (Exception e)
+        {
+            System.err.println(e); 
+            e.printStackTrace();
+        }
+    }
+}


Reply via email to