Author: gdusbabek
Date: Wed Jan 26 15:05:28 2011
New Revision: 1063753

URL: http://svn.apache.org/viewvc?rev=1063753&view=rev
Log:
merge from 0.7 (CASSANDRA-1951)

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
    cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
    
cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7:1026516-1063562
+/cassandra/branches/cassandra-0.7:1026516-1063747
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Jan 26 15:05:28 2011
@@ -9,6 +9,7 @@
 
 0.7.2-dev
  * fix potential overflow in nodetool cfstats
+ * offline nodes (CASSANDRA-1951)
 
 
 0.7.1

Modified: 
cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- 
cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java
 (original)
+++ 
cassandra/trunk/contrib/javautils/src/test/java/org/apache/cassandra/contrib/utils/service/CassandraServiceTest.java
 Wed Jan 26 15:05:28 2011
@@ -63,7 +63,6 @@ import org.junit.Test;
 public class CassandraServiceTest {
 
     private static EmbeddedCassandraService cassandra;
-    private static Thread cassandraRunner;
     private static CassandraServiceDataCleaner cleaner;
 
     /**
@@ -90,13 +89,7 @@ public class CassandraServiceTest {
         cleaner.prepare();
         
         cassandra = new EmbeddedCassandraService();
-        cassandra.init();
-        
-        if ( cassandraRunner == null ) {
-            cassandraRunner = new Thread(cassandra);
-            cassandraRunner.setDaemon(true);
-            cassandraRunner.start();
-        }
+        cassandra.start();
     }
     
 

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1063747
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1063747
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1063747
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1063747
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Jan 26 15:05:28 2011
@@ -1,5 +1,5 @@
 
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1063562
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1063747
 
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
 Wed Jan 26 15:05:28 2011
@@ -56,6 +56,10 @@ import org.mortbay.thread.ThreadPool;
  */
 public abstract class AbstractCassandraDaemon implements CassandraDaemon
 {
+    public AbstractCassandraDaemon()
+    {
+        StorageService.instance.registerDaemon(this);
+    }
 
     //Initialize logging in such a way that it checks for config changes every 
10 seconds.
     static
@@ -82,6 +86,7 @@ public abstract class AbstractCassandraD
     
     protected InetAddress listenAddr;
     protected int listenPort;
+    protected volatile boolean isRunning = false;
     
     public static final int MIN_WORKER_THREADS = 64;
 
@@ -211,15 +216,82 @@ public abstract class AbstractCassandraD
      * Start the Cassandra Daemon, assuming that it has already been
      * initialized, via either {@link #init(String[])} or
      * {@link #load(String[])}.
-     * 
+     *
+     * Hook for JSVC
+     *
      * @throws IOException
      */
-    public abstract void start() throws IOException;
+    public void start()
+    {
+        if (Boolean.parseBoolean(System.getProperty("cassandra.start_rpc", 
"true")))
+        {
+            startRPCServer();
+        }
+        else
+        {
+            logger.info("Not starting RPC server as requested. Use JMX 
(StorageService->startRPCServer()) to start it");
+        }
+    }
     
     /**
      * Stop the daemon, ideally in an idempotent manner.
+     *
+     * Hook for JSVC
+     */
+    public void stop()
+    {
+        // this doesn't entirely shut down Cassandra, just the RPC server.
+        // jsvc takes care of taking the rest down
+        logger.info("Cassandra shutting down...");
+        stopRPCServer();
+    }
+
+    /**
+     * Start the underlying RPC server in idempotent manner.
+     */
+    public void startRPCServer()
+    {
+        if (!isRunning)
+        {
+            startServer();
+            isRunning = true;
+        }
+    }
+
+    /**
+     * Stop the underlying RPC server in idempotent manner.
+     */
+    public void stopRPCServer()
+    {
+        if (isRunning)
+        {
+            stopServer();
+            isRunning = false;
+        }
+    }
+
+    /**
+     * Returns whether the underlying RPC server is running or not.
+     */
+    public boolean isRPCServerRunning()
+    {
+        return isRunning;
+    }
+
+    /**
+     * Start the underlying RPC server.
+     * This method should be able to restart a server stopped through 
stopServer().
+     * Should throw a RuntimeException if the server cannot be started
      */
-    public abstract void stop();
+    protected abstract void startServer();
+
+    /**
+     * Stop the underlying RPC server.
+     * This method should be able to stop server started through startServer().
+     * Should throw a RuntimeException if the server cannot be stopped
+     */
+    protected abstract void stopServer();
+
     
     /**
      * Clean up all resources obtained during the lifetime of the daemon. This

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java 
Wed Jan 26 15:05:28 2011
@@ -60,6 +60,10 @@ public interface CassandraDaemon
      * to clarify, this is a hook for JSVC.
      */
     public void destroy();
+
+    public void startRPCServer();
+    public void stopRPCServer();
+    public boolean isRPCServerRunning();
     
     /**
      * A convenience method to initialize and start the daemon in one shot.

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/EmbeddedCassandraService.java
 Wed Jan 26 15:05:28 2011
@@ -37,35 +37,26 @@ import org.apache.thrift.transport.TTran
  * This is the implementation of 
https://issues.apache.org/jira/browse/CASSANDRA-740
  * <p>
  * How to use:
- * In the client code create a new thread and spawn it with its {@link 
Thread#start()} method.
+ * In the client code simply create a new EmbeddedCassandraService and start 
it.
  * Example:
  * <pre>
 
         cassandra = new EmbeddedCassandraService();
-        cassandra.init();
-
-        // spawn cassandra in a new thread
-        Thread t = new Thread(cassandra);
-        t.setDaemon(true);
-        t.start();
+        cassandra.start();
 
  * </pre>
  * @author Ran Tavory ([email protected])
  *
  */
-public class EmbeddedCassandraService implements Runnable
+public class EmbeddedCassandraService
 {
 
     CassandraDaemon cassandraDaemon;
 
-    public void init() throws TTransportException, IOException
+    public void start() throws IOException
     {
         cassandraDaemon = new CassandraDaemon();
         cassandraDaemon.init(null);
-    }
-
-    public void run()
-    {
         cassandraDaemon.start();
     }
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java 
Wed Jan 26 15:05:28 2011
@@ -172,6 +172,8 @@ public class StorageService implements I
     private TokenMetadata tokenMetadata_ = new TokenMetadata();
 
     private Set<InetAddress> replicatingNodes = 
Collections.synchronizedSet(new HashSet<InetAddress>());
+    private CassandraDaemon daemon;
+
     private InetAddress removingNode;
 
     /* Are we starting this node in bootstrap mode? */
@@ -248,6 +250,11 @@ public class StorageService implements I
             throw new RuntimeException("Streaming service is unavailable.");
     }
 
+    public void registerDaemon(CassandraDaemon daemon)
+    {
+        this.daemon = daemon;
+    }
+
     // should only be called via JMX
     public void stopGossiping()
     {
@@ -270,6 +277,35 @@ public class StorageService implements I
         }
     }
 
+    // should only be called via JMX
+    public void startRPCServer()
+    {
+        if (daemon == null)
+        {
+            throw new IllegalStateException("No configured RPC daemon");
+        }
+        daemon.startRPCServer();
+    }
+
+    // should only be called via JMX
+    public void stopRPCServer()
+    {
+        if (daemon == null)
+        {
+            throw new IllegalStateException("No configured RPC daemon");
+        }
+        daemon.stopRPCServer();
+    }
+
+    public boolean isRPCServerRunning()
+    {
+        if (daemon == null)
+        {
+            throw new IllegalStateException("No configured RPC daemon");
+        }
+        return daemon.isRPCServerRunning();
+    }
+
     public void stopClient()
     {
         Gossiper.instance.unregister(migrationManager);

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
Wed Jan 26 15:05:28 2011
@@ -281,6 +281,15 @@ public interface StorageServiceMBean
     // to determine if gossip is disabled
     public boolean isInitialized();
 
+    // allows a user to disable thrift
+    public void stopRPCServer();
+
+    // allows a user to reenable thrift
+    public void startRPCServer();
+
+    // to determine if thrift is running
+    public boolean isRPCServerRunning();
+
     public void invalidateKeyCaches(String ks, String... cfs) throws 
IOException;
     public void invalidateRowCaches(String ks, String... cfs) throws 
IOException;
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
Wed Jan 26 15:05:28 2011
@@ -19,6 +19,7 @@
 package org.apache.cassandra.thrift;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.concurrent.ExecutorService;
 
@@ -47,89 +48,118 @@ import org.apache.thrift.transport.TTran
 public class CassandraDaemon extends 
org.apache.cassandra.service.AbstractCassandraDaemon
 {
     private static Logger logger = 
LoggerFactory.getLogger(CassandraDaemon.class);
-    private TServer serverEngine;
+    private ThriftServer server;
 
-    protected void setup() throws IOException
+    protected void startServer()
     {
-        super.setup();
-
-        // now we start listening for clients
-        final CassandraServer cassandraServer = new CassandraServer();
-        Cassandra.Processor processor = new 
Cassandra.Processor(cassandraServer);
-
-        // Transport
-        TServerSocket tServerSocket = null;
-
-        try
+        if (server == null)
         {
-            tServerSocket = new TCustomServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
-                                                    
DatabaseDescriptor.getRpcKeepAlive(),
-                                                    
DatabaseDescriptor.getRpcSendBufferSize(),
-                                                    
DatabaseDescriptor.getRpcRecvBufferSize());
+            server = new ThriftServer(listenAddr, listenPort);
+            server.start();
         }
-        catch (TTransportException e)
-        {
-            throw new IOException(String.format("Unable to create thrift 
socket to %s:%s",
-                                                listenAddr, listenPort), e);
-        }
-        
-        logger.info(String.format("Binding thrift service to %s:%s", 
listenAddr, listenPort));
+    }
 
-        // Protocol factory
-        TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, 
-                                                                        true, 
-                                                                        
DatabaseDescriptor.getThriftMaxMessageLength());
-        
-        // Transport factory
-        TTransportFactory inTransportFactory, outTransportFactory;
-        if (DatabaseDescriptor.isThriftFramed())
-        {
-            int tFramedTransportSize = 
DatabaseDescriptor.getThriftFramedTransportSize();
-            inTransportFactory  = new TFastFramedTransport.Factory(64 * 1024, 
tFramedTransportSize);
-            outTransportFactory = new TFastFramedTransport.Factory(64 * 1024, 
tFramedTransportSize);
-            logger.info("Using TFastFramedTransport with a max frame size of 
{} bytes.", tFramedTransportSize);
-        }
-        else
+    protected void stopServer()
+    {
+        if (server != null)
         {
-            inTransportFactory = new TTransportFactory();
-            outTransportFactory = new TTransportFactory();
+            server.stopServer();
+            try
+            {
+                server.join();
+            }
+            catch (InterruptedException e)
+            {
+                logger.error("Interrupted while waiting thrift server to 
stop", e);
+            }
+            server = null;
         }
-
-        // ThreadPool Server
-        CustomTThreadPoolServer.Options options = new 
CustomTThreadPoolServer.Options();
-        options.minWorkerThreads = MIN_WORKER_THREADS;
-
-        ExecutorService executorService = new 
CleaningThreadPool(cassandraServer.clientState,
-                                                                 
options.minWorkerThreads,
-                                                                 
options.maxWorkerThreads);
-        serverEngine = new CustomTThreadPoolServer(new 
TProcessorFactory(processor),
-                                             tServerSocket,
-                                             inTransportFactory,
-                                             outTransportFactory,
-                                             tProtocolFactory,
-                                             tProtocolFactory,
-                                             options,
-                                             executorService);
     }
 
-    /** hook for JSVC */
-    public void start()
+    public static void main(String[] args)
     {
-        logger.info("Listening for thrift clients...");
-        serverEngine.serve();
+        new CassandraDaemon().activate();
     }
 
-    /** hook for JSVC */
-    public void stop()
+    /**
+     * Simple class to run the thrift connection accepting code in separate
+     * thread of control.
+     */
+    private static class ThriftServer extends Thread
     {
-        // this doesn't entirely shut down Cassandra, just the Thrift server.
-        // jsvc takes care of taking the rest down
-        logger.info("Cassandra shutting down...");
-        serverEngine.stop();
-    }
-    
-    public static void main(String[] args)
-    {
-        new CassandraDaemon().activate();
+        private TServer serverEngine;
+
+        public ThriftServer(InetAddress listenAddr, int listenPort)
+        {
+            // now we start listening for clients
+            final CassandraServer cassandraServer = new CassandraServer();
+            Cassandra.Processor processor = new 
Cassandra.Processor(cassandraServer);
+
+            // Transport
+            TServerSocket tServerSocket = null;
+
+            try
+            {
+                tServerSocket = new TCustomServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
+                        DatabaseDescriptor.getRpcKeepAlive(),
+                        DatabaseDescriptor.getRpcSendBufferSize(),
+                        DatabaseDescriptor.getRpcRecvBufferSize());
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s",
+                            listenAddr, listenPort), e);
+            }
+
+            logger.info(String.format("Binding thrift service to %s:%s", 
listenAddr, listenPort));
+
+            // Protocol factory
+            TProtocolFactory tProtocolFactory = new 
TBinaryProtocol.Factory(true,
+                    true,
+                    DatabaseDescriptor.getThriftMaxMessageLength());
+
+            // Transport factory
+            TTransportFactory inTransportFactory, outTransportFactory;
+            if (DatabaseDescriptor.isThriftFramed())
+            {
+                int tFramedTransportSize = 
DatabaseDescriptor.getThriftFramedTransportSize();
+                inTransportFactory  = new TFastFramedTransport.Factory(64 * 
1024, tFramedTransportSize);
+                outTransportFactory = new TFastFramedTransport.Factory(64 * 
1024, tFramedTransportSize);
+                logger.info("Using TFastFramedTransport with a max frame size 
of {} bytes.", tFramedTransportSize);
+            }
+            else
+            {
+                inTransportFactory = new TTransportFactory();
+                outTransportFactory = new TTransportFactory();
+            }
+
+            // ThreadPool Server
+            CustomTThreadPoolServer.Options options = new 
CustomTThreadPoolServer.Options();
+            options.minWorkerThreads = MIN_WORKER_THREADS;
+
+            ExecutorService executorService = new 
CleaningThreadPool(cassandraServer.clientState,
+                    options.minWorkerThreads,
+                    options.maxWorkerThreads);
+            serverEngine = new CustomTThreadPoolServer(new 
TProcessorFactory(processor),
+                    tServerSocket,
+                    inTransportFactory,
+                    outTransportFactory,
+                    tProtocolFactory,
+                    tProtocolFactory,
+                    options,
+                    executorService);
+        }
+
+        public void run()
+        {
+            logger.info("Listening for thrift clients...");
+            serverEngine.serve();
+        }
+
+        public void stopServer()
+        {
+            logger.info("Stop listening to thrift clients");
+            serverEngine.stop();
+        }
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java Wed Jan 26 
15:05:28 2011
@@ -146,7 +146,7 @@ public class CliTest extends CleanupHelp
     @Test
     public void testCli() throws IOException, TTransportException, 
ConfigurationException
     {
-        setup();
+        new EmbeddedCassandraService().start();
 
         // new error/output streams for CliSessionState
         ByteArrayOutputStream errStream = new ByteArrayOutputStream();
@@ -204,24 +204,4 @@ public class CliTest extends CleanupHelp
             errStream.reset(); // no errors to the end user.
         }
     }
-
-    /**
-     * Setup embedded cassandra instance using test config.
-     * @throws TTransportException - when trying to bind address
-     * @throws IOException - when reading config file
-     * @throws ConfigurationException - when can set up configuration
-     */
-    private void setup() throws TTransportException, IOException, 
ConfigurationException
-    {
-        EmbeddedCassandraService cassandra;
-
-        cassandra = new EmbeddedCassandraService();
-        cassandra.init();
-
-        // spawn cassandra in a new thread
-        Thread t = new Thread(cassandra);
-        t.setDaemon(true);
-        t.start();
-    }
-
 }

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java?rev=1063753&r1=1063752&r2=1063753&view=diff
==============================================================================
--- 
cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java
 (original)
+++ 
cassandra/trunk/test/unit/org/apache/cassandra/service/EmbeddedCassandraServiceTest.java
 Wed Jan 26 15:05:28 2011
@@ -64,14 +64,8 @@ public class EmbeddedCassandraServiceTes
     @BeforeClass
     public static void setup() throws TTransportException, IOException, 
InterruptedException, ConfigurationException
     {
-
         cassandra = new EmbeddedCassandraService();
-        cassandra.init();
-
-        // spawn cassandra in a new thread
-        Thread t = new Thread(cassandra);
-        t.setDaemon(true);
-        t.start();
+        cassandra.start();
     }
 
     @Test


Reply via email to