Author: jbellis
Date: Thu Feb 24 01:41:29 2011
New Revision: 1074010

URL: http://svn.apache.org/viewvc?rev=1074010&view=rev
Log:
reformat

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1074010&r1=1074009&r2=1074010&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Feb 24 01:41:29 2011
@@ -39,151 +39,180 @@ import org.apache.thrift.transport.TTran
 
 /**
  * Slightly modified version of the Apache Thrift TThreadPoolServer.
- *
+ * <p/>
  * This allows passing an executor so you have more control over the actual
  * behaviour of the tasks being run.
- *
+ * <p/>
  * Newer version of Thrift should make this obsolete.
  */
-public class CustomTThreadPoolServer extends TServer {
+public class CustomTThreadPoolServer extends TServer
+{
 
-private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
 
-// Executor service for handling client connections
-private ExecutorService executorService_;
+    // Executor service for handling client connections
+    private ExecutorService executorService_;
 
-// Flag for stopping the server
-private volatile boolean stopped_;
-
-// Server options
-private Options options_;
-
-// Customizable server options
-public static class Options {
-       public int minWorkerThreads = 5;
-       public int maxWorkerThreads = Integer.MAX_VALUE;
-       public int stopTimeoutVal = 60;
-       public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
-}
-
-
-public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
-        TServerSocket tServerSocket,
-        TTransportFactory inTransportFactory,
-        TTransportFactory outTransportFactory,
-        TProtocolFactory tProtocolFactory,
-        TProtocolFactory tProtocolFactory2,
-        Options options,
-        ExecutorService executorService) {
-    
-    super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
-            tProtocolFactory, tProtocolFactory2);
-    options_ = options;
-    executorService_ = executorService;
-}
-
-
-public void serve() {
-       try {
-       serverTransport_.listen();
-       } catch (TTransportException ttx) {
-       LOGGER.error("Error occurred during listening.", ttx);
-       return;
-       }
-
-       stopped_ = false;
-       while (!stopped_) {
-       int failureCount = 0;
-       try {
-               TTransport client = serverTransport_.accept();
-               WorkerProcess wp = new WorkerProcess(client);
-               executorService_.execute(wp);
-       } catch (TTransportException ttx) {
-               if (!stopped_) {
-               ++failureCount;
-               LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
-               }
-       }
-       }
-
-       executorService_.shutdown();
-
-       // Loop until awaitTermination finally does return without a interrupted
-       // exception. If we don't do this, then we'll shut down prematurely. We want
-       // to let the executorService clear it's task queue, closing client sockets
-       // appropriately.
-       long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
-       long now = System.currentTimeMillis();
-       while (timeoutMS >= 0) {
-       try {
-               executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
-               break;
-       } catch (InterruptedException ix) {
-               long newnow = System.currentTimeMillis();
-               timeoutMS -= (newnow - now);
-               now = newnow;
-       }
-       }
-}
-
-public void stop() {
-       stopped_ = true;
-       serverTransport_.interrupt();
-}
-
-private class WorkerProcess implements Runnable {
-
-       /**
-        * Client that this services.
-        */
-       private TTransport client_;
-
-       /**
-        * Default constructor.
-        *
-        * @param client Transport to process
-        */
-       private WorkerProcess(TTransport client) {
-       client_ = client;
-       }
-
-       /**
-        * Loops on processing a client forever
-        */
-       public void run() {
-       TProcessor processor = null;
-       TTransport inputTransport = null;
-       TTransport outputTransport = null;
-       TProtocol inputProtocol = null;
-       TProtocol outputProtocol = null;
-       try {
-               processor = processorFactory_.getProcessor(client_);
-               inputTransport = inputTransportFactory_.getTransport(client_);
-               outputTransport = outputTransportFactory_.getTransport(client_);
-               inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-               outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-               // we check stopped_ first to make sure we're not supposed to be shutting
-               // down. this is necessary for graceful shutdown.
-               while (!stopped_ && processor.process(inputProtocol, outputProtocol)) 
-               {
-                   inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
-                   outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
-               }
-       } catch (TTransportException ttx) {
-               // Assume the client died and continue silently
-       } catch (TException tx) {
-               LOGGER.error("Thrift error occurred during processing of message.", tx);
-       } catch (Exception x) {
-               LOGGER.error("Error occurred during processing of message.", x);
-       }
-
-       if (inputTransport != null) {
-               inputTransport.close();
-       }
-
-       if (outputTransport != null) {
-               outputTransport.close();
-       }
-       }
-}
+    // Flag for stopping the server
+    private volatile boolean stopped_;
+
+    // Server options
+    private Options options_;
+
+    // Customizable server options
+    public static class Options
+    {
+        public int minWorkerThreads = 5;
+        public int maxWorkerThreads = Integer.MAX_VALUE;
+        public int stopTimeoutVal = 60;
+        public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    }
+
+
+    public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+                                   TServerSocket tServerSocket,
+                                   TTransportFactory inTransportFactory,
+                                   TTransportFactory outTransportFactory,
+                                   TProtocolFactory tProtocolFactory,
+                                   TProtocolFactory tProtocolFactory2,
+                                   Options options,
+                                   ExecutorService executorService)
+    {
+
+        super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
+              tProtocolFactory, tProtocolFactory2);
+        options_ = options;
+        executorService_ = executorService;
+    }
+
+
+    public void serve()
+    {
+        try
+        {
+            serverTransport_.listen();
+        }
+        catch (TTransportException ttx)
+        {
+            LOGGER.error("Error occurred during listening.", ttx);
+            return;
+        }
+
+        stopped_ = false;
+        while (!stopped_)
+        {
+            int failureCount = 0;
+            try
+            {
+                TTransport client = serverTransport_.accept();
+                WorkerProcess wp = new WorkerProcess(client);
+                executorService_.execute(wp);
+            }
+            catch (TTransportException ttx)
+            {
+                if (!stopped_)
+                {
+                    ++failureCount;
+                    LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
+                }
+            }
+        }
+
+        executorService_.shutdown();
+
+        // Loop until awaitTermination finally does return without a interrupted
+        // exception. If we don't do this, then we'll shut down prematurely. We want
+        // to let the executorService clear it's task queue, closing client sockets
+        // appropriately.
+        long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+        long now = System.currentTimeMillis();
+        while (timeoutMS >= 0)
+        {
+            try
+            {
+                executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+                break;
+            }
+            catch (InterruptedException ix)
+            {
+                long newnow = System.currentTimeMillis();
+                timeoutMS -= (newnow - now);
+                now = newnow;
+            }
+        }
+    }
+
+    public void stop()
+    {
+        stopped_ = true;
+        serverTransport_.interrupt();
+    }
+
+    private class WorkerProcess implements Runnable
+    {
+
+        /**
+         * Client that this services.
+         */
+        private TTransport client_;
+
+        /**
+         * Default constructor.
+         *
+         * @param client Transport to process
+         */
+        private WorkerProcess(TTransport client)
+        {
+            client_ = client;
+        }
+
+        /**
+         * Loops on processing a client forever
+         */
+        public void run()
+        {
+            TProcessor processor = null;
+            TTransport inputTransport = null;
+            TTransport outputTransport = null;
+            TProtocol inputProtocol = null;
+            TProtocol outputProtocol = null;
+            try
+            {
+                processor = processorFactory_.getProcessor(client_);
+                inputTransport = inputTransportFactory_.getTransport(client_);
+                outputTransport = outputTransportFactory_.getTransport(client_);
+                inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+                outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+                // we check stopped_ first to make sure we're not supposed to be shutting
+                // down. this is necessary for graceful shutdown.
+                while (!stopped_ && processor.process(inputProtocol, outputProtocol))
+                {
+                    inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+                    outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+                }
+            }
+            catch (TTransportException ttx)
+            {
+                // Assume the client died and continue silently
+            }
+            catch (TException tx)
+            {
+                LOGGER.error("Thrift error occurred during processing of message.", tx);
+            }
+            catch (Exception x)
+            {
+                LOGGER.error("Error occurred during processing of message.", x);
+            }
+
+            if (inputTransport != null)
+            {
+                inputTransport.close();
+            }
+
+            if (outputTransport != null)
+            {
+                outputTransport.close();
+            }
+        }
+    }
 }


Reply via email to