Author: jbellis
Date: Tue Sep 14 03:20:44 2010
New Revision: 996750
URL: http://svn.apache.org/viewvc?rev=996750&view=rev
Log:
move common setup code into AbstractCassandraDaemon. patch by Amol Deshpande;
reviewed by jbellis for CASSANDRA-1500
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=996750&r1=996749&r2=996750&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Tue
Sep 14 03:20:44 2010
@@ -19,28 +19,13 @@
package org.apache.cassandra.avro;
import java.io.IOException;
-import java.net.InetAddress;
-import java.util.UUID;
-import org.apache.avro.ipc.HttpServer;
-import org.apache.avro.ipc.ResponderServlet;
-import org.apache.avro.specific.SpecificResponder;
-
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.CompactionManager;
-import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Mx4jTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// see CASSANDRA-1440
+import org.apache.avro.ipc.ResponderServlet;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.cassandra.utils.Mx4jTool;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
@@ -51,85 +36,7 @@ import org.mortbay.jetty.servlet.Servlet
public class CassandraDaemon extends
org.apache.cassandra.service.AbstractCassandraDaemon {
private static Logger logger =
LoggerFactory.getLogger(CassandraDaemon.class);
private org.mortbay.jetty.Server server;
- private InetAddress listenAddr;
- private int listenPort;
-
- protected void setup() throws IOException
- {
- FBUtilities.tryMlockall();
-
- listenPort = DatabaseDescriptor.getRpcPort();
- listenAddr = DatabaseDescriptor.getRpcAddress();
-
- /*
- * If ThriftAddress was left completely unconfigured, then assume
- * the same default as ListenAddress
- */
- if (listenAddr == null)
- listenAddr = FBUtilities.getLocalAddress();
-
- Thread.setDefaultUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler()
- {
- public void uncaughtException(Thread t, Throwable e)
- {
- logger.error("Fatal exception in thread " + t, e);
- if (e instanceof OutOfMemoryError)
- {
- System.exit(100);
- }
- }
- });
-
- // check the system table for mismatched partitioner.
- try
- {
- SystemTable.checkHealth();
- }
- catch (ConfigurationException e)
- {
- logger.error("Fatal exception during initialization", e);
- System.exit(100);
- }
-
- try
- {
- DatabaseDescriptor.loadSchemas();
- }
- catch (IOException e)
- {
- logger.error("Fatal exception during initialization", e);
- System.exit(100);
- }
-
- // initialize keyspaces
- for (String table : DatabaseDescriptor.getTables())
- {
- if (logger.isDebugEnabled())
- logger.debug("opening keyspace " + table);
- Table.open(table);
- }
- // replay the log if necessary and check for compaction candidates
- CommitLog.recover();
- CompactionManager.instance.checkAllColumnFamilies();
-
- // check to see if CL.recovery modified the lastMigrationId. if it
did, we need to re apply migrations. this isn't
- // the same as merely reloading the schema (which wouldn't perform
file deletion after a DROP). The solution
- // is to read those migrations from disk and apply them.
- UUID currentMigration = DatabaseDescriptor.getDefsVersion();
- UUID lastMigration = Migration.getLastMigrationId();
- if ((lastMigration != null) && (lastMigration.timestamp() >
currentMigration.timestamp()))
- {
- MigrationManager.applyMigrations(currentMigration, lastMigration);
- }
-
- SystemTable.purgeIncompatibleHints();
-
- // start server internals
- StorageService.instance.initServer();
-
- }
-
/** hook for JSVC */
public void start() throws IOException
{
@@ -139,7 +46,6 @@ public class CassandraDaemon extends org
SpecificResponder responder = new SpecificResponder(Cassandra.class,
cassandraServer);
logger.info("Listening for avro clients...");
- Mx4jTool.maybeLoad();
// FIXME: This isn't actually binding to listenAddr (it should).
server = new org.mortbay.jetty.Server(listenPort);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=996750&r1=996749&r2=996750&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
Tue Sep 14 03:20:44 2010
@@ -20,14 +20,25 @@ package org.apache.cassandra.service;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.util.UUID;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Mx4jTool;
import org.mortbay.thread.ThreadPool;
/**
@@ -42,15 +53,95 @@ public abstract class AbstractCassandraD
private static Logger logger = LoggerFactory
.getLogger(AbstractCassandraDaemon.class);
+ protected InetAddress listenAddr;
+ protected int listenPort;
+
public static final int MIN_WORKER_THREADS = 64;
/**
* This is a hook for concrete daemons to initialize themselves suitably.
- *
+ *
+ * Subclasses should override this to finish the job (listening on ports,
etc.)
+ *
* @throws IOException
*/
- protected abstract void setup() throws IOException;
-
+ protected void setup() throws IOException
+ {
+ FBUtilities.tryMlockall();
+
+ listenPort = DatabaseDescriptor.getRpcPort();
+ listenAddr = DatabaseDescriptor.getRpcAddress();
+
+ /*
+ * If ThriftAddress was left completely unconfigured, then assume
+ * the same default as ListenAddress
+ */
+ if (listenAddr == null)
+ listenAddr = FBUtilities.getLocalAddress();
+
+ Thread.setDefaultUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler()
+ {
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ logger.error("Fatal exception in thread " + t, e);
+ if (e instanceof OutOfMemoryError)
+ {
+ System.exit(100);
+ }
+ }
+ });
+
+ // check the system table for mismatched partitioner.
+ try
+ {
+ SystemTable.checkHealth();
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal exception during initialization", e);
+ System.exit(100);
+ }
+
+ try
+ {
+ DatabaseDescriptor.loadSchemas();
+ }
+ catch (IOException e)
+ {
+ logger.error("Fatal exception during initialization", e);
+ System.exit(100);
+ }
+
+ // initialize keyspaces
+ for (String table : DatabaseDescriptor.getTables())
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("opening keyspace " + table);
+ Table.open(table);
+ }
+
+ // replay the log if necessary and check for compaction candidates
+ CommitLog.recover();
+ CompactionManager.instance.checkAllColumnFamilies();
+
+ // check to see if CL.recovery modified the lastMigrationId. if it
did, we need to re apply migrations. this isn't
+ // the same as merely reloading the schema (which wouldn't perform
file deletion after a DROP). The solution
+ // is to read those migrations from disk and apply them.
+ UUID currentMigration = DatabaseDescriptor.getDefsVersion();
+ UUID lastMigration = Migration.getLastMigrationId();
+ if ((lastMigration != null) && (lastMigration.timestamp() >
currentMigration.timestamp()))
+ {
+ MigrationManager.applyMigrations(currentMigration, lastMigration);
+ }
+
+ SystemTable.purgeIncompatibleHints();
+
+ // start server internals
+ StorageService.instance.initServer();
+
+ Mx4jTool.maybeLoad();
+ }
+
/**
* Initialize the Cassandra Daemon based on the given <a
* href="http://commons.apache.org/daemon/jsvc.html">Commons
@@ -155,7 +246,6 @@ public abstract class AbstractCassandraD
/** The following are cribbed from org.mortbay.thread.concurrent */
/*********************************************************************/
- @Override
public boolean dispatch(Runnable job)
{
try
@@ -170,25 +260,21 @@ public abstract class AbstractCassandraD
}
}
- @Override
public int getIdleThreads()
{
return getPoolSize()-getActiveCount();
}
- @Override
public int getThreads()
{
return getPoolSize();
}
- @Override
public boolean isLowOnThreads()
{
return getActiveCount()>=getMaximumPoolSize();
}
- @Override
public void join() throws InterruptedException
{
this.awaitTermination(Long.MAX_VALUE,TimeUnit.MILLISECONDS);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=996750&r1=996749&r2=996750&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
Tue Sep 14 03:20:44 2010
@@ -19,25 +19,13 @@
package org.apache.cassandra.thrift;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.util.UUID;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.config.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.CompactionManager;
-import org.apache.cassandra.db.SystemTable;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.migration.Migration;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Mx4jTool;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -46,8 +34,6 @@ import org.apache.thrift.transport.TFram
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class supports two methods for creating a Cassandra node daemon,
@@ -65,78 +51,7 @@ public class CassandraDaemon extends org
protected void setup() throws IOException
{
- FBUtilities.tryMlockall();
-
- int listenPort = DatabaseDescriptor.getRpcPort();
- InetAddress listenAddr = DatabaseDescriptor.getRpcAddress();
-
- /*
- * If ThriftAddress was left completely unconfigured, then assume
- * the same default as ListenAddress
- */
- if (listenAddr == null)
- listenAddr = FBUtilities.getLocalAddress();
-
- Thread.setDefaultUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler()
- {
- public void uncaughtException(Thread t, Throwable e)
- {
- logger.error("Uncaught exception in thread " + t, e);
- if (e instanceof OutOfMemoryError)
- {
- System.exit(100);
- }
- }
- });
-
- // check the system table for mismatched partitioner.
- try
- {
- SystemTable.checkHealth();
- }
- catch (ConfigurationException e)
- {
- logger.error("Fatal exception during initialization", e);
- System.exit(100);
- }
-
- try
- {
- DatabaseDescriptor.loadSchemas();
- }
- catch (IOException e)
- {
- logger.error("Fatal exception during initialization", e);
- System.exit(100);
- }
-
- // initialize keyspaces
- for (String table : DatabaseDescriptor.getTables())
- {
- if (logger.isDebugEnabled())
- logger.debug("opening keyspace " + table);
- Table.open(table);
- }
-
- // replay the log if necessary and check for compaction candidates
- CommitLog.recover();
- CompactionManager.instance.checkAllColumnFamilies();
-
- // check to see if CL.recovery modified the lastMigrationId. if it
did, we need to re apply migrations. this isn't
- // the same as merely reloading the schema (which wouldn't perform
file deletion after a DROP). The solution
- // is to read those migrations from disk and apply them.
- UUID currentMigration = DatabaseDescriptor.getDefsVersion();
- UUID lastMigration = Migration.getLastMigrationId();
- if ((lastMigration != null) && (lastMigration.timestamp() >
currentMigration.timestamp()))
- {
- MigrationManager.applyMigrations(currentMigration, lastMigration);
- }
-
- SystemTable.purgeIncompatibleHints();
-
- // start server internals
- StorageService.instance.initServer();
-
+ super.setup();
// now we start listening for clients
final CassandraServer cassandraServer = new CassandraServer();
Cassandra.Processor processor = new
Cassandra.Processor(cassandraServer);
@@ -176,7 +91,6 @@ public class CassandraDaemon extends org
outTransportFactory = new TTransportFactory();
}
-
// ThreadPool Server
CustomTThreadPoolServer.Options options = new
CustomTThreadPoolServer.Options();
options.minWorkerThreads = MIN_WORKER_THREADS;
@@ -198,7 +112,6 @@ public class CassandraDaemon extends org
public void start()
{
logger.info("Listening for thrift clients...");
- Mx4jTool.maybeLoad();
serverEngine.serve();
}