Propchange: cassandra/trunk/src/gen-java/org/apache/cassandra/cql/CqlParser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1152233&r1=1152232&r2=1152233&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
 Fri Jul 29 14:21:06 2011
@@ -56,13 +56,24 @@ public class JMXEnabledThreadPoolExecuto
     }
 
     public JMXEnabledThreadPoolExecutor(int corePoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            NamedThreadFactory threadFactory,
+            String jmxPath)
+    {
+        this(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, 
threadFactory, jmxPath);
+    }
+    
+    public JMXEnabledThreadPoolExecutor(int corePoolSize,
+                                        int maxPoolSize,
                                         long keepAliveTime,
                                         TimeUnit unit,
                                         BlockingQueue<Runnable> workQueue,
                                         NamedThreadFactory threadFactory,
                                         String jmxPath)
     {
-        super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory);
+        super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
         super.prestartAllCoreThreads();
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1152233&r1=1152232&r2=1152233&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Fri Jul 29 
14:21:06 2011
@@ -67,6 +67,7 @@ public class Config
     
     public String rpc_address;
     public Integer rpc_port = 9160;
+    public String rpc_server_type = "sync";
     public Boolean rpc_keepalive = true;
     public Integer rpc_min_threads = 16;
     public Integer rpc_max_threads = Integer.MAX_VALUE;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1152233&r1=1152232&r2=1152233&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
Fri Jul 29 14:21:06 2011
@@ -48,6 +48,7 @@ import org.apache.cassandra.io.util.Mmap
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.scheduler.NoScheduler;
+import org.apache.cassandra.thrift.CassandraDaemon;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.yaml.snakeyaml.Loader;
@@ -369,6 +370,9 @@ public class DatabaseDescriptor
             if (conf.compaction_throughput_mb_per_sec == null)
                 conf.compaction_throughput_mb_per_sec = 16;
 
+            if 
(!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
+                throw new ConfigurationException("Unknown rpc_server_type: " + 
conf.rpc_server_type);
+            
             /* data file and commit log directories. they get created later, 
when they're needed. */
             if (conf.commitlog_directory != null && conf.data_file_directories 
!= null && conf.saved_caches_directory != null)
             {
@@ -899,6 +903,11 @@ public class DatabaseDescriptor
     {
         return rpcAddress;
     }
+    
+    public static String getRpcServerType()
+    {
+        return conf.rpc_server_type;
+    }
 
     public static boolean getRpcKeepAlive()
     {

Added: 
cassandra/trunk/src/java/org/apache/cassandra/service/SocketSessionManagementService.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/SocketSessionManagementService.java?rev=1152233&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/service/SocketSessionManagementService.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/service/SocketSessionManagementService.java
 Fri Jul 29 14:21:06 2011
@@ -0,0 +1,47 @@
+package org.apache.cassandra.service;
+
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SocketSessionManagementService
+{
+    public final static SocketSessionManagementService instance = new 
SocketSessionManagementService();
+    public final static ThreadLocal<SocketAddress> remoteSocket = new 
ThreadLocal<SocketAddress>();
+    private Map<SocketAddress, ClientState> activeSocketSessions = new 
ConcurrentHashMap<SocketAddress, ClientState>();
+
+    public ClientState get(SocketAddress key)
+    {
+        ClientState retval = null;
+        if (null != key)
+        {
+            retval = activeSocketSessions.get(key);
+        }
+        return retval;
+    }
+
+    public void put(SocketAddress key, ClientState value)
+    {
+        if (null != key && null != value)
+        {
+            activeSocketSessions.put(key, value);
+        }
+    }
+
+    public boolean remove(SocketAddress key)
+    {
+        assert null != key;
+        boolean retval = false;
+        if (null != activeSocketSessions.remove(key))
+        {
+            retval = true;
+        }
+        return retval;
+    }
+
+    public void clear()
+    {
+        activeSocketSessions.clear();
+    }
+
+}

Propchange: 
cassandra/trunk/src/java/org/apache/cassandra/service/SocketSessionManagementService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1152233&r1=1152232&r2=1152233&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
Fri Jul 29 14:21:06 2011
@@ -20,18 +20,25 @@ package org.apache.cassandra.thrift;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
@@ -47,6 +54,10 @@ import org.apache.thrift.transport.TTran
 public class CassandraDaemon extends 
org.apache.cassandra.service.AbstractCassandraDaemon
 {
     private static Logger logger = 
LoggerFactory.getLogger(CassandraDaemon.class);
+    private final static String SYNC = "sync";
+    private final static String ASYNC = "async";
+    private final static String HSHA = "hsha";
+    public final static List<String> rpc_server_types = Arrays.asList(SYNC, 
ASYNC, HSHA);
     private ThriftServer server;
 
     protected void startServer()
@@ -95,49 +106,90 @@ public class CassandraDaemon extends org
             Cassandra.Processor processor = new 
Cassandra.Processor(cassandraServer);
 
             // Transport
-            TServerSocket tServerSocket = null;
-
-            try
-            {
-                tServerSocket = new TCustomServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
-                        DatabaseDescriptor.getRpcKeepAlive(),
-                        DatabaseDescriptor.getRpcSendBufferSize(),
-                        DatabaseDescriptor.getRpcRecvBufferSize());
-            }
-            catch (TTransportException e)
-            {
-                throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s",
-                            listenAddr, listenPort), e);
-            }
-
             logger.info(String.format("Binding thrift service to %s:%s", 
listenAddr, listenPort));
 
             // Protocol factory
-            TProtocolFactory tProtocolFactory = new 
TBinaryProtocol.Factory(true,
-                    true,
-                    DatabaseDescriptor.getThriftMaxMessageLength());
+            TProtocolFactory tProtocolFactory = new 
TBinaryProtocol.Factory(true, true, 
DatabaseDescriptor.getThriftMaxMessageLength());
 
             // Transport factory
-            TTransportFactory inTransportFactory, outTransportFactory;
             int tFramedTransportSize = 
DatabaseDescriptor.getThriftFramedTransportSize();
-            inTransportFactory  = new 
TFramedTransport.Factory(tFramedTransportSize);
-            outTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
+            TTransportFactory inTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
+            TTransportFactory outTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
             logger.info("Using TFastFramedTransport with a max frame size of 
{} bytes.", tFramedTransportSize);
-
-            // ThreadPool Server
-            TThreadPoolServer.Args args = new 
TThreadPoolServer.Args(tServerSocket)
-                                          
.minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
-                                          
.maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
-                                          
.inputTransportFactory(inTransportFactory)
-                                          
.outputTransportFactory(outTransportFactory)
-                                          
.inputProtocolFactory(tProtocolFactory)
-                                          
.outputProtocolFactory(tProtocolFactory)
-                                          .processor(processor);
-
-            ExecutorService executorService = new 
CleaningThreadPool(cassandraServer.clientState,
-                    args.minWorkerThreads,
-                    args.maxWorkerThreads);
-            serverEngine = new CustomTThreadPoolServer(args, executorService);
+            
+            if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
+            {                
+                TServerTransport serverTransport;
+                try
+                {
+                    serverTransport = new TCustomServerSocket(new 
InetSocketAddress(listenAddr, listenPort), 
+                                                              
DatabaseDescriptor.getRpcKeepAlive(), 
+                                                              
DatabaseDescriptor.getRpcSendBufferSize(),
+                                                              
DatabaseDescriptor.getRpcRecvBufferSize());
+                } 
+                catch (TTransportException e)
+                {
+                    throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", listenAddr, listenPort), e);
+                }
+                // ThreadPool server: requests are invoked on a per-connection 
basis...
+                TThreadPoolServer.Args serverArgs = new 
TThreadPoolServer.Args(serverTransport)
+                                                                         
.minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+                                                                         
.maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+                                                                         
.inputTransportFactory(inTransportFactory)
+                                                                         
.outputTransportFactory(outTransportFactory)
+                                                                         
.inputProtocolFactory(tProtocolFactory)
+                                                                         
.outputProtocolFactory(tProtocolFactory)
+                                                                         
.processor(processor);
+                ExecutorService executorService = new 
CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, 
serverArgs.maxWorkerThreads);
+                serverEngine = new CustomTThreadPoolServer(serverArgs, 
executorService);
+                logger.info(String.format("Using synchronous/threadpool thrift 
server on %s : %s", listenAddr, listenPort));
+            }
+            else
+            {
+                TNonblockingServerTransport serverTransport;
+                try
+                {
+                    serverTransport = new TCustomNonblockingServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
+                                                                             
DatabaseDescriptor.getRpcKeepAlive(), 
+                                                                             
DatabaseDescriptor.getRpcSendBufferSize(),
+                                                                             
DatabaseDescriptor.getRpcRecvBufferSize());
+                } 
+                catch (TTransportException e)
+                {
+                    throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", listenAddr, listenPort), e);
+                }
+
+                if 
(DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
+                {
+                    // This is single threaded hence the invocation will be all
+                    // in one thread.
+                    TNonblockingServer.Args serverArgs = new 
TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+                                                                               
                      .outputTransportFactory(outTransportFactory)
+                                                                               
                      .inputProtocolFactory(tProtocolFactory)
+                                                                               
                      .outputProtocolFactory(tProtocolFactory)
+                                                                               
                      .processor(processor);
+                    logger.info(String.format("Using non-blocking/asynchronous 
thrift server on %s : %s", listenAddr, listenPort));
+                    serverEngine = new CustomTNonBlockingServer(serverArgs);
+                } 
+                else if 
(DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
+                {
+                    // This is NIO selector service but the invocation will be 
Multi-Threaded with the Executor service.
+                    ExecutorService executorService = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+                                                                               
        DatabaseDescriptor.getRpcMaxThreads(),
+                                                                               
        DatabaseDescriptor.getRpcTimeout(), 
+                                                                               
        TimeUnit.MILLISECONDS,
+                                                                               
        new LinkedBlockingQueue<Runnable>(), 
+                                                                               
        new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+                    TNonblockingServer.Args serverArgs = new 
TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
+                                                                               
        .outputTransportFactory(outTransportFactory)
+                                                                               
        .inputProtocolFactory(tProtocolFactory)
+                                                                               
        .outputProtocolFactory(tProtocolFactory)
+                                                                               
        .processor(processor);
+                    logger.info(String.format("Using custom 
half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
+                    // Check for available processors in the system which will 
be equal to the IO Threads.
+                    serverEngine = new CustomTHsHaServer(serverArgs, 
executorService, Runtime.getRuntime().availableProcessors());
+                }
+            }
         }
 
         public void run()

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1152233&r1=1152232&r2=1152233&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java 
Fri Jul 29 14:21:06 2011
@@ -21,6 +21,7 @@ package org.apache.cassandra.thrift;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
@@ -51,6 +52,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.scheduler.IRequestScheduler;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.SocketSessionManagementService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -86,7 +88,22 @@ public class CassandraServer implements 
     
     public ClientState state()
     {
-        return clientState.get();
+        SocketAddress remoteSocket = 
SocketSessionManagementService.remoteSocket.get();
+        ClientState retval = null;
+        if (null != remoteSocket)
+        {
+            retval = SocketSessionManagementService.instance.get(remoteSocket);
+            if (null == retval)
+            {
+                retval = new ClientState();
+                SocketSessionManagementService.instance.put(remoteSocket, 
retval);
+            }
+        } 
+        else
+        {
+            retval = clientState.get();
+        }
+        return retval;
     }
 
     protected Map<DecoratedKey, ColumnFamily> 
readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level)

Added: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java?rev=1152233&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java 
(added)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java 
Fri Jul 29 14:21:06 2011
@@ -0,0 +1,305 @@
+package org.apache.cassandra.thrift;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an interim solution until THRIFT-1167 gets committed...
+ * 
+ * The idea here is to avoid sticking to one CPU for I/O. For better 
throughput
+ * it is spread across multiple threads. The number of selector threads can be the
+ * number of CPUs available.
+ */
+public class CustomTHsHaServer extends TNonblockingServer
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CustomTHsHaServer.class.getName());
+    private Set<SelectorThread> ioThreads = new HashSet<SelectorThread>();
+    private volatile boolean stopped_ = true;
+    private ExecutorService invoker;
+
+    /**
+     * All the arguments to the non-blocking server apply here. In addition,
+     * the executor pool is responsible for creating the internal threads 
which
+     * will process the data. The number of selector threads is usually equal to the
+     * number of CPUs.
+     */
+    public CustomTHsHaServer(Args args, ExecutorService invoker, int 
threadCount)
+    {
+        super(args);
+        this.invoker = invoker;
+        // Create all the Network IO Threads.
+        for (int i = 0; i < threadCount; ++i)
+            ioThreads.add(new SelectorThread("Selector-Thread-" + i));
+    }
+
+    /** @inheritDoc */
+    @Override
+    public void serve()
+    {
+        if (!startListening())
+            return;
+        if (!startThreads())
+            return;
+        setServing(true);
+        joinSelector();
+        invoker.shutdown();
+        setServing(false);
+        stopListening();
+    }
+
+    /**
+     * Save the remote socket as a thread local for future use of client state.
+     */
+    protected class Invocation implements Runnable
+    {
+        private final FrameBuffer frameBuffer;
+        private SelectorThread thread;
+
+        public Invocation(final FrameBuffer frameBuffer, SelectorThread thread)
+        {
+            this.frameBuffer = frameBuffer;
+            this.thread = thread;
+        }
+
+        public void run()
+        {
+            TNonblockingSocket socket = (TNonblockingSocket) 
frameBuffer.trans_;
+            
SocketSessionManagementService.remoteSocket.set(socket.getSocketChannel().socket().getRemoteSocketAddress());
+            frameBuffer.invoke();
+            // this is how we let the same selector thread change the 
selection type.
+            thread.requestSelectInterestChange(frameBuffer);
+        }
+    }
+
+    protected boolean startThreads()
+    {
+        stopped_ = false;
+        // start all the threads.
+        for (SelectorThread thread : ioThreads)
+            thread.start();
+        return true;
+    }
+
+    @Override
+    protected void joinSelector()
+    {
+        try
+        {
+            // wait until all the selector threads have finished
+            for (SelectorThread thread : ioThreads)
+                thread.join();
+        } 
+        catch (InterruptedException e)
+        {
+            LOGGER.error("Interrupted while joining threads!", e);
+        }
+    }
+
+    /**
+     * Stop serving and shut everything down.
+     */
+    @Override
+    public void stop()
+    {
+        stopListening();
+        stopped_ = true;
+        for (SelectorThread thread : ioThreads)
+            thread.wakeupSelector();
+        joinSelector();
+    }
+
+    /**
+     * IO Threads will perform expensive IO operations...
+     */
+    protected class SelectorThread extends Thread
+    {
+        private final Selector selector;
+        private TNonblockingServerTransport serverTransport;
+        private Set<FrameBuffer> selectInterestChanges = new 
HashSet<FrameBuffer>();
+
+        public SelectorThread(String name)
+        {
+            super(name);
+            try
+            {
+                this.selector = SelectorProvider.provider().openSelector();
+                this.serverTransport = (TNonblockingServerTransport) 
serverTransport_;
+                this.serverTransport.registerSelector(selector);
+            } 
+            catch (IOException ex)
+            {
+                throw new RuntimeException("Couldnt open the NIO selector", 
ex);
+            }
+        }
+
+        public void run()
+        {
+            try
+            {
+                while (!stopped_)
+                {
+                    select();
+                }
+            } 
+            catch (Throwable t)
+            {
+                LOGGER.error("Uncaught Exception: ", t);
+            }
+        }
+
+        private void select() throws InterruptedException, IOException
+        {
+            // wait for new keys
+            selector.select();
+            Iterator<SelectionKey> keyIterator = 
selector.selectedKeys().iterator();
+            while (keyIterator.hasNext())
+            {
+                SelectionKey key = keyIterator.next();
+                keyIterator.remove();
+                if (!key.isValid())
+                {
+                    // if invalid cleanup.
+                    cleanupSelectionkey(key);
+                    continue;
+                }
+
+                if (key.isAcceptable())
+                    handleAccept();
+                if (key.isReadable())
+                    handleRead(key);
+                else if (key.isWritable())
+                    handleWrite(key);
+                else
+                    LOGGER.debug("Unexpected state " + key.interestOps());
+            }
+            // process the changes which are inserted after completion.
+            processInterestChanges();
+        }
+        
+        private void handleAccept()
+        {
+            SelectionKey clientKey = null;
+            TNonblockingTransport client = null;
+            try
+            {
+                // accept the connection
+                client = (TNonblockingTransport) serverTransport.accept();
+                clientKey = client.registerSelector(selector, 
SelectionKey.OP_READ);
+                // add this key to the map
+                FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
+                clientKey.attach(frameBuffer); 
+            } catch (TTransportException ex)
+            {
+                // Ignore: this connection might have been handled by another thread,
+                // in which case serverTransport.accept() had nothing left to 
accept.
+                return;
+            }
+            catch (IOException tte)
+            {
+                // something went wrong accepting.
+                LOGGER.warn("Exception trying to accept!", tte);
+                tte.printStackTrace();
+                if (clientKey != null)
+                    cleanupSelectionkey(clientKey);
+                if (client != null)
+                    client.close();
+            }
+        }
+        
+        private void handleRead(SelectionKey key)
+        {
+            FrameBuffer buffer = (FrameBuffer) key.attachment();
+            if (!buffer.read())
+            {
+                cleanupSelectionkey(key);
+                return;
+            }
+
+            if (buffer.isFrameFullyRead())
+            {
+                if (!requestInvoke(buffer, this))
+                    cleanupSelectionkey(key);
+            }
+        }
+        
+        private void handleWrite(SelectionKey key)
+        {
+            FrameBuffer buffer = (FrameBuffer) key.attachment();
+            if (!buffer.write())
+                cleanupSelectionkey(key);
+        }
+        
+        public void requestSelectInterestChange(FrameBuffer frameBuffer)
+        {
+            synchronized (selectInterestChanges)
+            {
+                selectInterestChanges.add(frameBuffer);
+            }
+            // Wake-up the selector, if it's currently blocked.
+            selector.wakeup();
+        }
+
+        private void processInterestChanges()
+        {
+            synchronized (selectInterestChanges)
+            {
+                for (FrameBuffer fb : selectInterestChanges)
+                    fb.changeSelectInterests();
+                selectInterestChanges.clear();
+            }
+        }
+        
+        private void cleanupSelectionkey(SelectionKey key)
+        {
+            FrameBuffer buffer = (FrameBuffer) key.attachment();
+            if (buffer != null)
+                buffer.close();
+            // cancel the selection key
+            key.cancel();
+        }
+        
+        public void wakeupSelector()
+        {
+            selector.wakeup();
+        }
+    }
+    
+    protected boolean requestInvoke(FrameBuffer frameBuffer, SelectorThread 
thread)
+    {
+        try
+        {
+            Runnable invocation = new Invocation(frameBuffer, thread);
+            invoker.execute(invocation);
+            return true;
+        } 
+        catch (RejectedExecutionException rx)
+        {
+            LOGGER.warn("ExecutorService rejected execution!", rx);
+            return false;
+        }
+    }
+
+    @Override
+    protected void requestSelectInterestChange(FrameBuffer fb)
+    {
+        // Don't change the interest here; this has to be done by the selector
+        // thread because the method is not synchronized with the rest of the
+        // selectors threads.
+    }
+}

Propchange: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java?rev=1152233&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
 Fri Jul 29 14:21:06 2011
@@ -0,0 +1,22 @@
+package org.apache.cassandra.thrift;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.transport.TNonblockingSocket;
+
+public class CustomTNonBlockingServer extends TNonblockingServer
+{
+    public CustomTNonBlockingServer(Args args)
+    {
+        super(args);
+    }
+
+    @Override
+    protected boolean requestInvoke(FrameBuffer frameBuffer)
+    {
+        TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_;
+        
SocketSessionManagementService.remoteSocket.set(socket.getSocketChannel().socket().getRemoteSocketAddress());
+        frameBuffer.invoke();
+        return true;
+    }
+}

Propchange: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java?rev=1152233&view=auto
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
 (added)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
 Fri Jul 29 14:21:06 2011
@@ -0,0 +1,71 @@
+package org.apache.cassandra.thrift;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.cassandra.service.SocketSessionManagementService;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TCustomNonblockingServerSocket extends TNonblockingServerSocket
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(TCustomNonblockingServerSocket.class);
+    private final boolean keepAlive;
+    private final Integer sendBufferSize;
+    private final Integer recvBufferSize;
+
+    public TCustomNonblockingServerSocket(InetSocketAddress bindAddr, boolean 
keepAlive, Integer sendBufferSize, Integer recvBufferSize) throws 
TTransportException
+    {
+        super(bindAddr);
+        this.keepAlive = keepAlive;
+        this.sendBufferSize = sendBufferSize;
+        this.recvBufferSize = recvBufferSize;
+    }
+
+    @Override
+    protected TNonblockingSocket acceptImpl() throws TTransportException
+    {
+        TNonblockingSocket tsocket = super.acceptImpl();
+        if (tsocket == null || tsocket.getSocketChannel() == null)
+            return tsocket;
+        Socket socket = tsocket.getSocketChannel().socket();
+        // clean up the old information.
+        
SocketSessionManagementService.instance.remove(socket.getRemoteSocketAddress());
+        try
+        {
+            socket.setKeepAlive(this.keepAlive);
+        } catch (SocketException se)
+        {
+            logger.warn("Failed to set keep-alive on Thrift socket.", se);
+        }
+        
+        if (this.sendBufferSize != null)
+        {
+            try
+            {
+                socket.setSendBufferSize(this.sendBufferSize.intValue());
+            }
+            catch (SocketException se)
+            {
+                logger.warn("Failed to set send buffer size on Thrift 
socket.", se);
+            }
+        }
+
+        if (this.recvBufferSize != null)
+        {
+            try
+            {
+                socket.setReceiveBufferSize(this.recvBufferSize.intValue());
+            }
+            catch (SocketException se)
+            {
+                logger.warn("Failed to set receive buffer size on Thrift 
socket.", se);
+            }
+        }
+        return tsocket;
+    }
+}
\ No newline at end of file

Propchange: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to