Author: jbellis
Date: Mon Sep  6 22:22:35 2010
New Revision: 993165

URL: http://svn.apache.org/viewvc?rev=993165&view=rev
Log:
Naive replacement of String stage names with the Stage enum. Also replaces
MessagingService.defaultExecutor with Stage.MISC.
patch by jbellis; reviewed by Nate McCall for CASSANDRA-1465

Added:
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
    cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Mon 
Sep  6 22:22:35 2010
@@ -41,6 +41,7 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.util.Utf8;
 import org.apache.cassandra.avro.InvalidRequestException;
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.migration.DropKeyspace;
 import org.apache.cassandra.db.migration.RenameKeyspace;
 import org.apache.cassandra.db.migration.UpdateColumnFamily;
@@ -573,7 +574,7 @@ public class CassandraServer implements 
     // InvalidRequestException. atypical failures will throw a 
RuntimeException.
     private static void applyMigrationOnStage(final Migration m) throws 
InvalidRequestException
     {
-        Future f = 
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Callable()
+        Future f = StageManager.getStage(Stage.MIGRATION).submit(new Callable()
         {
             public Object call() throws Exception
             {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
 Mon Sep  6 22:22:35 2010
@@ -67,6 +67,11 @@ public class JMXEnabledThreadPoolExecuto
         }
     }
 
+    public JMXEnabledThreadPoolExecutor(Stage stage)
+    {
+        this(stage + "_STAGE");
+    }
+
     private void unregisterMBean()
     {
         try

Added: cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java?rev=993165&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java Mon Sep 
 6 22:22:35 2010
@@ -0,0 +1,14 @@
+package org.apache.cassandra.concurrent;
+
+public enum Stage
+{
+    READ,
+    MUTATION,
+    STREAM,
+    GOSSIP,
+    RESPONSE,
+    AE_SERVICE,
+    LOADBALANCE,
+    MIGRATION,
+    MISC,
+}

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java 
Mon Sep  6 22:22:35 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.concurrent;
 
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -25,8 +26,6 @@ import java.util.concurrent.LinkedBlocki
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentReaders;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
 
@@ -38,33 +37,25 @@ import static org.apache.cassandra.confi
  */
 public class StageManager
 {
-    private static Map<String, ThreadPoolExecutor> stages = new 
HashMap<String, ThreadPoolExecutor>();
-
-    public final static String READ_STAGE = "ROW-READ-STAGE";
-    public final static String MUTATION_STAGE = "ROW-MUTATION-STAGE";
-    public final static String STREAM_STAGE = "STREAM-STAGE";
-    public final static String GOSSIP_STAGE = "GS";
-    public static final String RESPONSE_STAGE = "RESPONSE-STAGE";
-    public final static String AE_SERVICE_STAGE = "AE-SERVICE-STAGE";
-    private static final String LOADBALANCE_STAGE = "LOAD-BALANCER-STAGE";
-    public static final String MIGRATION_STAGE = "MIGRATION-STAGE";
+    private static EnumMap<Stage, ThreadPoolExecutor> stages = new 
EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" 
threads alive for when idle
 
     static
     {
-        stages.put(MUTATION_STAGE, 
multiThreadedConfigurableStage(MUTATION_STAGE, getConcurrentWriters()));
-        stages.put(READ_STAGE, multiThreadedConfigurableStage(READ_STAGE, 
getConcurrentReaders()));        
-        stages.put(RESPONSE_STAGE, multiThreadedStage(RESPONSE_STAGE, 
Math.max(2, Runtime.getRuntime().availableProcessors())));
+        stages.put(Stage.MUTATION, 
multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
+        stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, 
getConcurrentReaders()));        
+        stages.put(Stage.RESPONSE, multiThreadedStage(Stage.RESPONSE, 
Math.max(2, Runtime.getRuntime().availableProcessors())));
         // the rest are all single-threaded
-        stages.put(STREAM_STAGE, new 
JMXEnabledThreadPoolExecutor(STREAM_STAGE));
-        stages.put(GOSSIP_STAGE, new 
JMXEnabledThreadPoolExecutor("GOSSIP_STAGE"));
-        stages.put(AE_SERVICE_STAGE, new 
JMXEnabledThreadPoolExecutor(AE_SERVICE_STAGE));
-        stages.put(LOADBALANCE_STAGE, new 
JMXEnabledThreadPoolExecutor(LOADBALANCE_STAGE));
-        stages.put(MIGRATION_STAGE, new 
JMXEnabledThreadPoolExecutor(MIGRATION_STAGE));
+        stages.put(Stage.STREAM, new 
JMXEnabledThreadPoolExecutor(Stage.STREAM));
+        stages.put(Stage.GOSSIP, new 
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
+        stages.put(Stage.AE_SERVICE, new 
JMXEnabledThreadPoolExecutor(Stage.AE_SERVICE));
+        stages.put(Stage.LOADBALANCE, new 
JMXEnabledThreadPoolExecutor(Stage.LOADBALANCE));
+        stages.put(Stage.MIGRATION, new 
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
+        stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
     }
 
-    private static ThreadPoolExecutor multiThreadedStage(String name, int 
numThreads)
+    private static ThreadPoolExecutor multiThreadedStage(Stage stage, int 
numThreads)
     {
         // avoid running afoul of requirement in DebuggableThreadPoolExecutor 
that single-threaded executors
         // must have unbounded queues
@@ -75,10 +66,10 @@ public class StageManager
                                                 KEEPALIVE,
                                                 TimeUnit.SECONDS,
                                                 new 
LinkedBlockingQueue<Runnable>(),
-                                                new NamedThreadFactory(name));
+                                                new NamedThreadFactory(stage + 
"_STAGE"));
     }
     
-    private static ThreadPoolExecutor multiThreadedConfigurableStage(String 
name, int numThreads)
+    private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage 
stage, int numThreads)
     {
         assert numThreads > 1 : "multi-threaded stages must have at least 2 
threads";
         
@@ -87,16 +78,16 @@ public class StageManager
                                                      KEEPALIVE,
                                                      TimeUnit.SECONDS,
                                                      new 
LinkedBlockingQueue<Runnable>(),
-                                                     new 
NamedThreadFactory(name));
+                                                     new 
NamedThreadFactory(stage + "_STAGE"));
     }
 
     /**
      * Retrieve a stage from the StageManager
-     * @param stageName name of the stage to be retrieved.
+     * @param stage name of the stage to be retrieved.
     */
-    public static ThreadPoolExecutor getStage(String stageName)
+    public static ThreadPoolExecutor getStage(Stage stage)
     {
-        return stages.get(stageName);
+        return stages.get(stage);
     }
     
     /**
@@ -104,8 +95,7 @@ public class StageManager
      */
     public static void shutdownNow()
     {
-        Set<String> stages = StageManager.stages.keySet();
-        for (String stage : stages)
+        for (Stage stage : Stage.values())
         {
             StageManager.stages.get(stage).shutdownNow();
         }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
 Mon Sep  6 22:22:35 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -55,7 +56,7 @@ public class DefinitionsUpdateResponseVe
                 {
                     final Migration m = Migration.deserialize(col.value());
                     assert m.getVersion().equals(version);
-                    
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new WrappedRunnable()
+                    StageManager.getStage(Stage.MIGRATION).submit(new 
WrappedRunnable()
                     {
                         @Override
                         protected void runMayThrow() throws Exception

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Mon 
Sep  6 22:22:35 2010
@@ -22,6 +22,7 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.util.Arrays;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.ICompactSerializer2;
@@ -67,7 +68,7 @@ public class IndexScanCommand
             throw new IOError(e);
         }
         return new Message(FBUtilities.getLocalAddress(),
-                           StageManager.READ_STAGE,
+                           Stage.READ,
                            StorageService.Verb.INDEX_SCAN,
                            Arrays.copyOf(dob.getData(), dob.getLength()));
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Mon 
Sep  6 22:22:35 2010
@@ -36,6 +36,7 @@
 
 package org.apache.cassandra.db;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 
 import org.apache.cassandra.dht.AbstractBounds;
@@ -90,7 +91,7 @@ public class RangeSliceCommand
         DataOutputBuffer dob = new DataOutputBuffer();
         serializer.serialize(this, dob);
         return new Message(FBUtilities.getLocalAddress(),
-                           StageManager.READ_STAGE,
+                           Stage.READ,
                            StorageService.Verb.RANGE_SLICE,
                            Arrays.copyOf(dob.getData(), dob.getLength()));
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Sep  
6 22:22:35 2010
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
@@ -52,7 +53,7 @@ public abstract class ReadCommand
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         ReadCommand.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), 
StageManager.READ_STAGE, StorageService.Verb.READ, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), Stage.READ, 
StorageService.Verb.READ, bos.toByteArray());
     }
 
     public final QueryPath queryPath;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Sep  
6 22:22:35 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionExc
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
@@ -215,7 +216,7 @@ public class RowMutation
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), 
StageManager.MUTATION_STAGE, verb, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION, 
verb, bos.toByteArray());
     }
 
     public static RowMutation getRowMutationFromMutations(String keyspace, 
byte[] key, Map<String, List<Mutation>> cfmap)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java 
Mon Sep  6 22:22:35 2010
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import javax.xml.bind.annotation.XmlElement;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
@@ -51,7 +52,7 @@ public class RowMutationMessage
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         RowMutationMessage.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), 
StageManager.MUTATION_STAGE, verb, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION, 
verb, bos.toByteArray());
     }
     
     @XmlElement(name="RowMutation")

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Mon Sep  6 
22:22:35 2010
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -73,7 +74,7 @@ public class Truncation
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), 
StageManager.MUTATION_STAGE, StorageService.Verb.TRUNCATE,
+        return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION, 
StorageService.Verb.TRUNCATE,
                 bos.toByteArray());
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
Mon Sep  6 22:22:35 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db.commitlog;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -310,7 +311,7 @@ public class CommitLog
                             }
                         }
                     };
-                    
futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+                    
futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
                     if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
                     {
                         FBUtilities.waitOnFutures(futures);

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Sep 
 6 22:22:35 2010
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
  import org.apache.commons.lang.ArrayUtils;
 
+ import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.locator.TokenMetadata;
  import org.apache.cassandra.locator.AbstractReplicationStrategy;
  import org.apache.cassandra.net.*;
@@ -171,7 +172,7 @@ public class BootStrapper
 
     static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
     {
-        Message message = new Message(FBUtilities.getLocalAddress(), "", 
StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
+        Message message = new Message(FBUtilities.getLocalAddress(), 
Stage.MISC, StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
         BootstrapTokenCallback btc = new BootstrapTokenCallback();
         MessagingService.instance.sendRR(message, maxEndpoint, btc);
         return btc.getToken();

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Sep  6 
22:22:35 2010
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentSk
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.net.InetAddress;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.Message;
@@ -300,7 +301,7 @@ public class Gossiper implements IFailur
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
-        return new Message(localEndpoint_, StageManager.GOSSIP_STAGE, 
StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
+        return new Message(localEndpoint_, Stage.GOSSIP, 
StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
     }
 
     Message makeGossipDigestAckMessage(GossipDigestAckMessage 
gDigestAckMessage) throws IOException
@@ -310,7 +311,7 @@ public class Gossiper implements IFailur
         GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
         if (logger_.isTraceEnabled())
             logger_.trace("@@@@ Size of GossipDigestAckMessage is " + 
bos.toByteArray().length);
-        return new Message(localEndpoint_, StageManager.GOSSIP_STAGE, 
StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
+        return new Message(localEndpoint_, Stage.GOSSIP, 
StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
     }
 
     Message makeGossipDigestAck2Message(GossipDigestAck2Message 
gDigestAck2Message) throws IOException
@@ -318,7 +319,7 @@ public class Gossiper implements IFailur
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, 
dos);
-        return new Message(localEndpoint_, StageManager.GOSSIP_STAGE, 
StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
+        return new Message(localEndpoint_, Stage.GOSSIP, 
StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Mon Sep  6 
22:22:35 2010
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.net.InetAddress;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.service.StorageService;
 
@@ -44,12 +45,13 @@ public class Header
     }
 
     private InetAddress from_;
-    private String type_;
+    // TODO STAGE can be determined from verb
+    private Stage type_;
     private StorageService.Verb verb_;
     private String messageId_;
     protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
     
-    Header(String id, InetAddress from, String messageType, 
StorageService.Verb verb)
+    Header(String id, InetAddress from, Stage messageType, StorageService.Verb 
verb)
     {
         assert id != null;
         assert from != null;
@@ -62,13 +64,13 @@ public class Header
         verb_ = verb;        
     }
     
-    Header(String id, InetAddress from, String messageType, 
StorageService.Verb verb, Map<String, byte[]> details)
+    Header(String id, InetAddress from, Stage messageType, StorageService.Verb 
verb, Map<String, byte[]> details)
     {
         this(id, from, messageType, verb);
         details_ = details;
     }
 
-    Header(InetAddress from, String messageType, StorageService.Verb verb)
+    Header(InetAddress from, Stage messageType, StorageService.Verb verb)
     {
         this(Integer.toString(idGen_.incrementAndGet()), from, messageType, 
verb);
     }        
@@ -78,7 +80,7 @@ public class Header
         return from_;
     }
 
-    String getMessageType()
+    Stage getMessageType()
     {
         return type_;
     }
@@ -115,7 +117,7 @@ class HeaderSerializer implements ICompa
     {           
         dos.writeUTF(t.getMessageId());
         CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
-        dos.writeUTF(t.getMessageType());
+        dos.writeInt(t.getMessageType().ordinal());
         dos.writeInt(t.getVerb().ordinal());
         
         /* Serialize the message header */
@@ -136,7 +138,7 @@ class HeaderSerializer implements ICompa
     {
         String id = dis.readUTF();
         InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
-        String type = dis.readUTF();
+        int typeOrdinal = dis.readInt();
         int verbOrdinal = dis.readInt();
         
         /* Deserializing the message header */
@@ -151,7 +153,7 @@ class HeaderSerializer implements ICompa
             details.put(key, bytes);
         }
         
-        return new Header(id, from, type, StorageService.VERBS[verbOrdinal], 
details);
+        return new Header(id, from, Stage.values()[typeOrdinal], 
StorageService.VERBS[verbOrdinal], details);
     }
 }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Sep  6 
22:22:35 2010
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.service.StorageService;
@@ -53,7 +54,7 @@ public class Message
         body_ = body;
     }
 
-    public Message(InetAddress from, String messageType, StorageService.Verb 
verb, byte[] body)
+    public Message(InetAddress from, Stage messageType, StorageService.Verb 
verb, byte[] body)
     {
         this(new Header(from, messageType, verb), body);
     }    
@@ -78,7 +79,7 @@ public class Message
         return header_.getFrom();
     }
 
-    public String getMessageType()
+    public Stage getMessageType()
     {
         return header_.getMessageType();
     }
@@ -101,7 +102,7 @@ public class Message
     // TODO should take byte[] + length so we don't have to copy to a byte[] 
of exactly the right len
     public Message getReply(InetAddress from, byte[] args)
     {
-        Header header = new Header(getMessageId(), from, 
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE);
+        Header header = new Header(getMessageId(), from, Stage.RESPONSE, 
StorageService.Verb.READ_RESPONSE);
         return new Message(header, args);
     }
     

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon 
Sep  6 22:22:35 2010
@@ -68,9 +68,6 @@ public class MessagingService
     /* Lookup table for registering message handlers based on the verb. */
     private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
-    /* Thread pool to handle messages without a specialized stage */
-    private static ExecutorService defaultExecutor_;
-    
     /* Thread pool to handle messaging write activities */
     private static ExecutorService streamExecutor_;
     
@@ -104,8 +101,6 @@ public class MessagingService
         callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 * 
DatabaseDescriptor.getRpcTimeout()));
         taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 
* DatabaseDescriptor.getRpcTimeout()));
 
-        defaultExecutor_ = new 
JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
-
         streamExecutor_ = new 
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
         TimerTask logDropped = new TimerTask()
         {
@@ -344,8 +339,6 @@ public class MessagingService
     /** blocks until the processing pools are empty and done. */
     public static void waitFor() throws InterruptedException
     {
-        while (!defaultExecutor_.isTerminated())
-            defaultExecutor_.awaitTermination(5, TimeUnit.SECONDS);
         while (!streamExecutor_.isTerminated())
             streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
     }
@@ -363,7 +356,6 @@ public class MessagingService
             throw new IOError(e);
         }
 
-        defaultExecutor_.shutdownNow();
         streamExecutor_.shutdownNow();
 
         /* shut down the cachetables */
@@ -379,17 +371,8 @@ public class MessagingService
 
         Runnable runnable = new MessageDeliveryTask(message);
         ExecutorService stage = StageManager.getStage(message.getMessageType());
-
-        if (stage == null)
-        {
-            if (logger_.isDebugEnabled())
-                logger_.debug("Running " + message.getMessageType() + " on default stage");
-            defaultExecutor_.execute(runnable);
-        }
-        else
-        {
-            stage.execute(runnable);
-        }
+        assert stage != null;
+        stage.execute(runnable);
     }
 
     public static IAsyncCallback getRegisteredCallback(String key)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
 Mon Sep  6 22:22:35 2010
@@ -20,7 +20,7 @@ package org.apache.cassandra.net;
 
 import java.net.InetAddress;
 
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.Stage;
 
 class OutboundTcpConnectionPool
 {
@@ -39,9 +39,8 @@ class OutboundTcpConnectionPool
      */
     OutboundTcpConnection getConnection(Message msg)
     {
-        return msg.getMessageType().equals(StageManager.RESPONSE_STAGE) || msg.getMessageType().equals(StageManager.GOSSIP_STAGE)
-               ? ackCon
-               : cmdCon;
+        Stage stage = msg.getMessageType();
+        return stage == Stage.RESPONSE || stage == Stage.GOSSIP ? ackCon : cmdCon;
     }
 
     synchronized void reset()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java 
Mon Sep  6 22:22:35 2010
@@ -29,6 +29,7 @@ import com.google.common.base.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.CompactionManager;
@@ -60,7 +61,7 @@ import org.apache.cassandra.utils.*;
  * Once the trees rendezvous, a Differencer is executed and the service can 
trigger repairs
  * for disagreeing ranges.
  *
- * Tree comparison and repair triggering occur in the single threaded AE_SERVICE_STAGE.
+ * Tree comparison and repair triggering occur in the single threaded Stage.AE_SERVICE.
  *
  * The steps taken to enact a repair are as follows:
  * 1. A major compaction is triggered via nodeprobe:
@@ -79,7 +80,7 @@ import org.apache.cassandra.utils.*;
  *   * If the tree is remote, it is immediately compared to a local tree if 
one is cached. Otherwise,
  *     the remote tree is stored until a local tree can be generated.
  *   * A Differencer object is enqueued for each comparison.
- * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees, and perform repair via the
+ * 4. Differencers are executed in Stage.AE_SERVICE, to compare the two trees, and perform repair via the
  *    streaming api.
  */
 public class AntiEntropyService
@@ -96,7 +97,7 @@ public class AntiEntropyService
      * Map of outstanding sessions to requests. Once both trees reach the 
rendezvous, the local node
      * will queue a Differencer to compare them.
      *
-     * This map is only accessed from AE_SERVICE_STAGE, so it is not 
synchronized.
+     * This map is only accessed from Stage.AE_SERVICE, so it is not 
synchronized.
      */
     private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests;
 
@@ -138,7 +139,7 @@ public class AntiEntropyService
 
     /**
      * Returns the map of waiting rendezvous endpoints to trees for the given 
session.
-     * Should only be called within AE_SERVICE_STAGE.
+     * Should only be called within Stage.AE_SERVICE.
      */
     private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid)
     {
@@ -169,7 +170,7 @@ public class AntiEntropyService
     }
 
     /**
-     * Register a tree for the given request to be compared to the appropriate 
trees in AE_SERVICE_STAGE when they become available.
+     * Register a tree for the given request to be compared to the appropriate 
trees in Stage.AE_SERVICE when they become available.
      */
     private void rendezvous(TreeRequest request, MerkleTree tree)
     {
@@ -219,7 +220,7 @@ public class AntiEntropyService
         for (Differencer differencer : differencers)
         {
             logger.info("Queueing comparison " + differencer);
-            
StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(differencer);
+            StageManager.getStage(Stage.AE_SERVICE).execute(differencer);
         }
     }
 
@@ -400,7 +401,7 @@ public class AntiEntropyService
         }
 
         /**
-         * Registers the newly created tree for rendezvous in AE_SERVICE_STAGE.
+         * Registers the newly created tree for rendezvous in Stage.AE_SERVICE.
          */
         public void complete()
         {
@@ -418,12 +419,12 @@ public class AntiEntropyService
                 for (MerkleTree.RowHash minrow : minrows)
                     range.addHash(minrow);
 
-            StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(this);
+            StageManager.getStage(Stage.AE_SERVICE).submit(this);
             logger.debug("Validated " + validated + " rows into AEService tree 
for " + request);
         }
         
         /**
-         * Called after the validation lifecycle to respond with the now valid 
tree. Runs in AE_SERVICE_STAGE.
+         * Called after the validation lifecycle to respond with the now valid 
tree. Runs in Stage.AE_SERVICE.
          *
          * @return A meaningless object.
          */
@@ -533,7 +534,7 @@ public class AntiEntropyService
                 final List<Range> ranges = new ArrayList<Range>(differences);
                 final Collection<SSTableReader> sstables = 
cfstore.getSSTables();
                 // send ranges to the remote node
-                Future f = 
StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable() 
+                Future f = StageManager.getStage(Stage.STREAM).submit(new 
WrappedRunnable()
                 {
                     protected void runMayThrow() throws Exception
                     {
@@ -578,7 +579,7 @@ public class AntiEntropyService
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(request, dos);
                 return new Message(FBUtilities.getLocalAddress(),
-                                   StageManager.AE_SERVICE_STAGE,
+                                   Stage.AE_SERVICE,
                                    StorageService.Verb.TREE_REQUEST,
                                    bos.toByteArray());
             }
@@ -643,7 +644,7 @@ public class AntiEntropyService
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
                 SERIALIZER.serialize(validator, dos);
-                return new Message(local, StageManager.AE_SERVICE_STAGE, 
StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
+                return new Message(local, Stage.AE_SERVICE, 
StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
             }
             catch(IOException e)
             {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java 
Mon Sep  6 22:22:35 2010
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
@@ -164,7 +165,7 @@ class ConsistencyChecker implements Runn
             ReadResponse.serializer().serialize(readResponse, out);
             byte[] bytes = new byte[out.getLength()];
             System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
-            responses_.add(new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+            responses_.add(new Message(FBUtilities.getLocalAddress(), Stage.RESPONSE, StorageService.Verb.READ_RESPONSE, bytes));
         }
 
         // synchronized so the " == majority" is safe

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java 
Mon Sep  6 22:22:35 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -120,7 +121,7 @@ public class MigrationManager implements
         for (IColumn col : migrations)
         {
             final Migration migration = Migration.deserialize(col.value());
-            Future update = 
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Runnable() 
+            Future update = StageManager.getStage(Stage.MIGRATION).submit(new 
Runnable()
             {
                 public void run()
                 {
@@ -179,7 +180,7 @@ public class MigrationManager implements
     private static Message makeVersionMessage(UUID version)
     {
         byte[] body = version.toString().getBytes();
-        return new Message(FBUtilities.getLocalAddress(), 
StageManager.READ_STAGE, StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
+        return new Message(FBUtilities.getLocalAddress(), Stage.READ, 
StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
     }
     
     // other half of transformation is in DefinitionsUpdateResponseVerbHandler.
@@ -198,7 +199,7 @@ public class MigrationManager implements
         }
         dout.close();
         byte[] body = bout.toByteArray();
-        return new Message(FBUtilities.getLocalAddress(), 
StageManager.MUTATION_STAGE, StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, 
body);
+        return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION, 
StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body);
     }
     
     // other half of this transformation is in MigrationManager.

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon 
Sep  6 22:22:35 2010
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -197,7 +198,7 @@ public class StorageProxy implements Sto
                 responseHandler.response(null);
             }
         };
-        StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
+        StageManager.getStage(Stage.MUTATION).execute(runnable);
     }
 
     /**
@@ -244,7 +245,7 @@ public class StorageProxy implements Sto
                 if (localFutures == null)
                     localFutures = new ArrayList<Future<Object>>();
                 Callable<Object> callable = new weakReadLocalCallable(command);
-                
localFutures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
+                
localFutures.add(StageManager.getStage(Stage.READ).submit(callable));
             }
             else
             {
@@ -493,7 +494,7 @@ public class StorageProxy implements Sto
         final String myVersion = 
DatabaseDescriptor.getDefsVersion().toString();
         final Map<InetAddress, UUID> versions = new 
ConcurrentHashMap<InetAddress, UUID>();
         final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
-        final Message msg = new Message(FBUtilities.getLocalAddress(), 
StageManager.MIGRATION_STAGE, StorageService.Verb.SCHEMA_CHECK, 
ArrayUtils.EMPTY_BYTE_ARRAY);
+        final Message msg = new Message(FBUtilities.getLocalAddress(), 
Stage.MIGRATION, StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
         // an empty message acts as a request to the SchemaCheckVerbHandler.
         MessagingService.instance.sendRR(msg, liveHosts.toArray(new 
InetAddress[]{}), new IAsyncCallback() 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Mon Sep  6 22:22:35 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.RawColumnDefinition;
@@ -1485,7 +1486,7 @@ public class StorageService implements I
                             latch.countDown();
                     }
                 };
-                StageManager.getStage(StageManager.STREAM_STAGE).execute(new 
Runnable()
+                StageManager.getStage(Stage.STREAM).execute(new Runnable()
                 {
                     public void run()
                     {
@@ -1632,7 +1633,7 @@ public class StorageService implements I
     /** shuts node off to writes, empties memtables and the commit log. */
     public synchronized void drain() throws IOException, InterruptedException, 
ExecutionException
     {
-        ExecutorService mutationStage = 
StageManager.getStage(StageManager.MUTATION_STAGE);
+        ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
         if (mutationStage.isTerminated())
         {
             logger_.warn("Cannot drain node (did it already happen?)");
@@ -1711,7 +1712,7 @@ public class StorageService implements I
         Migration migration = null;
         try
         {
-            migration = 
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(call).get();
+            migration = 
StageManager.getStage(Stage.MIGRATION).submit(call).get();
         }
         catch (InterruptedException e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java Mon 
Sep  6 22:22:35 2010
@@ -26,6 +26,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
@@ -94,7 +95,7 @@ class FileStatus
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream( bos );
         FileStatus.serializer().serialize(this, dos);
-        return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_STATUS, bos.toByteArray());
+        return new Message(FBUtilities.getLocalAddress(), Stage.MISC, StorageService.Verb.STREAM_STATUS, bos.toByteArray());
     }
 
     private static class FileStatusSerializer implements 
ICompactSerializer<FileStatus>

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
 Mon Sep  6 22:22:35 2010
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
@@ -96,7 +97,7 @@ class StreamRequestMessage
         {
             throw new IOError(e);
         }
-        return new Message(FBUtilities.getLocalAddress(), 
StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, 
bos.toByteArray() );
+        return new Message(FBUtilities.getLocalAddress(), Stage.STREAM, 
StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
     }
 
     public String toString()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
Mon Sep  6 22:22:35 2010
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.db.migration.UpdateColumnFamily;
 import org.apache.cassandra.db.migration.UpdateKeyspace;
@@ -636,7 +637,7 @@ public class CassandraServer implements 
     // InvalidRequestException. atypical failures will throw a 
RuntimeException.
     private static void applyMigrationOnStage(final Migration m) throws 
InvalidRequestException
     {
-        Future f = 
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Callable()
+        Future f = StageManager.getStage(Stage.MIGRATION).submit(new Callable()
         {
             public Object call() throws Exception
             {

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
 Mon Sep  6 22:22:35 2010
@@ -31,6 +31,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
@@ -240,7 +241,7 @@ public class AntiEntropyServiceTest exte
 
     void flushAES() throws Exception
     {
-        final ThreadPoolExecutor stage = 
StageManager.getStage(StageManager.AE_SERVICE_STAGE);
+        final ThreadPoolExecutor stage = 
StageManager.getStage(Stage.AE_SERVICE);
         final Callable noop = new Callable<Object>()
         {
             public Boolean call()


Reply via email to