Author: jbellis
Date: Fri Jan 15 03:01:11 2010
New Revision: 899519

URL: http://svn.apache.org/viewvc?rev=899519&view=rev
Log:
rename read executor and give it one thread.  patch by jbellis; tested by 
Brandon Williams for CASSANDRA-701

Modified:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
 Fri Jan 15 03:01:11 2010
@@ -53,7 +53,7 @@
     {
         stages.put(MUTATION_STAGE, multiThreadedStage(MUTATION_STAGE, 
getConcurrentWriters()));
         stages.put(READ_STAGE, multiThreadedStage(READ_STAGE, 
getConcurrentReaders()));
-        stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", 
MessagingService.MESSAGE_DESERIALIZE_THREADS));
+        stages.put(RESPONSE_STAGE, multiThreadedStage("RESPONSE-STAGE", 
Runtime.getRuntime().availableProcessors()));
         // the rest are all single-threaded
         stages.put(STREAM_STAGE, new 
JMXEnabledThreadPoolExecutor(STREAM_STAGE));
         stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
 Fri Jan 15 03:01:11 2010
@@ -67,7 +67,7 @@
     private static Map<String, IVerbHandler> verbHandlers_;
 
     /* Thread pool to handle messaging read activities of Socket and default 
stage */
-    private static ExecutorService messageDeserializationExecutor_;
+    private static ExecutorService messageReadExecutor_;
     
     /* Thread pool to handle deserialization of messages read from the socket. 
*/
     private static ExecutorService messageDeserializerExecutor_;
@@ -83,8 +83,6 @@
     
     private static volatile MessagingService messagingService_ = new 
MessagingService();
 
-    public static final int MESSAGE_DESERIALIZE_THREADS = 4;
-
     public static int getVersion()
     {
         return version_;
@@ -123,25 +121,19 @@
         */
         callbackMap_ = new Cachetable<String, IAsyncCallback>( 2 * 
DatabaseDescriptor.getRpcTimeout() );
         taskCompletionMap_ = new Cachetable<String, IAsyncResult>( 2 * 
DatabaseDescriptor.getRpcTimeout() );        
-        
-        messageDeserializationExecutor_ = new JMXEnabledThreadPoolExecutor(
-                MESSAGE_DESERIALIZE_THREADS,
-                MESSAGE_DESERIALIZE_THREADS,
-                Integer.MAX_VALUE,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(),
-                new NamedThreadFactory("MESSAGING-SERVICE-POOL")
-        );
-
-        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(
-                MESSAGE_DESERIALIZE_THREADS,
-                MESSAGE_DESERIALIZE_THREADS,
-                Integer.MAX_VALUE,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>(),
-                new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL")
-        );
-        
+
+        // read executor will have one runnable enqueued per connection with 
stuff to read on it,
+        // so there is no need to make it bounded, and one thread should be 
plenty.
+        messageReadExecutor_ = new 
JMXEnabledThreadPoolExecutor("MS-CONNECTION-READ-POOL");
+
+        // read executor puts messages to deserialize on this.
+        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1,
+                                                                        
Runtime.getRuntime().availableProcessors(),
+                                                                        
Integer.MAX_VALUE,
+                                                                        
TimeUnit.SECONDS,
+                                                                        new 
LinkedBlockingQueue<Runnable>(),
+                                                                        new 
NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
+
         streamExecutor_ = new 
JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
                 
         protocol_ = hash(HashingSchemes.MD5, "FB-MESSAGING".getBytes());       
 
@@ -451,7 +443,7 @@
             udpConnections_.clear();
 
             /* Shutdown the threads in the EventQueue's */
-            messageDeserializationExecutor_.shutdownNow();
+            messageReadExecutor_.shutdownNow();
             messageDeserializerExecutor_.shutdownNow();
             streamExecutor_.shutdownNow();
 
@@ -508,7 +500,7 @@
 
     public static ExecutorService getReadExecutor()
     {
-        return messageDeserializationExecutor_;
+        return messageReadExecutor_;
     }
 
     public static ExecutorService getDeserializationExecutor()

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=899519&r1=899518&r2=899519&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java 
Fri Jan 15 03:01:11 2010
@@ -48,7 +48,7 @@
     private TcpConnectionManager pool_;
     private boolean isIncoming_ = false;
     private TcpReader tcpReader_;
-    private ReadWorkItem readWork_ = new ReadWorkItem(); 
+    private ConnectionReader reader_ = new ConnectionReader();
     private Queue<ByteBuffer> pendingWrites_ = new 
ConcurrentLinkedQueue<ByteBuffer>();
     private InetAddress localEp_;
     private InetAddress remoteEp_;
@@ -401,12 +401,12 @@
     {
         turnOffInterestOps(key, SelectionKey.OP_READ);
         // publish this event onto to the TCPReadEvent Queue.
-        MessagingService.getReadExecutor().execute(readWork_);
+        MessagingService.getReadExecutor().execute(reader_);
     }
     
-    class ReadWorkItem implements Runnable
+    class ConnectionReader implements Runnable
     {                 
-        // called from the TCP READ thread pool
+        // called from the TCP READ executor
         public void run()
         {                         
             if ( tcpReader_ == null )
@@ -441,7 +441,6 @@
                     }
                     else
                     {
-                        /* Close this socket connection  used for streaming */
                         closeSocket();
                     }                    
                 }


Reply via email to