Author: jbellis
Date: Sun Jun  6 05:00:29 2010
New Revision: 951819

URL: http://svn.apache.org/viewvc?rev=951819&view=rev
Log:
avoid preserving login information after client disconnects.  patch by Kenny; 
reviewed by jbellis for CASSANDRA-1157

Added:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=951819&r1=951818&r2=951819&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Sun Jun  6 05:00:29 2010
@@ -10,6 +10,8 @@
  * don't reject reads at CL.ALL (CASSANDRA-1152)
  * reject deletions to supercolumns in CFs containing only standard
    columns (CASSANDRA-1139)
+ * avoid preserving login information after client disconnects
+   (CASSANDRA-1057)
 
 
 0.6.2

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=951819&r1=951818&r2=951819&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Sun Jun  6 05:00:29 2010
@@ -22,6 +22,10 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
@@ -29,7 +33,7 @@ import org.apache.log4j.PropertyConfigur
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
@@ -53,7 +57,7 @@ import org.apache.cassandra.db.Compactio
 public class CassandraDaemon
 {
     private static Logger logger = Logger.getLogger(CassandraDaemon.class);
-    private TThreadPoolServer serverEngine;
+    private TServer serverEngine;
 
     private void setup() throws IOException, TTransportException
     {
@@ -99,7 +103,7 @@ public class CassandraDaemon
         StorageService.instance.initServer();
         
         // now we start listening for clients
-        CassandraServer cassandraServer = new CassandraServer();
+        final CassandraServer cassandraServer = new CassandraServer();
         Cassandra.Processor processor = new 
Cassandra.Processor(cassandraServer);
 
         // Transport
@@ -124,16 +128,34 @@ public class CassandraDaemon
             outTransportFactory = new TTransportFactory();
         }
 
+
         // ThreadPool Server
-        TThreadPoolServer.Options options = new TThreadPoolServer.Options();
+        CustomTThreadPoolServer.Options options = new 
CustomTThreadPoolServer.Options();
         options.minWorkerThreads = 64;
-        serverEngine = new TThreadPoolServer(new TProcessorFactory(processor),
+
+        SynchronousQueue<Runnable> executorQueue = new 
SynchronousQueue<Runnable>();
+
+        ExecutorService executorService = new 
ThreadPoolExecutor(options.minWorkerThreads,
+                                                                 
options.maxWorkerThreads,
+                                                                 60,
+                                                                 
TimeUnit.SECONDS,
+                                                                 executorQueue)
+        {
+            @Override
+            protected void afterExecute(Runnable r, Throwable t)
+            {
+                super.afterExecute(r, t);
+                cassandraServer.logout();
+            }
+        };
+        serverEngine = new CustomTThreadPoolServer(new 
TProcessorFactory(processor),
                                              tServerSocket,
                                              inTransportFactory,
                                              outTransportFactory,
                                              tProtocolFactory,
                                              tProtocolFactory,
-                                             options);
+                                             options,
+                                             executorService);
     }
 
     /** hook for JSVC */

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=951819&r1=951818&r2=951819&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CassandraServer.java Sun Jun  6 05:00:29 2010
@@ -651,6 +651,14 @@ public class CassandraServer implements 
         loginDone.set(true);
     }
 
+    public void logout()
+    {
+        loginDone.remove();
+
+        if (logger.isDebugEnabled())
+            logger.debug("logout complete");
+    }
+
     protected void checkLoginDone() throws InvalidRequestException
     {
         // FIXME: This disables the "you must call login()" requirement when 
the configured

Added: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=951819&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Sun Jun  6 05:00:29 2010
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.thrift;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Slightly modified version of the Apache Thrift TThreadPoolServer.
+ *
+ * This allows passing an executor so you have more control over the actual
+ * behaviour of the tasks being run.
+ *
+ * Newer version of Thrift should make this obsolete.
+ */
+public class CustomTThreadPoolServer extends TServer {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+
+// Executor service for handling client connections
+private ExecutorService executorService_;
+
+// Flag for stopping the server
+private volatile boolean stopped_;
+
+// Server options
+private Options options_;
+
+// Customizable server options
+public static class Options {
+       public int minWorkerThreads = 5;
+       public int maxWorkerThreads = Integer.MAX_VALUE;
+       public int stopTimeoutVal = 60;
+       public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+}
+
+
+public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+        TServerSocket tServerSocket,
+        TTransportFactory inTransportFactory,
+        TTransportFactory outTransportFactory,
+        TProtocolFactory tProtocolFactory,
+        TProtocolFactory tProtocolFactory2,
+        Options options,
+        ExecutorService executorService) {
+    
+    super(tProcessorFactory, tServerSocket, inTransportFactory, 
outTransportFactory,
+            tProtocolFactory, tProtocolFactory2);
+    options_ = options;
+    executorService_ = executorService;
+}
+
+
+public void serve() {
+       try {
+       serverTransport_.listen();
+       } catch (TTransportException ttx) {
+       LOGGER.error("Error occurred during listening.", ttx);
+       return;
+       }
+
+       stopped_ = false;
+       while (!stopped_) {
+       int failureCount = 0;
+       try {
+               TTransport client = serverTransport_.accept();
+               WorkerProcess wp = new WorkerProcess(client);
+               executorService_.execute(wp);
+       } catch (TTransportException ttx) {
+               if (!stopped_) {
+               ++failureCount;
+               LOGGER.warn("Transport error occurred during acceptance of 
message.", ttx);
+               }
+       }
+       }
+
+       executorService_.shutdown();
+
+       // Loop until awaitTermination finally does return without an interrupted
+       // exception. If we don't do this, then we'll shut down prematurely. We want
+       // to let the executorService clear its task queue, closing client sockets
+       // appropriately.
+       long timeoutMS = 
options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+       long now = System.currentTimeMillis();
+       while (timeoutMS >= 0) {
+       try {
+               executorService_.awaitTermination(timeoutMS, 
TimeUnit.MILLISECONDS);
+               break;
+       } catch (InterruptedException ix) {
+               long newnow = System.currentTimeMillis();
+               timeoutMS -= (newnow - now);
+               now = newnow;
+       }
+       }
+}
+
+public void stop() {
+       stopped_ = true;
+       serverTransport_.interrupt();
+}
+
+private class WorkerProcess implements Runnable {
+
+       /**
+        * Client that this services.
+        */
+       private TTransport client_;
+
+       /**
+        * Default constructor.
+        *
+        * @param client Transport to process
+        */
+       private WorkerProcess(TTransport client) {
+       client_ = client;
+       }
+
+       /**
+        * Loops on processing a client forever
+        */
+       public void run() {
+       TProcessor processor = null;
+       TTransport inputTransport = null;
+       TTransport outputTransport = null;
+       TProtocol inputProtocol = null;
+       TProtocol outputProtocol = null;
+       try {
+               processor = processorFactory_.getProcessor(client_);
+               inputTransport = inputTransportFactory_.getTransport(client_);
+               outputTransport = outputTransportFactory_.getTransport(client_);
+               inputProtocol = 
inputProtocolFactory_.getProtocol(inputTransport);
+               outputProtocol = 
outputProtocolFactory_.getProtocol(outputTransport);
+               // we check stopped_ first to make sure we're not supposed to 
be shutting
+               // down. this is necessary for graceful shutdown.
+               while (!stopped_ && processor.process(inputProtocol, 
outputProtocol)) {}
+       } catch (TTransportException ttx) {
+               // Assume the client died and continue silently
+       } catch (TException tx) {
+               LOGGER.error("Thrift error occurred during processing of 
message.", tx);
+       } catch (Exception x) {
+               LOGGER.error("Error occurred during processing of message.", x);
+       }
+
+       if (inputTransport != null) {
+               inputTransport.close();
+       }
+
+       if (outputTransport != null) {
+               outputTransport.close();
+       }
+       }
+}
+}


Reply via email to