Author: jbellis
Date: Mon Sep 6 22:22:35 2010
New Revision: 993165
URL: http://svn.apache.org/viewvc?rev=993165&view=rev
Log:
Naive replacement of String stage names with the Stage enum. Also replaces
MessagingService.defaultExecutor_ with Stage.MISC.
patch by jbellis; reviewed by Nate McCall for CASSANDRA-1465
Added:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Mon
Sep 6 22:22:35 2010
@@ -41,6 +41,7 @@ import org.apache.avro.generic.GenericDa
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.util.Utf8;
import org.apache.cassandra.avro.InvalidRequestException;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.migration.DropKeyspace;
import org.apache.cassandra.db.migration.RenameKeyspace;
import org.apache.cassandra.db.migration.UpdateColumnFamily;
@@ -573,7 +574,7 @@ public class CassandraServer implements
// InvalidRequestException. atypical failures will throw a
RuntimeException.
private static void applyMigrationOnStage(final Migration m) throws
InvalidRequestException
{
- Future f =
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Callable()
+ Future f = StageManager.getStage(Stage.MIGRATION).submit(new Callable()
{
public Object call() throws Exception
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
Mon Sep 6 22:22:35 2010
@@ -67,6 +67,11 @@ public class JMXEnabledThreadPoolExecuto
}
}
+ public JMXEnabledThreadPoolExecutor(Stage stage)
+ {
+ this(stage + "_STAGE");
+ }
+
private void unregisterMBean()
{
try
Added: cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java?rev=993165&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java Mon Sep
6 22:22:35 2010
@@ -0,0 +1,14 @@
+package org.apache.cassandra.concurrent;
+
+public enum Stage
+{
+ READ,
+ MUTATION,
+ STREAM,
+ GOSSIP,
+ RESPONSE,
+ AE_SERVICE,
+ LOADBALANCE,
+ MIGRATION,
+ MISC,
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
Mon Sep 6 22:22:35 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.concurrent;
+import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -25,8 +26,6 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
import static
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentReaders;
import static
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentWriters;
@@ -38,33 +37,25 @@ import static org.apache.cassandra.confi
*/
public class StageManager
{
- private static Map<String, ThreadPoolExecutor> stages = new
HashMap<String, ThreadPoolExecutor>();
-
- public final static String READ_STAGE = "ROW-READ-STAGE";
- public final static String MUTATION_STAGE = "ROW-MUTATION-STAGE";
- public final static String STREAM_STAGE = "STREAM-STAGE";
- public final static String GOSSIP_STAGE = "GS";
- public static final String RESPONSE_STAGE = "RESPONSE-STAGE";
- public final static String AE_SERVICE_STAGE = "AE-SERVICE-STAGE";
- private static final String LOADBALANCE_STAGE = "LOAD-BALANCER-STAGE";
- public static final String MIGRATION_STAGE = "MIGRATION-STAGE";
+ private static EnumMap<Stage, ThreadPoolExecutor> stages = new
EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
public static final long KEEPALIVE = 60; // seconds to keep "extra"
threads alive for when idle
static
{
- stages.put(MUTATION_STAGE,
multiThreadedConfigurableStage(MUTATION_STAGE, getConcurrentWriters()));
- stages.put(READ_STAGE, multiThreadedConfigurableStage(READ_STAGE,
getConcurrentReaders()));
- stages.put(RESPONSE_STAGE, multiThreadedStage(RESPONSE_STAGE,
Math.max(2, Runtime.getRuntime().availableProcessors())));
+ stages.put(Stage.MUTATION,
multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
+ stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ,
getConcurrentReaders()));
+ stages.put(Stage.RESPONSE, multiThreadedStage(Stage.RESPONSE,
Math.max(2, Runtime.getRuntime().availableProcessors())));
// the rest are all single-threaded
- stages.put(STREAM_STAGE, new
JMXEnabledThreadPoolExecutor(STREAM_STAGE));
- stages.put(GOSSIP_STAGE, new
JMXEnabledThreadPoolExecutor("GOSSIP_STAGE"));
- stages.put(AE_SERVICE_STAGE, new
JMXEnabledThreadPoolExecutor(AE_SERVICE_STAGE));
- stages.put(LOADBALANCE_STAGE, new
JMXEnabledThreadPoolExecutor(LOADBALANCE_STAGE));
- stages.put(MIGRATION_STAGE, new
JMXEnabledThreadPoolExecutor(MIGRATION_STAGE));
+ stages.put(Stage.STREAM, new
JMXEnabledThreadPoolExecutor(Stage.STREAM));
+ stages.put(Stage.GOSSIP, new
JMXEnabledThreadPoolExecutor(Stage.GOSSIP));
+ stages.put(Stage.AE_SERVICE, new
JMXEnabledThreadPoolExecutor(Stage.AE_SERVICE));
+ stages.put(Stage.LOADBALANCE, new
JMXEnabledThreadPoolExecutor(Stage.LOADBALANCE));
+ stages.put(Stage.MIGRATION, new
JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
+ stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
}
- private static ThreadPoolExecutor multiThreadedStage(String name, int
numThreads)
+ private static ThreadPoolExecutor multiThreadedStage(Stage stage, int
numThreads)
{
// avoid running afoul of requirement in DebuggableThreadPoolExecutor
that single-threaded executors
// must have unbounded queues
@@ -75,10 +66,10 @@ public class StageManager
KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory(name));
+ new NamedThreadFactory(stage +
"_STAGE"));
}
- private static ThreadPoolExecutor multiThreadedConfigurableStage(String
name, int numThreads)
+ private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage
stage, int numThreads)
{
assert numThreads > 1 : "multi-threaded stages must have at least 2
threads";
@@ -87,16 +78,16 @@ public class StageManager
KEEPALIVE,
TimeUnit.SECONDS,
new
LinkedBlockingQueue<Runnable>(),
- new
NamedThreadFactory(name));
+ new
NamedThreadFactory(stage + "_STAGE"));
}
/**
* Retrieve a stage from the StageManager
- * @param stageName name of the stage to be retrieved.
+ * @param stage the stage to be retrieved.
*/
- public static ThreadPoolExecutor getStage(String stageName)
+ public static ThreadPoolExecutor getStage(Stage stage)
{
- return stages.get(stageName);
+ return stages.get(stage);
}
/**
@@ -104,8 +95,7 @@ public class StageManager
*/
public static void shutdownNow()
{
- Set<String> stages = StageManager.stages.keySet();
- for (String stage : stages)
+ for (Stage stage : Stage.values())
{
StageManager.stages.get(stage).shutdownNow();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Mon Sep 6 22:22:35 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.db;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -55,7 +56,7 @@ public class DefinitionsUpdateResponseVe
{
final Migration m = Migration.deserialize(col.value());
assert m.getVersion().equals(version);
-
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new WrappedRunnable()
+ StageManager.getStage(Stage.MIGRATION).submit(new
WrappedRunnable()
{
@Override
protected void runMayThrow() throws Exception
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IndexScanCommand.java Mon
Sep 6 22:22:35 2010
@@ -22,6 +22,7 @@ package org.apache.cassandra.db;
import java.io.*;
import java.util.Arrays;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.io.ICompactSerializer2;
@@ -67,7 +68,7 @@ public class IndexScanCommand
throw new IOError(e);
}
return new Message(FBUtilities.getLocalAddress(),
- StageManager.READ_STAGE,
+ Stage.READ,
StorageService.Verb.INDEX_SCAN,
Arrays.copyOf(dob.getData(), dob.getLength()));
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Mon
Sep 6 22:22:35 2010
@@ -36,6 +36,7 @@
package org.apache.cassandra.db;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.dht.AbstractBounds;
@@ -90,7 +91,7 @@ public class RangeSliceCommand
DataOutputBuffer dob = new DataOutputBuffer();
serializer.serialize(this, dob);
return new Message(FBUtilities.getLocalAddress(),
- StageManager.READ_STAGE,
+ Stage.READ,
StorageService.Verb.RANGE_SLICE,
Arrays.copyOf(dob.getData(), dob.getLength()));
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Mon Sep
6 22:22:35 2010
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
@@ -52,7 +53,7 @@ public abstract class ReadCommand
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
ReadCommand.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(),
StageManager.READ_STAGE, StorageService.Verb.READ, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), Stage.READ,
StorageService.Verb.READ, bos.toByteArray());
}
public final QueryPath queryPath;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Mon Sep
6 22:22:35 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutionExc
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
@@ -215,7 +216,7 @@ public class RowMutation
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(),
StageManager.MUTATION_STAGE, verb, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION,
verb, bos.toByteArray());
}
public static RowMutation getRowMutationFromMutations(String keyspace,
byte[] key, Map<String, List<Mutation>> cfmap)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationMessage.java
Mon Sep 6 22:22:35 2010
@@ -25,6 +25,7 @@ import java.io.IOException;
import javax.xml.bind.annotation.XmlElement;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
@@ -51,7 +52,7 @@ public class RowMutationMessage
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
RowMutationMessage.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(),
StageManager.MUTATION_STAGE, verb, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION,
verb, bos.toByteArray());
}
@XmlElement(name="RowMutation")
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Truncation.java Mon Sep 6
22:22:35 2010
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -73,7 +74,7 @@ public class Truncation
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(),
StageManager.MUTATION_STAGE, StorageService.Verb.TRUNCATE,
+ return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION,
StorageService.Verb.TRUNCATE,
bos.toByteArray());
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Mon Sep 6 22:22:35 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.commitlog;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -310,7 +311,7 @@ public class CommitLog
}
}
};
-
futures.add(StageManager.getStage(StageManager.MUTATION_STAGE).submit(runnable));
+
futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT)
{
FBUtilities.waitOnFutures(futures);
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Sep
6 22:22:35 2010
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang.ArrayUtils;
+ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
@@ -171,7 +172,7 @@ public class BootStrapper
static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
{
- Message message = new Message(FBUtilities.getLocalAddress(), "",
StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
+ Message message = new Message(FBUtilities.getLocalAddress(),
Stage.MISC, StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
BootstrapTokenCallback btc = new BootstrapTokenCallback();
MessagingService.instance.sendRR(message, maxEndpoint, btc);
return btc.getToken();
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Sep 6
22:22:35 2010
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentSk
import java.util.concurrent.CopyOnWriteArrayList;
import java.net.InetAddress;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.Message;
@@ -300,7 +301,7 @@ public class Gossiper implements IFailur
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
- return new Message(localEndpoint_, StageManager.GOSSIP_STAGE,
StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
+ return new Message(localEndpoint_, Stage.GOSSIP,
StorageService.Verb.GOSSIP_DIGEST_SYN, bos.toByteArray());
}
Message makeGossipDigestAckMessage(GossipDigestAckMessage
gDigestAckMessage) throws IOException
@@ -310,7 +311,7 @@ public class Gossiper implements IFailur
GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
if (logger_.isTraceEnabled())
logger_.trace("@@@@ Size of GossipDigestAckMessage is " +
bos.toByteArray().length);
- return new Message(localEndpoint_, StageManager.GOSSIP_STAGE,
StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
+ return new Message(localEndpoint_, Stage.GOSSIP,
StorageService.Verb.GOSSIP_DIGEST_ACK, bos.toByteArray());
}
Message makeGossipDigestAck2Message(GossipDigestAck2Message
gDigestAck2Message) throws IOException
@@ -318,7 +319,7 @@ public class Gossiper implements IFailur
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
GossipDigestAck2Message.serializer().serialize(gDigestAck2Message,
dos);
- return new Message(localEndpoint_, StageManager.GOSSIP_STAGE,
StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
+ return new Message(localEndpoint_, Stage.GOSSIP,
StorageService.Verb.GOSSIP_DIGEST_ACK2, bos.toByteArray());
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Header.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Header.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Header.java Mon Sep 6
22:22:35 2010
@@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.net.InetAddress;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.service.StorageService;
@@ -44,12 +45,13 @@ public class Header
}
private InetAddress from_;
- private String type_;
+ // TODO STAGE can be determined from verb
+ private Stage type_;
private StorageService.Verb verb_;
private String messageId_;
protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
- Header(String id, InetAddress from, String messageType,
StorageService.Verb verb)
+ Header(String id, InetAddress from, Stage messageType, StorageService.Verb
verb)
{
assert id != null;
assert from != null;
@@ -62,13 +64,13 @@ public class Header
verb_ = verb;
}
- Header(String id, InetAddress from, String messageType,
StorageService.Verb verb, Map<String, byte[]> details)
+ Header(String id, InetAddress from, Stage messageType, StorageService.Verb
verb, Map<String, byte[]> details)
{
this(id, from, messageType, verb);
details_ = details;
}
- Header(InetAddress from, String messageType, StorageService.Verb verb)
+ Header(InetAddress from, Stage messageType, StorageService.Verb verb)
{
this(Integer.toString(idGen_.incrementAndGet()), from, messageType,
verb);
}
@@ -78,7 +80,7 @@ public class Header
return from_;
}
- String getMessageType()
+ Stage getMessageType()
{
return type_;
}
@@ -115,7 +117,7 @@ class HeaderSerializer implements ICompa
{
dos.writeUTF(t.getMessageId());
CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
- dos.writeUTF(t.getMessageType());
+ dos.writeInt(t.getMessageType().ordinal());
dos.writeInt(t.getVerb().ordinal());
/* Serialize the message header */
@@ -136,7 +138,7 @@ class HeaderSerializer implements ICompa
{
String id = dis.readUTF();
InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
- String type = dis.readUTF();
+ int typeOrdinal = dis.readInt();
int verbOrdinal = dis.readInt();
/* Deserializing the message header */
@@ -151,7 +153,7 @@ class HeaderSerializer implements ICompa
details.put(key, bytes);
}
- return new Header(id, from, type, StorageService.VERBS[verbOrdinal],
details);
+ return new Header(id, from, Stage.values()[typeOrdinal],
StorageService.VERBS[verbOrdinal], details);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/Message.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/Message.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/Message.java Mon Sep 6
22:22:35 2010
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.service.StorageService;
@@ -53,7 +54,7 @@ public class Message
body_ = body;
}
- public Message(InetAddress from, String messageType, StorageService.Verb
verb, byte[] body)
+ public Message(InetAddress from, Stage messageType, StorageService.Verb
verb, byte[] body)
{
this(new Header(from, messageType, verb), body);
}
@@ -78,7 +79,7 @@ public class Message
return header_.getFrom();
}
- public String getMessageType()
+ public Stage getMessageType()
{
return header_.getMessageType();
}
@@ -101,7 +102,7 @@ public class Message
// TODO should take byte[] + length so we don't have to copy to a byte[]
of exactly the right len
public Message getReply(InetAddress from, byte[] args)
{
- Header header = new Header(getMessageId(), from,
StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE);
+ Header header = new Header(getMessageId(), from, Stage.RESPONSE,
StorageService.Verb.READ_RESPONSE);
return new Message(header, args);
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Mon
Sep 6 22:22:35 2010
@@ -68,9 +68,6 @@ public class MessagingService
/* Lookup table for registering message handlers based on the verb. */
private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
- /* Thread pool to handle messages without a specialized stage */
- private static ExecutorService defaultExecutor_;
-
/* Thread pool to handle messaging write activities */
private static ExecutorService streamExecutor_;
@@ -104,8 +101,6 @@ public class MessagingService
callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 *
DatabaseDescriptor.getRpcTimeout()));
taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1
* DatabaseDescriptor.getRpcTimeout()));
- defaultExecutor_ = new
JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
-
streamExecutor_ = new
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
TimerTask logDropped = new TimerTask()
{
@@ -344,8 +339,6 @@ public class MessagingService
/** blocks until the processing pools are empty and done. */
public static void waitFor() throws InterruptedException
{
- while (!defaultExecutor_.isTerminated())
- defaultExecutor_.awaitTermination(5, TimeUnit.SECONDS);
while (!streamExecutor_.isTerminated())
streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
}
@@ -363,7 +356,6 @@ public class MessagingService
throw new IOError(e);
}
- defaultExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
/* shut down the cachetables */
@@ -379,17 +371,8 @@ public class MessagingService
Runnable runnable = new MessageDeliveryTask(message);
ExecutorService stage =
StageManager.getStage(message.getMessageType());
-
- if (stage == null)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Running " + message.getMessageType() + " on
default stage");
- defaultExecutor_.execute(runnable);
- }
- else
- {
- stage.execute(runnable);
- }
+ assert stage != null;
+ stage.execute(runnable);
}
public static IAsyncCallback getRegisteredCallback(String key)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Mon Sep 6 22:22:35 2010
@@ -20,7 +20,7 @@ package org.apache.cassandra.net;
import java.net.InetAddress;
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.Stage;
class OutboundTcpConnectionPool
{
@@ -39,9 +39,8 @@ class OutboundTcpConnectionPool
*/
OutboundTcpConnection getConnection(Message msg)
{
- return msg.getMessageType().equals(StageManager.RESPONSE_STAGE) ||
msg.getMessageType().equals(StageManager.GOSSIP_STAGE)
- ? ackCon
- : cmdCon;
+ Stage stage = msg.getMessageType();
+ return stage == Stage.RESPONSE || stage == Stage.GOSSIP ? ackCon :
cmdCon;
}
synchronized void reset()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
Mon Sep 6 22:22:35 2010
@@ -29,6 +29,7 @@ import com.google.common.base.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CompactionManager;
@@ -60,7 +61,7 @@ import org.apache.cassandra.utils.*;
* Once the trees rendezvous, a Differencer is executed and the service can
trigger repairs
* for disagreeing ranges.
*
- * Tree comparison and repair triggering occur in the single threaded AE_SERVICE_STAGE.
+ * Tree comparison and repair triggering occur in the single threaded Stage.AE_SERVICE.
*
* The steps taken to enact a repair are as follows:
* 1. A major compaction is triggered via nodeprobe:
@@ -79,7 +80,7 @@ import org.apache.cassandra.utils.*;
* * If the tree is remote, it is immediately compared to a local tree if
one is cached. Otherwise,
* the remote tree is stored until a local tree can be generated.
* * A Differencer object is enqueued for each comparison.
- * 4. Differencers are executed in AE_SERVICE_STAGE, to compare the two trees, and perform repair via the
+ * 4. Differencers are executed in Stage.AE_SERVICE, to compare the two trees, and perform repair via the
* streaming api.
*/
public class AntiEntropyService
@@ -96,7 +97,7 @@ public class AntiEntropyService
* Map of outstanding sessions to requests. Once both trees reach the
rendezvous, the local node
* will queue a Differencer to compare them.
*
- * This map is only accessed from AE_SERVICE_STAGE, so it is not synchronized.
+ * This map is only accessed from Stage.AE_SERVICE, so it is not synchronized.
*/
private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests;
@@ -138,7 +139,7 @@ public class AntiEntropyService
/**
* Returns the map of waiting rendezvous endpoints to trees for the given
session.
- * Should only be called within AE_SERVICE_STAGE.
+ * Should only be called within Stage.AE_SERVICE.
*/
private Map<TreeRequest, TreePair> rendezvousPairs(String sessionid)
{
@@ -169,7 +170,7 @@ public class AntiEntropyService
}
/**
- * Register a tree for the given request to be compared to the appropriate trees in AE_SERVICE_STAGE when they become available.
+ * Register a tree for the given request to be compared to the appropriate trees in Stage.AE_SERVICE when they become available.
*/
private void rendezvous(TreeRequest request, MerkleTree tree)
{
@@ -219,7 +220,7 @@ public class AntiEntropyService
for (Differencer differencer : differencers)
{
logger.info("Queueing comparison " + differencer);
- StageManager.getStage(StageManager.AE_SERVICE_STAGE).execute(differencer);
+ StageManager.getStage(Stage.AE_SERVICE).execute(differencer);
}
}
@@ -400,7 +401,7 @@ public class AntiEntropyService
}
/**
- * Registers the newly created tree for rendezvous in AE_SERVICE_STAGE.
+ * Registers the newly created tree for rendezvous in Stage.AE_SERVICE.
*/
public void complete()
{
@@ -418,12 +419,12 @@ public class AntiEntropyService
for (MerkleTree.RowHash minrow : minrows)
range.addHash(minrow);
- StageManager.getStage(StageManager.AE_SERVICE_STAGE).submit(this);
+ StageManager.getStage(Stage.AE_SERVICE).submit(this);
logger.debug("Validated " + validated + " rows into AEService tree
for " + request);
}
/**
- * Called after the validation lifecycle to respond with the now valid tree. Runs in AE_SERVICE_STAGE.
+ * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.AE_SERVICE.
*
* @return A meaningless object.
*/
@@ -533,7 +534,7 @@ public class AntiEntropyService
final List<Range> ranges = new ArrayList<Range>(differences);
final Collection<SSTableReader> sstables =
cfstore.getSSTables();
// send ranges to the remote node
- Future f = StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable()
+ Future f = StageManager.getStage(Stage.STREAM).submit(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
@@ -578,7 +579,7 @@ public class AntiEntropyService
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(request, dos);
return new Message(FBUtilities.getLocalAddress(),
- StageManager.AE_SERVICE_STAGE,
+ Stage.AE_SERVICE,
StorageService.Verb.TREE_REQUEST,
bos.toByteArray());
}
@@ -643,7 +644,7 @@ public class AntiEntropyService
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
SERIALIZER.serialize(validator, dos);
- return new Message(local, StageManager.AE_SERVICE_STAGE, StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
+ return new Message(local, Stage.AE_SERVICE, StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
}
catch(IOException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Mon Sep 6 22:22:35 2010
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
@@ -164,7 +165,7 @@ class ConsistencyChecker implements Runn
ReadResponse.serializer().serialize(readResponse, out);
byte[] bytes = new byte[out.getLength()];
System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
- responses_.add(new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, bytes));
+ responses_.add(new Message(FBUtilities.getLocalAddress(), Stage.RESPONSE, StorageService.Verb.READ_RESPONSE, bytes));
}
// synchronized so the " == majority" is safe
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
Mon Sep 6 22:22:35 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.service;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -120,7 +121,7 @@ public class MigrationManager implements
for (IColumn col : migrations)
{
final Migration migration = Migration.deserialize(col.value());
- Future update = StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Runnable()
+ Future update = StageManager.getStage(Stage.MIGRATION).submit(new Runnable()
{
public void run()
{
@@ -179,7 +180,7 @@ public class MigrationManager implements
private static Message makeVersionMessage(UUID version)
{
byte[] body = version.toString().getBytes();
- return new Message(FBUtilities.getLocalAddress(), StageManager.READ_STAGE, StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
+ return new Message(FBUtilities.getLocalAddress(), Stage.READ, StorageService.Verb.DEFINITIONS_ANNOUNCE, body);
}
// other half of transformation is in DefinitionsUpdateResponseVerbHandler.
@@ -198,7 +199,7 @@ public class MigrationManager implements
}
dout.close();
byte[] body = bout.toByteArray();
- return new Message(FBUtilities.getLocalAddress(), StageManager.MUTATION_STAGE, StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body);
+ return new Message(FBUtilities.getLocalAddress(), Stage.MUTATION, StorageService.Verb.DEFINITIONS_UPDATE_RESPONSE, body);
}
// other half of this transformation is in MigrationManager.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon
Sep 6 22:22:35 2010
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -197,7 +198,7 @@ public class StorageProxy implements Sto
responseHandler.response(null);
}
};
- StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
+ StageManager.getStage(Stage.MUTATION).execute(runnable);
}
/**
@@ -244,7 +245,7 @@ public class StorageProxy implements Sto
if (localFutures == null)
localFutures = new ArrayList<Future<Object>>();
Callable<Object> callable = new weakReadLocalCallable(command);
- localFutures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
+ localFutures.add(StageManager.getStage(Stage.READ).submit(callable));
}
else
{
@@ -493,7 +494,7 @@ public class StorageProxy implements Sto
final String myVersion =
DatabaseDescriptor.getDefsVersion().toString();
final Map<InetAddress, UUID> versions = new
ConcurrentHashMap<InetAddress, UUID>();
final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
- final Message msg = new Message(FBUtilities.getLocalAddress(), StageManager.MIGRATION_STAGE, StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
+ final Message msg = new Message(FBUtilities.getLocalAddress(), Stage.MIGRATION, StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
final CountDownLatch latch = new CountDownLatch(liveHosts.size());
// an empty message acts as a request to the SchemaCheckVerbHandler.
MessagingService.instance.sendRR(msg, liveHosts.toArray(new
InetAddress[]{}), new IAsyncCallback()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Mon Sep 6 22:22:35 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.*;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.RawColumnDefinition;
@@ -1485,7 +1486,7 @@ public class StorageService implements I
latch.countDown();
}
};
- StageManager.getStage(StageManager.STREAM_STAGE).execute(new Runnable()
+ StageManager.getStage(Stage.STREAM).execute(new Runnable()
{
public void run()
{
@@ -1632,7 +1633,7 @@ public class StorageService implements I
/** shuts node off to writes, empties memtables and the commit log. */
public synchronized void drain() throws IOException, InterruptedException,
ExecutionException
{
- ExecutorService mutationStage = StageManager.getStage(StageManager.MUTATION_STAGE);
+ ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isTerminated())
{
logger_.warn("Cannot drain node (did it already happen?)");
@@ -1711,7 +1712,7 @@ public class StorageService implements I
Migration migration = null;
try
{
- migration = StageManager.getStage(StageManager.MIGRATION_STAGE).submit(call).get();
+ migration = StageManager.getStage(Stage.MIGRATION).submit(call).get();
}
catch (InterruptedException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java Mon
Sep 6 22:22:35 2010
@@ -26,6 +26,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
@@ -94,7 +95,7 @@ class FileStatus
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
FileStatus.serializer().serialize(this, dos);
- return new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_STATUS, bos.toByteArray());
+ return new Message(FBUtilities.getLocalAddress(), Stage.MISC, StorageService.Verb.STREAM_STATUS, bos.toByteArray());
}
private static class FileStatusSerializer implements
ICompactSerializer<FileStatus>
Modified:
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamRequestMessage.java
Mon Sep 6 22:22:35 2010
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
@@ -96,7 +97,7 @@ class StreamRequestMessage
{
throw new IOError(e);
}
- return new Message(FBUtilities.getLocalAddress(), StageManager.STREAM_STAGE, StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
+ return new Message(FBUtilities.getLocalAddress(), Stage.STREAM, StorageService.Verb.STREAM_REQUEST, bos.toByteArray() );
}
public String toString()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Mon Sep 6 22:22:35 2010
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.migration.Migration;
import org.apache.cassandra.db.migration.UpdateColumnFamily;
import org.apache.cassandra.db.migration.UpdateKeyspace;
@@ -636,7 +637,7 @@ public class CassandraServer implements
// InvalidRequestException. atypical failures will throw a
RuntimeException.
private static void applyMigrationOnStage(final Migration m) throws
InvalidRequestException
{
- Future f = StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Callable()
+ Future f = StageManager.getStage(Stage.MIGRATION).submit(new Callable()
{
public Object call() throws Exception
{
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=993165&r1=993164&r2=993165&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
Mon Sep 6 22:22:35 2010
@@ -31,6 +31,7 @@ import org.junit.Test;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
@@ -240,7 +241,7 @@ public class AntiEntropyServiceTest exte
void flushAES() throws Exception
{
- final ThreadPoolExecutor stage = StageManager.getStage(StageManager.AE_SERVICE_STAGE);
+ final ThreadPoolExecutor stage = StageManager.getStage(Stage.AE_SERVICE);
final Callable noop = new Callable<Object>()
{
public Boolean call()