Author: eevans
Date: Wed Sep  1 21:49:56 2010
New Revision: 991715

URL: http://svn.apache.org/viewvc?rev=991715&view=rev
Log:
ThreadPool for avro that cleans up client state @ shutdown

Patch by Stu Hood; reviewed by eevans

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=991715&r1=991714&r2=991715&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Wed Sep  1 21:49:56 2010
@@ -23,7 +23,9 @@ import java.net.InetAddress;
 import java.util.UUID;
 
 import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.ResponderServlet;
 import org.apache.avro.specific.SpecificResponder;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.CompactionManager;
@@ -38,13 +40,17 @@ import org.apache.cassandra.utils.Mx4jTo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+// see CASSANDRA-1440
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
 /**
  * The Avro analogue to org.apache.cassandra.service.CassandraDaemon.
  *
  */
 public class CassandraDaemon extends 
org.apache.cassandra.service.AbstractCassandraDaemon {
     private static Logger logger = 
LoggerFactory.getLogger(CassandraDaemon.class);
-    private HttpServer server;
+    private org.mortbay.jetty.Server server;
     private InetAddress listenAddr;
     private int listenPort;
     
@@ -129,24 +135,46 @@ public class CassandraDaemon extends org
     {
         if (logger.isDebugEnabled())
             logger.debug(String.format("Binding avro service to %s:%s", 
listenAddr, listenPort));
-        SpecificResponder responder = new SpecificResponder(Cassandra.class, 
new CassandraServer());
+        CassandraServer cassandraServer = new CassandraServer();
+        SpecificResponder responder = new SpecificResponder(Cassandra.class, 
cassandraServer);
         
         logger.info("Listening for avro clients...");
         Mx4jTool.maybeLoad();
+
         // FIXME: This isn't actually binding to listenAddr (it should).
-        server = new HttpServer(responder, listenPort);
-        server.start();
+        server = new org.mortbay.jetty.Server(listenPort);
+        server.setThreadPool(new 
CleaningThreadPool(cassandraServer.clientState,
+                                                    MIN_WORKER_THREADS,
+                                                    Integer.MAX_VALUE));
+        try
+        {
+            // see CASSANDRA-1440
+            ResponderServlet servlet = new ResponderServlet(responder);
+            new Context(server, "/").addServlet(new ServletHolder(servlet), 
"/*");
+
+            server.start();
+        }
+        catch (Exception e)
+        {
+            throw new IOException("Could not start Avro server.", e);
+        }
     }
     
     /** hook for JSVC */
     public void stop()
     {
         logger.info("Cassandra shutting down...");
-        server.close();
+        try
+        {
+            server.stop();
+        }
+        catch (Exception e)
+        {
+            logger.error("Avro server did not exit cleanly.", e);
+        }
     }
     
     public static void main(String[] args) {
         new CassandraDaemon().activate();
     }
-
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=991715&r1=991714&r2=991715&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Wed Sep  1 21:49:56 2010
@@ -95,7 +95,7 @@ public class CassandraServer implements 
     public final static String D_COLDEF_INDEXNAME = null;
     
     // thread local state containing session information
-    private final ClientState clientState = new ClientState();
+    final ClientState clientState = new ClientState();
 
     /*
      * RequestScheduler to perform the scheduling of incoming requests

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=991715&r1=991714&r2=991715&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Wed Sep  1 21:49:56 2010
@@ -20,10 +20,16 @@ package org.apache.cassandra.service;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.mortbay.thread.ThreadPool;
+
 /**
  * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
  * service, which defines not only a way to activate and deactivate it, but 
also
@@ -36,6 +42,8 @@ public abstract class AbstractCassandraD
     private static Logger logger = LoggerFactory
             .getLogger(AbstractCassandraDaemon.class);
     
+    public static final int MIN_WORKER_THREADS = 64;
+
     /**
      * This is a hook for concrete daemons to initialize themselves suitably.
      * 
@@ -123,4 +131,67 @@ public abstract class AbstractCassandraD
         destroy();
     }
     
+    /**
+     * A subclass of Java's ThreadPoolExecutor which implements Jetty's 
ThreadPool
+     * interface (for integration with Avro), and performs ClientState cleanup.
+     */
+    public static class CleaningThreadPool extends ThreadPoolExecutor 
implements ThreadPool
+    {
+        private ClientState state;
+        public CleaningThreadPool(ClientState state, int minWorkerThread, int 
maxWorkerThreads)
+        {
+            super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new 
SynchronousQueue<Runnable>());
+            this.state = state;
+        }
+
+        @Override
+        protected void afterExecute(Runnable r, Throwable t)
+        {
+            super.afterExecute(r, t);
+            state.logout();
+        }
+
+        /*********************************************************************/
+        /**   The following are cribbed from org.mortbay.thread.concurrent   */
+        /*********************************************************************/
+
+        @Override
+        public boolean dispatch(Runnable job)
+        {
+            try
+            {       
+                execute(job);
+                return true;
+            }
+            catch(RejectedExecutionException e)
+            {
+                logger.error("Failed to dispatch thread:", e);
+                return false;
+            }
+        }
+
+        @Override
+        public int getIdleThreads()
+        {
+            return getPoolSize()-getActiveCount();
+        }
+
+        @Override
+        public int getThreads()
+        {
+            return getPoolSize();
+        }
+
+        @Override
+        public boolean isLowOnThreads()
+        {
+            return getActiveCount()>=getMaximumPoolSize();
+        }
+
+        @Override
+        public void join() throws InterruptedException
+        {
+            this.awaitTermination(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=991715&r1=991714&r2=991715&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Wed Sep  1 21:49:56 2010
@@ -23,7 +23,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -180,23 +179,11 @@ public class CassandraDaemon extends org
 
         // ThreadPool Server
         CustomTThreadPoolServer.Options options = new 
CustomTThreadPoolServer.Options();
-        options.minWorkerThreads = 64;
+        options.minWorkerThreads = MIN_WORKER_THREADS;
 
-        SynchronousQueue<Runnable> executorQueue = new 
SynchronousQueue<Runnable>();
-
-        ExecutorService executorService = new 
ThreadPoolExecutor(options.minWorkerThreads,
-                                                                 
options.maxWorkerThreads,
-                                                                 60,
-                                                                 
TimeUnit.SECONDS,
-                                                                 executorQueue)
-        {
-            @Override
-            protected void afterExecute(Runnable r, Throwable t)
-            {
-                super.afterExecute(r, t);
-                cassandraServer.clientState.logout();
-            }
-        };
+        ExecutorService executorService = new 
CleaningThreadPool(cassandraServer.clientState,
+                                                                 
options.minWorkerThreads,
+                                                                 
options.maxWorkerThreads);
         serverEngine = new CustomTThreadPoolServer(new 
TProcessorFactory(processor),
                                              tServerSocket,
                                              inTransportFactory,


Reply via email to