Author: jbellis
Date: Fri Aug 13 17:07:56 2010
New Revision: 985287
URL: http://svn.apache.org/viewvc?rev=985287&view=rev
Log:
merge from 0.6
Removed:
cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeserializationTask.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-985244
+/cassandra/branches/cassandra-0.6:922689-985285
/cassandra/trunk:978791
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Aug 13 17:07:56 2010
@@ -70,6 +70,8 @@ dev
initialization (CASSANDRA-1377)
* fix errors in hard-coded bloom filter optKPerBucket by computing it
algorithmically (CASSANDRA-1220
+ * remove message deserialization stage, and uncap read/write stages
+ so slow reads/writes don't block gossip processing (CASSANDRA-1358)
0.6.4
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-985285
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-985285
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-985285
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-985285
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Aug 13 17:07:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-985244
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-985285
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Fri Aug 13 17:07:56 2010
@@ -74,7 +74,7 @@ public class StageManager
numThreads,
KEEPALIVE,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
+ new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(name));
}
@@ -86,7 +86,7 @@ public class StageManager
numThreads,
KEEPALIVE,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getStageQueueSize()),
+ new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(name));
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Aug 13 17:07:56 2010
@@ -107,8 +107,6 @@ public class DatabaseDescriptor
throw new ConfigurationException("Cannot locate " + STORAGE_CONF_FILE + " on the classpath");
}
- private static int stageQueueSize_ = 4096;
-
static
{
try
@@ -1008,11 +1006,6 @@ public class DatabaseDescriptor
return getCFMetaData(tableName, cfName).subcolumnComparator;
}
- public static int getStageQueueSize()
- {
- return stageQueueSize_;
- }
-
public static AbstractReconciler getReconciler(String tableName, String cfName)
{
assert tableName != null;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Fri Aug 13 17:07:56 2010
@@ -27,6 +27,7 @@ import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.streaming.IncomingStreamReader;
import org.apache.cassandra.streaming.StreamHeader;
@@ -76,7 +77,9 @@ public class IncomingTcpConnection exten
int size = input.readInt();
byte[] contentBytes = new byte[size];
input.readFully(contentBytes);
- MessagingService.getDeserializationExecutor().submit(new MessageDeserializationTask(new ByteArrayInputStream(contentBytes)));
+
+ Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
+ MessagingService.receive(message);
}
}
catch (EOFException e)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Fri Aug 13 17:07:56 2010
@@ -21,13 +21,16 @@ package org.apache.cassandra.net;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.service.StorageService;
public class MessageDeliveryTask implements Runnable
{
+ private static final Logger logger_ = LoggerFactory.getLogger(MessageDeliveryTask.class);
+
private Message message_;
- private static Logger logger_ = LoggerFactory.getLogger(MessageDeliveryTask.class);
-
+ private final long constructionTime_ = System.currentTimeMillis();
+
public MessageDeliveryTask(Message message)
{
message_ = message;
@@ -35,6 +38,12 @@ public class MessageDeliveryTask impleme
public void run()
{
+ if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
+ {
+ MessagingService.incrementDroppedMessages();
+ return;
+ }
+
StorageService.Verb verb = message_.getVerb();
IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb);
assert verbHandler != null : "unknown verb " + verb;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=985287&r1=985286&r2=985287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Aug 13 17:07:56 2010
@@ -18,23 +18,6 @@
package org.apache.cassandra.net;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.net.io.SerializerType;
-import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamHeader;
-import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
-import org.apache.cassandra.utils.SimpleCondition;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
@@ -47,13 +30,28 @@ import java.nio.channels.ServerSocketCha
import java.security.MessageDigest;
import java.util.EnumMap;
import java.util.Map;
-import java.util.TimerTask;
import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.io.SerializerType;
+import org.apache.cassandra.net.sink.SinkManager;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamHeader;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
public class MessagingService
{
private static int version_ = 1;
@@ -70,8 +68,8 @@ public class MessagingService
/* Lookup table for registering message handlers based on the verb. */
private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
- /* Thread pool to handle deserialization of messages read from the socket. */
- private static ExecutorService messageDeserializerExecutor_;
+ /* Thread pool to handle messages without a specialized stage */
+ private static ExecutorService defaultExecutor_;
/* Thread pool to handle messaging write activities */
private static ExecutorService streamExecutor_;
@@ -106,13 +104,7 @@ public class MessagingService
callbackMap_ = new ExpiringMap<String, IAsyncCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()));
taskCompletionMap_ = new ExpiringMap<String, IAsyncResult>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()));
- // read executor puts messages to deserialize on this.
- messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
- Runtime.getRuntime().availableProcessors(),
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
+ defaultExecutor_ = new JMXEnabledThreadPoolExecutor("MISCELLANEOUS-POOL");
streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
TimerTask logDropped = new TimerTask()
@@ -352,8 +344,8 @@ public class MessagingService
/** blocks until the processing pools are empty and done. */
public static void waitFor() throws InterruptedException
{
- while (!messageDeserializerExecutor_.isTerminated())
- messageDeserializerExecutor_.awaitTermination(5, TimeUnit.SECONDS);
+ while (!defaultExecutor_.isTerminated())
+ defaultExecutor_.awaitTermination(5, TimeUnit.SECONDS);
while (!streamExecutor_.isTerminated())
streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
}
@@ -371,7 +363,7 @@ public class MessagingService
throw new IOError(e);
}
- messageDeserializerExecutor_.shutdownNow();
+ defaultExecutor_.shutdownNow();
streamExecutor_.shutdownNow();
/* shut down the cachetables */
@@ -383,14 +375,16 @@ public class MessagingService
public static void receive(Message message)
{
- Runnable runnable = new MessageDeliveryTask(message);
+ message = SinkManager.processServerMessageSink(message);
+ Runnable runnable = new MessageDeliveryTask(message);
ExecutorService stage = StageManager.getStage(message.getMessageType());
+
if (stage == null)
{
if (logger_.isDebugEnabled())
logger_.debug("Running " + message.getMessageType() + " on default stage");
- messageDeserializerExecutor_.execute(runnable);
+ defaultExecutor_.execute(runnable);
}
else
{
@@ -423,11 +417,6 @@ public class MessagingService
return taskCompletionMap_.getAge(key);
}
- public static ExecutorService getDeserializationExecutor()
- {
- return messageDeserializerExecutor_;
- }
-
public static void validateMagic(int magic) throws IOException
{
if (magic != PROTOCOL_MAGIC)