Author: eevans
Date: Wed Sep 1 21:49:56 2010
New Revision: 991715
URL: http://svn.apache.org/viewvc?rev=991715&view=rev
Log:
ThreadPool for avro that cleans up client state @ shutdown
Patch by Stu Hood; reviewed by eevans
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=991715&r1=991714&r2=991715&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Wed Sep 1 21:49:56 2010
@@ -23,7 +23,9 @@ import java.net.InetAddress;
import java.util.UUID;
import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.ResponderServlet;
import org.apache.avro.specific.SpecificResponder;
+
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CompactionManager;
@@ -38,13 +40,17 @@ import org.apache.cassandra.utils.Mx4jTo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+// see CASSANDRA-1440
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
/**
* The Avro analogue to org.apache.cassandra.service.CassandraDaemon.
*
*/
public class CassandraDaemon extends
org.apache.cassandra.service.AbstractCassandraDaemon {
private static Logger logger =
LoggerFactory.getLogger(CassandraDaemon.class);
- private HttpServer server;
+ private org.mortbay.jetty.Server server;
private InetAddress listenAddr;
private int listenPort;
@@ -129,24 +135,46 @@ public class CassandraDaemon extends org
{
if (logger.isDebugEnabled())
logger.debug(String.format("Binding avro service to %s:%s",
listenAddr, listenPort));
- SpecificResponder responder = new SpecificResponder(Cassandra.class,
new CassandraServer());
+ CassandraServer cassandraServer = new CassandraServer();
+ SpecificResponder responder = new SpecificResponder(Cassandra.class,
cassandraServer);
logger.info("Listening for avro clients...");
Mx4jTool.maybeLoad();
+
// FIXME: This isn't actually binding to listenAddr (it should).
- server = new HttpServer(responder, listenPort);
- server.start();
+ server = new org.mortbay.jetty.Server(listenPort);
+ server.setThreadPool(new
CleaningThreadPool(cassandraServer.clientState,
+ MIN_WORKER_THREADS,
+ Integer.MAX_VALUE));
+ try
+ {
+ // see CASSANDRA-1440
+ ResponderServlet servlet = new ResponderServlet(responder);
+ new Context(server, "/").addServlet(new ServletHolder(servlet),
"/*");
+
+ server.start();
+ }
+ catch (Exception e)
+ {
+ throw new IOException("Could not start Avro server.", e);
+ }
}
/** hook for JSVC */
public void stop()
{
logger.info("Cassandra shutting down...");
- server.close();
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ logger.error("Avro server did not exit cleanly.", e);
+ }
}
public static void main(String[] args) {
new CassandraDaemon().activate();
}
-
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=991715&r1=991714&r2=991715&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Wed Sep 1 21:49:56 2010
@@ -95,7 +95,7 @@ public class CassandraServer implements
public final static String D_COLDEF_INDEXNAME = null;
// thread local state containing session information
- private final ClientState clientState = new ClientState();
+ final ClientState clientState = new ClientState();
/*
* RequestScheduler to perform the scheduling of incoming requests
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=991715&r1=991714&r2=991715&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Wed Sep 1 21:49:56 2010
@@ -20,10 +20,16 @@ package org.apache.cassandra.service;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.mortbay.thread.ThreadPool;
+
/**
* The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
* service, which defines not only a way to activate and deactivate it, but
also
@@ -36,6 +42,8 @@ public abstract class AbstractCassandraD
private static Logger logger = LoggerFactory
.getLogger(AbstractCassandraDaemon.class);
+ public static final int MIN_WORKER_THREADS = 64;
+
/**
* This is a hook for concrete daemons to initialize themselves suitably.
*
@@ -123,4 +131,67 @@ public abstract class AbstractCassandraD
destroy();
}
+ /**
+ * A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool
+ * interface (for integration with Avro), and performs ClientState cleanup.
+ *
+ * The executor is constructed with a 60-second keep-alive and a
+ * SynchronousQueue as its work queue. After each task it executes, it calls
+ * logout() on the ClientState supplied at construction time.
+ *
+ * The Jetty ThreadPool methods (dispatch, getIdleThreads, getThreads,
+ * isLowOnThreads, join) are thin adapters over the corresponding
+ * ThreadPoolExecutor operations.
+ */
+ public static class CleaningThreadPool extends ThreadPoolExecutor
implements ThreadPool
+ {
+ private ClientState state; // the state that afterExecute() logs out
+ public CleaningThreadPool(ClientState state, int minWorkerThread, int
maxWorkerThreads)
+ {
+ super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>()); // min = core pool size, max = maximum pool size, 60s keep-alive
+ this.state = state;
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r, t);
+ state.logout(); // cleanup hook: runs after every executed task
+ }
+
+ /*********************************************************************/
+ /** The following are cribbed from org.mortbay.thread.concurrent */
+ /*********************************************************************/
+
+ @Override
+ public boolean dispatch(Runnable job)
+ {
+ try
+ {
+ execute(job);
+ return true; // job was accepted by the executor
+ }
+ catch(RejectedExecutionException e)
+ {
+ logger.error("Failed to dispatch thread:", e);
+ return false; // rejection is logged and reported via the return value, not rethrown
+ }
+ }
+
+ @Override
+ public int getIdleThreads()
+ {
+ return getPoolSize()-getActiveCount(); // current pool size minus the executor's active-thread count
+ }
+
+ @Override
+ public int getThreads()
+ {
+ return getPoolSize(); // current number of threads in the pool
+ }
+
+ @Override
+ public boolean isLowOnThreads()
+ {
+ return getActiveCount()>=getMaximumPoolSize(); // true once active threads reach the configured maximum
+ }
+
+ @Override
+ public void join() throws InterruptedException
+ {
+ this.awaitTermination(Long.MAX_VALUE,TimeUnit.MILLISECONDS); // blocks until the executor terminates; timeout is effectively unbounded
+ }
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=991715&r1=991714&r2=991715&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Wed Sep 1 21:49:56 2010
@@ -23,7 +23,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -180,23 +179,11 @@ public class CassandraDaemon extends org
// ThreadPool Server
CustomTThreadPoolServer.Options options = new
CustomTThreadPoolServer.Options();
- options.minWorkerThreads = 64;
+ options.minWorkerThreads = MIN_WORKER_THREADS;
- SynchronousQueue<Runnable> executorQueue = new
SynchronousQueue<Runnable>();
-
- ExecutorService executorService = new
ThreadPoolExecutor(options.minWorkerThreads,
-
options.maxWorkerThreads,
- 60,
-
TimeUnit.SECONDS,
- executorQueue)
- {
- @Override
- protected void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r, t);
- cassandraServer.clientState.logout();
- }
- };
+ ExecutorService executorService = new
CleaningThreadPool(cassandraServer.clientState,
+
options.minWorkerThreads,
+
options.maxWorkerThreads);
serverEngine = new CustomTThreadPoolServer(new
TProcessorFactory(processor),
tServerSocket,
inTransportFactory,