Author: gdusbabek
Date: Wed Jan 26 15:05:28 2011
New Revision: 1063753
URL: http://svn.apache.org/viewvc?rev=1063753&view=rev
Log:
merge from 0.7 (CASSANDRA-1951)
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7:1026516-1063562
+/cassandra/branches/cassandra-0.7:1026516-1063747
/cassandra/branches/cassandra-0.7.0:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3:774578-796573
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan 26 15:05:28 2011
@@ -9,6 +9,7 @@
0.7.2-dev
* fix potential overflow in nodetool cfstats
+ * offline nodes (CASSANDRA-1951)
0.7.1
Modified:
cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
---
cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java
(original)
+++
cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java
Wed Jan 26 15:05:28 2011
@@ -63,7 +63,6 @@ import org.junit.Test;
public class CassandraServiceTest {
private static EmbeddedCassandraService cassandra;
- private static Thread cassandraRunner;
private static CassandraServiceDataCleaner cleaner;
/**
@@ -90,13 +89,7 @@ public class CassandraServiceTest {
cleaner.prepare();
cassandra = new EmbeddedCassandraService();
- cassandra.init();
-
- if ( cassandraRunner == null ) {
- cassandraRunner = new Thread(cassandra);
- cassandraRunner.setDaemon(true);
- cassandraRunner.start();
- }
+ cassandra.start();
}
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1063747
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1063747
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1063747
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1063747
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1063747
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
Wed Jan 26 15:05:28 2011
@@ -56,6 +56,10 @@ import org.mortbay.thread.ThreadPool;
*/
public abstract class AbstractCassandraDaemon implements CassandraDaemon
{
+ public AbstractCassandraDaemon()
+ {
+ StorageService.instance.registerDaemon(this);
+ }
//Initialize logging in such a way that it checks for config changes every
10 seconds.
static
@@ -82,6 +86,7 @@ public abstract class AbstractCassandraD
protected InetAddress listenAddr;
protected int listenPort;
+ protected volatile boolean isRunning = false;
public static final int MIN_WORKER_THREADS = 64;
@@ -211,15 +216,82 @@ public abstract class AbstractCassandraD
* Start the Cassandra Daemon, assuming that it has already been
* initialized, via either {@link #init(String[])} or
* {@link #load(String[])}.
- *
+ *
+ * Hook for JSVC
+ *
* @throws IOException
*/
- public abstract void start() throws IOException;
+ public void start()
+ {
+ if (Boolean.parseBoolean(System.getProperty("cassandra.start_rpc",
"true")))
+ {
+ startRPCServer();
+ }
+ else
+ {
+ logger.info("Not starting RPC server as requested. Use JMX
(StorageService->startRPCServer()) to start it");
+ }
+ }
/**
* Stop the daemon, ideally in an idempotent manner.
+ *
+ * Hook for JSVC
+ */
+ public void stop()
+ {
+ // this doesn't entirely shut down Cassandra, just the RPC server.
+ // jsvc takes care of taking the rest down
+ logger.info("Cassandra shutting down...");
+ stopRPCServer();
+ }
+
+ /**
+ * Start the underlying RPC server in idempotent manner.
+ */
+ public void startRPCServer()
+ {
+ if (!isRunning)
+ {
+ startServer();
+ isRunning = true;
+ }
+ }
+
+ /**
+ * Stop the underlying RPC server in idempotent manner.
+ */
+ public void stopRPCServer()
+ {
+ if (isRunning)
+ {
+ stopServer();
+ isRunning = false;
+ }
+ }
+
+ /**
+ * Returns whether the underlying RPC server is running or not.
+ */
+ public boolean isRPCServerRunning()
+ {
+ return isRunning;
+ }
+
+ /**
+ * Start the underlying RPC server.
+ * This method shoud be able to restart a server stopped through
stopServer().
+ * Should throw a RuntimeException if the server cannot be started
*/
- public abstract void stop();
+ protected abstract void startServer();
+
+ /**
+ * Stop the underlying RPC server.
+ * This method should be able to stop server started through startServer().
+ * Should throw a RuntimeException if the server cannot be stopped
+ */
+ protected abstract void stopServer();
+
/**
* Clean up all resources obtained during the lifetime of the daemon. This
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
Wed Jan 26 15:05:28 2011
@@ -60,6 +60,10 @@ public interface CassandraDaemon
* to clarify, this is a hook for JSVC.
*/
public void destroy();
+
+ public void startRPCServer();
+ public void stopRPCServer();
+ public boolean isRPCServerRunning();
/**
* A convenience method to initialize and start the daemon in one shot.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
Wed Jan 26 15:05:28 2011
@@ -37,35 +37,26 @@ import org.apache.thrift.transport.TTran
* This is the implementation of
https://issues.apache.org/jira/browse/CASSANDRA-740
* <p>
* How to use:
- * In the client code create a new thread and spawn it with its {@link
Thread#start()} method.
+ * In the client code simply create a new EmbeddedCassandraService and start
it.
* Example:
* <pre>
cassandra = new EmbeddedCassandraService();
- cassandra.init();
-
- // spawn cassandra in a new thread
- Thread t = new Thread(cassandra);
- t.setDaemon(true);
- t.start();
+ cassandra.start();
* </pre>
* @author Ran Tavory ([email protected])
*
*/
-public class EmbeddedCassandraService implements Runnable
+public class EmbeddedCassandraService
{
CassandraDaemon cassandraDaemon;
- public void init() throws TTransportException, IOException
+ public void start() throws IOException
{
cassandraDaemon = new CassandraDaemon();
cassandraDaemon.init(null);
- }
-
- public void run()
- {
cassandraDaemon.start();
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Jan 26 15:05:28 2011
@@ -172,6 +172,8 @@ public class StorageService implements I
private TokenMetadata tokenMetadata_ = new TokenMetadata();
private Set<InetAddress> replicatingNodes =
Collections.synchronizedSet(new HashSet<InetAddress>());
+ private CassandraDaemon daemon;
+
private InetAddress removingNode;
/* Are we starting this node in bootstrap mode? */
@@ -248,6 +250,11 @@ public class StorageService implements I
throw new RuntimeException("Streaming service is unavailable.");
}
+ public void registerDaemon(CassandraDaemon daemon)
+ {
+ this.daemon = daemon;
+ }
+
// should only be called via JMX
public void stopGossiping()
{
@@ -270,6 +277,35 @@ public class StorageService implements I
}
}
+ // should only be called via JMX
+ public void startRPCServer()
+ {
+ if (daemon == null)
+ {
+ throw new IllegalStateException("No configured RPC daemon");
+ }
+ daemon.startRPCServer();
+ }
+
+ // should only be called via JMX
+ public void stopRPCServer()
+ {
+ if (daemon == null)
+ {
+ throw new IllegalStateException("No configured RPC daemon");
+ }
+ daemon.stopRPCServer();
+ }
+
+ public boolean isRPCServerRunning()
+ {
+ if (daemon == null)
+ {
+ throw new IllegalStateException("No configured RPC daemon");
+ }
+ return daemon.isRPCServerRunning();
+ }
+
public void stopClient()
{
Gossiper.instance.unregister(migrationManager);
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Wed Jan 26 15:05:28 2011
@@ -281,6 +281,15 @@ public interface StorageServiceMBean
// to determine if gossip is disabled
public boolean isInitialized();
+ // allows a user to disable thrift
+ public void stopRPCServer();
+
+ // allows a user to reenable thrift
+ public void startRPCServer();
+
+ // to determine if thrift is running
+ public boolean isRPCServerRunning();
+
public void invalidateKeyCaches(String ks, String... cfs) throws
IOException;
public void invalidateRowCaches(String ks, String... cfs) throws
IOException;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
Wed Jan 26 15:05:28 2011
@@ -19,6 +19,7 @@
package org.apache.cassandra.thrift;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
@@ -47,89 +48,118 @@ import org.apache.thrift.transport.TTran
public class CassandraDaemon extends
org.apache.cassandra.service.AbstractCassandraDaemon
{
private static Logger logger =
LoggerFactory.getLogger(CassandraDaemon.class);
- private TServer serverEngine;
+ private ThriftServer server;
- protected void setup() throws IOException
+ protected void startServer()
{
- super.setup();
-
- // now we start listening for clients
- final CassandraServer cassandraServer = new CassandraServer();
- Cassandra.Processor processor = new
Cassandra.Processor(cassandraServer);
-
- // Transport
- TServerSocket tServerSocket = null;
-
- try
+ if (server == null)
{
- tServerSocket = new TCustomServerSocket(new
InetSocketAddress(listenAddr, listenPort),
-
DatabaseDescriptor.getRpcKeepAlive(),
-
DatabaseDescriptor.getRpcSendBufferSize(),
-
DatabaseDescriptor.getRpcRecvBufferSize());
+ server = new ThriftServer(listenAddr, listenPort);
+ server.start();
}
- catch (TTransportException e)
- {
- throw new IOException(String.format("Unable to create thrift
socket to %s:%s",
- listenAddr, listenPort), e);
- }
-
- logger.info(String.format("Binding thrift service to %s:%s",
listenAddr, listenPort));
+ }
- // Protocol factory
- TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true,
- true,
-
DatabaseDescriptor.getThriftMaxMessageLength());
-
- // Transport factory
- TTransportFactory inTransportFactory, outTransportFactory;
- if (DatabaseDescriptor.isThriftFramed())
- {
- int tFramedTransportSize =
DatabaseDescriptor.getThriftFramedTransportSize();
- inTransportFactory = new TFastFramedTransport.Factory(64 * 1024,
tFramedTransportSize);
- outTransportFactory = new TFastFramedTransport.Factory(64 * 1024,
tFramedTransportSize);
- logger.info("Using TFastFramedTransport with a max frame size of
{} bytes.", tFramedTransportSize);
- }
- else
+ protected void stopServer()
+ {
+ if (server != null)
{
- inTransportFactory = new TTransportFactory();
- outTransportFactory = new TTransportFactory();
+ server.stopServer();
+ try
+ {
+ server.join();
+ }
+ catch (InterruptedException e)
+ {
+ logger.error("Interrupted while waiting thrift server to
stop", e);
+ }
+ server = null;
}
-
- // ThreadPool Server
- CustomTThreadPoolServer.Options options = new
CustomTThreadPoolServer.Options();
- options.minWorkerThreads = MIN_WORKER_THREADS;
-
- ExecutorService executorService = new
CleaningThreadPool(cassandraServer.clientState,
-
options.minWorkerThreads,
-
options.maxWorkerThreads);
- serverEngine = new CustomTThreadPoolServer(new
TProcessorFactory(processor),
- tServerSocket,
- inTransportFactory,
- outTransportFactory,
- tProtocolFactory,
- tProtocolFactory,
- options,
- executorService);
}
- /** hook for JSVC */
- public void start()
+ public static void main(String[] args)
{
- logger.info("Listening for thrift clients...");
- serverEngine.serve();
+ new CassandraDaemon().activate();
}
- /** hook for JSVC */
- public void stop()
+ /**
+ * Simple class to run the thrift connection accepting code in separate
+ * thread of control.
+ */
+ private static class ThriftServer extends Thread
{
- // this doesn't entirely shut down Cassandra, just the Thrift server.
- // jsvc takes care of taking the rest down
- logger.info("Cassandra shutting down...");
- serverEngine.stop();
- }
-
- public static void main(String[] args)
- {
- new CassandraDaemon().activate();
+ private TServer serverEngine;
+
+ public ThriftServer(InetAddress listenAddr, int listenPort)
+ {
+ // now we start listening for clients
+ final CassandraServer cassandraServer = new CassandraServer();
+ Cassandra.Processor processor = new
Cassandra.Processor(cassandraServer);
+
+ // Transport
+ TServerSocket tServerSocket = null;
+
+ try
+ {
+ tServerSocket = new TCustomServerSocket(new
InetSocketAddress(listenAddr, listenPort),
+ DatabaseDescriptor.getRpcKeepAlive(),
+ DatabaseDescriptor.getRpcSendBufferSize(),
+ DatabaseDescriptor.getRpcRecvBufferSize());
+ }
+ catch (TTransportException e)
+ {
+ throw new RuntimeException(String.format("Unable to create
thrift socket to %s:%s",
+ listenAddr, listenPort), e);
+ }
+
+ logger.info(String.format("Binding thrift service to %s:%s",
listenAddr, listenPort));
+
+ // Protocol factory
+ TProtocolFactory tProtocolFactory = new
TBinaryProtocol.Factory(true,
+ true,
+ DatabaseDescriptor.getThriftMaxMessageLength());
+
+ // Transport factory
+ TTransportFactory inTransportFactory, outTransportFactory;
+ if (DatabaseDescriptor.isThriftFramed())
+ {
+ int tFramedTransportSize =
DatabaseDescriptor.getThriftFramedTransportSize();
+ inTransportFactory = new TFastFramedTransport.Factory(64 *
1024, tFramedTransportSize);
+ outTransportFactory = new TFastFramedTransport.Factory(64 *
1024, tFramedTransportSize);
+ logger.info("Using TFastFramedTransport with a max frame size
of {} bytes.", tFramedTransportSize);
+ }
+ else
+ {
+ inTransportFactory = new TTransportFactory();
+ outTransportFactory = new TTransportFactory();
+ }
+
+ // ThreadPool Server
+ CustomTThreadPoolServer.Options options = new
CustomTThreadPoolServer.Options();
+ options.minWorkerThreads = MIN_WORKER_THREADS;
+
+ ExecutorService executorService = new
CleaningThreadPool(cassandraServer.clientState,
+ options.minWorkerThreads,
+ options.maxWorkerThreads);
+ serverEngine = new CustomTThreadPoolServer(new
TProcessorFactory(processor),
+ tServerSocket,
+ inTransportFactory,
+ outTransportFactory,
+ tProtocolFactory,
+ tProtocolFactory,
+ options,
+ executorService);
+ }
+
+ public void run()
+ {
+ logger.info("Listening for thrift clients...");
+ serverEngine.serve();
+ }
+
+ public void stopServer()
+ {
+ logger.info("Stop listening to thrift clients");
+ serverEngine.stop();
+ }
}
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java Wed Jan 26
15:05:28 2011
@@ -146,7 +146,7 @@ public class CliTest extends CleanupHelp
@Test
public void testCli() throws IOException, TTransportException,
ConfigurationException
{
- setup();
+ new EmbeddedCassandraService().start();
// new error/output streams for CliSessionState
ByteArrayOutputStream errStream = new ByteArrayOutputStream();
@@ -204,24 +204,4 @@ public class CliTest extends CleanupHelp
errStream.reset(); // no errors to the end user.
}
}
-
- /**
- * Setup embedded cassandra instance using test config.
- * @throws TTransportException - when trying to bind address
- * @throws IOException - when reading config file
- * @throws ConfigurationException - when can set up configuration
- */
- private void setup() throws TTransportException, IOException,
ConfigurationException
- {
- EmbeddedCassandraService cassandra;
-
- cassandra = new EmbeddedCassandraService();
- cassandra.init();
-
- // spawn cassandra in a new thread
- Thread t = new Thread(cassandra);
- t.setDaemon(true);
- t.start();
- }
-
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java
Wed Jan 26 15:05:28 2011
@@ -64,14 +64,8 @@ public class EmbeddedCassandraServiceTes
@BeforeClass
public static void setup() throws TTransportException, IOException,
InterruptedException, ConfigurationException
{
-
cassandra = new EmbeddedCassandraService();
- cassandra.init();
-
- // spawn cassandra in a new thread
- Thread t = new Thread(cassandra);
- t.setDaemon(true);
- t.start();
+ cassandra.start();
}
@Test