Author: brandonwilliams Date: Fri Jul 29 14:29:49 2011 New Revision: 1152238
URL: http://svn.apache.org/viewvc?rev=1152238&view=rev Log: Add asynchronous and half-sync/half-async thrift servers. Patch by Vijay Parthasarathy, reviewed by brandonwilliams for CASSANDRA-1405 Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java (with props) cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java (with props) cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java (with props) cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java (with props) Modified: cassandra/branches/cassandra-0.8/conf/cassandra.yaml cassandra/branches/cassandra-0.8/conf/log4j-server.properties cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java Modified: cassandra/branches/cassandra-0.8/conf/cassandra.yaml URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/cassandra.yaml?rev=1152238&r1=1152237&r2=1152238&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/conf/cassandra.yaml (original) +++ cassandra/branches/cassandra-0.8/conf/cassandra.yaml Fri Jul 29 14:29:49 2011 @@ -196,16 +196,35 @@ rpc_port: 9160 # enable or disable keepalive on rpc connections rpc_keepalive: true -# Cassandra uses thread-per-client for client RPC. This can -# be expensive in memory used for thread stack for a large -# enough number of clients. (Hence, connection pooling is -# very, very strongly recommended.) -# +# Cassandra provides you with a variety of options for RPC Server +# sync -> Creates one thread per connection but with a configurable number of +# threads. This can be expensive in memory used for thread stack for +# a large enough number of clients. (Hence, connection pooling is +# very, very strongly recommended.) +# +# async -> Nonblocking server implementation with one thread to serve +# rpc connections. This is not recommended for high throughput use +# cases. +# +# hsha -> half sync and half async implementation with configurable number +# of worker threads (For managing connections). IO Management is +# done by a set of threads currently equal to the number of +# processors in the system. The number of threads in the threadpool +# is configured via rpc_min_threads and rpc_max_threads. (Connection +# pooling is strongly recommended in this case too.) + +rpc_server_type: sync + # Uncomment rpc_min|max|thread to set request pool size. -# You would primarily set max as a safeguard against misbehaved -# clients; if you do hit the max, Cassandra will block until -# one disconnects before accepting more. The defaults are -# min of 16 and max unlimited. +# You would primarily set max for the sync server to safeguard against +# misbehaved clients; if you do hit the max, Cassandra will block until one +# disconnects before accepting more. The defaults are min of 16 and max +# unlimited. +# +# For the Hsha server, you would set the max so that a fair amount of resources +# are provided to the other working threads on the server. +# +# This configuration is not used for the async server. # # rpc_min_threads: 16 # rpc_max_threads: 2048 Modified: cassandra/branches/cassandra-0.8/conf/log4j-server.properties URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/conf/log4j-server.properties?rev=1152238&r1=1152237&r2=1152238&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/conf/log4j-server.properties (original) +++ cassandra/branches/cassandra-0.8/conf/log4j-server.properties Fri Jul 29 14:29:49 2011 @@ -39,3 +39,6 @@ log4j.appender.R.File=/var/log/cassandra #log4j.logger.org.apache.cassandra.db=DEBUG #log4j.logger.org.apache.cassandra.service.StorageProxy=DEBUG +# Adding this to avoid thrift logging disconnect errors. +log4j.logger.org.apache.thrift.server.TNonblockingServer=ERROR + Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java?rev=1152238&r1=1152237&r2=1152238&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java Fri Jul 29 14:29:49 2011 @@ -56,13 +56,24 @@ public class JMXEnabledThreadPoolExecuto } public JMXEnabledThreadPoolExecutor(int corePoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue<Runnable> workQueue, + NamedThreadFactory threadFactory, + String jmxPath) + { + this(corePoolSize, corePoolSize, keepAliveTime, unit, workQueue, threadFactory, jmxPath); + } + + public JMXEnabledThreadPoolExecutor(int corePoolSize, + int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, NamedThreadFactory threadFactory, String jmxPath) { - super(corePoolSize, keepAliveTime, unit, workQueue, threadFactory); + super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory); super.prestartAllCoreThreads(); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java?rev=1152238&r1=1152237&r2=1152238&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/Config.java Fri Jul 29 14:29:49 2011 @@ -66,6 +66,7 @@ public class Config public String rpc_address; public Integer rpc_port = 9160; + public String rpc_server_type = "sync"; public Boolean rpc_keepalive = true; public Integer rpc_min_threads = 16; public Integer rpc_max_threads = Integer.MAX_VALUE; Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1152238&r1=1152237&r2=1152238&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Fri Jul 29 14:29:49 2011 @@ -48,6 +48,7 @@ import org.apache.cassandra.io.util.File import org.apache.cassandra.locator.*; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.scheduler.NoScheduler; +import org.apache.cassandra.thrift.CassandraDaemon; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.yaml.snakeyaml.Loader; @@ -352,6 +353,9 @@ public class DatabaseDescriptor if (conf.compaction_throughput_mb_per_sec == null) conf.compaction_throughput_mb_per_sec = 16; + if (!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase())) + throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type); + /* data file and commit log directories. they get created later, when they're needed. */ if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null) { @@ -875,6 +879,11 @@ public class DatabaseDescriptor { return rpcAddress; } + + public static String getRpcServerType() + { + return conf.rpc_server_type; + } public static boolean getRpcKeepAlive() { Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java?rev=1152238&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java (added) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java Fri Jul 29 14:29:49 2011 @@ -0,0 +1,47 @@ +package org.apache.cassandra.service; + +import java.net.SocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SocketSessionManagementService +{ + public final static SocketSessionManagementService instance = new SocketSessionManagementService(); + public final static ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<SocketAddress>(); + private Map<SocketAddress, ClientState> activeSocketSessions = new ConcurrentHashMap<SocketAddress, ClientState>(); + + public ClientState get(SocketAddress key) + { + ClientState retval = null; + if (null != key) + { + retval = activeSocketSessions.get(key); + } + return retval; + } + + public void put(SocketAddress key, ClientState value) + { + if (null != key && null != value) + { + activeSocketSessions.put(key, value); + } + } + + public boolean remove(SocketAddress key) + { + assert null != key; + boolean retval = false; + if (null != activeSocketSessions.remove(key)) + { + retval = true; + } + return retval; + } + + public void clear() + { + activeSocketSessions.clear(); + } + +} Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/SocketSessionManagementService.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1152238&r1=1152237&r2=1152238&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Fri Jul 29 14:29:49 2011 @@ -20,18 +20,25 @@ package org.apache.cassandra.thrift; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TThreadPoolServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.TServer; import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.TTransportFactory; @@ -47,6 +54,10 @@ import org.apache.thrift.transport.TTran public class CassandraDaemon extends org.apache.cassandra.service.AbstractCassandraDaemon { private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class); + private final static String SYNC = "sync"; + private final static String ASYNC = "async"; + private final static String HSHA = "hsha"; + public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA); private ThriftServer server; protected void startServer() @@ -95,49 +106,90 @@ public class CassandraDaemon extends org Cassandra.Processor processor = new Cassandra.Processor(cassandraServer); // Transport - TServerSocket tServerSocket = null; - - try - { - tServerSocket = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort), - DatabaseDescriptor.getRpcKeepAlive(), - DatabaseDescriptor.getRpcSendBufferSize(), - DatabaseDescriptor.getRpcRecvBufferSize()); - } - catch (TTransportException e) - { - throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", - listenAddr, listenPort), e); - } - logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort)); // Protocol factory - TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, - true, - DatabaseDescriptor.getThriftMaxMessageLength()); + TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength()); // Transport factory - TTransportFactory inTransportFactory, outTransportFactory; int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize(); - inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); - outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); + TTransportFactory inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); + TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); logger.info("Using TFastFramedTransport with a max frame size of {} bytes.", tFramedTransportSize); - - // ThreadPool Server - TThreadPoolServer.Args args = new TThreadPoolServer.Args(tServerSocket) - .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads()) - .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads()) - .inputTransportFactory(inTransportFactory) - .outputTransportFactory(outTransportFactory) - .inputProtocolFactory(tProtocolFactory) - .outputProtocolFactory(tProtocolFactory) - .processor(processor); - - ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, - args.minWorkerThreads, - args.maxWorkerThreads); - serverEngine = new CustomTThreadPoolServer(args, executorService); + + if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC)) + { + TServerTransport serverTransport; + try + { + serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort), + DatabaseDescriptor.getRpcKeepAlive(), + DatabaseDescriptor.getRpcSendBufferSize(), + DatabaseDescriptor.getRpcRecvBufferSize()); + } + catch (TTransportException e) + { + throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e); + } + // ThreadPool Server and will be invocation per connection basis... + TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport) + .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads()) + .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads()) + .inputTransportFactory(inTransportFactory) + .outputTransportFactory(outTransportFactory) + .inputProtocolFactory(tProtocolFactory) + .outputProtocolFactory(tProtocolFactory) + .processor(processor); + ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads); + serverEngine = new CustomTThreadPoolServer(serverArgs, executorService); + logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort)); + } + else + { + TNonblockingServerTransport serverTransport; + try + { + serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort), + DatabaseDescriptor.getRpcKeepAlive(), + DatabaseDescriptor.getRpcSendBufferSize(), + DatabaseDescriptor.getRpcRecvBufferSize()); + } + catch (TTransportException e) + { + throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e); + } + + if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC)) + { + // This is single threaded hence the invocation will be all + // in one thread. + TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory) + .outputTransportFactory(outTransportFactory) + .inputProtocolFactory(tProtocolFactory) + .outputProtocolFactory(tProtocolFactory) + .processor(processor); + logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort)); + serverEngine = new CustomTNonBlockingServer(serverArgs); + } + else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA)) + { + // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service. + ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(), + DatabaseDescriptor.getRpcMaxThreads(), + DatabaseDescriptor.getRpcTimeout(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL"); + TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory) + .outputTransportFactory(outTransportFactory) + .inputProtocolFactory(tProtocolFactory) + .outputProtocolFactory(tProtocolFactory) + .processor(processor); + logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort)); + // Check for available processors in the system which will be equal to the IO Threads. + serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors()); + } + } } public void run() Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1152238&r1=1152237&r2=1152238&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Jul 29 14:29:49 2011 @@ -21,6 +21,7 @@ package org.apache.cassandra.thrift; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; @@ -51,6 +52,7 @@ import org.apache.cassandra.dht.*; import org.apache.cassandra.locator.*; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.SocketSessionManagementService; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -86,7 +88,22 @@ public class CassandraServer implements public ClientState state() { - return clientState.get(); + SocketAddress remoteSocket = SocketSessionManagementService.remoteSocket.get(); + ClientState retval = null; + if (null != remoteSocket) + { + retval = SocketSessionManagementService.instance.get(remoteSocket); + if (null == retval) + { + retval = new ClientState(); + SocketSessionManagementService.instance.put(remoteSocket, retval); + } + } + else + { + retval = clientState.get(); + } + return retval; } protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level) Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java?rev=1152238&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java (added) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java Fri Jul 29 14:29:49 2011 @@ -0,0 +1,305 @@ +package org.apache.cassandra.thrift; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; + +import org.apache.cassandra.service.SocketSessionManagementService; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a interim solution till THRIFT-1167 gets committed... + * + * The idea here is to avoid sticking to one CPU for IO's. For better throughput + * it is spread across multiple threads. Number of selector thread can be the + * number of CPU available. + */ +public class CustomTHsHaServer extends TNonblockingServer +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CustomTHsHaServer.class.getName()); + private Set<SelectorThread> ioThreads = new HashSet<SelectorThread>(); + private volatile boolean stopped_ = true; + private ExecutorService invoker; + + /** + * All the arguments to Non Blocking Server will apply here. In addition, + * executor pool will be responsible for creating the internal threads which + * will process the data. threads for selection usually are equal to the + * number of cpu's + */ + public CustomTHsHaServer(Args args, ExecutorService invoker, int threadCount) + { + super(args); + this.invoker = invoker; + // Create all the Network IO Threads. + for (int i = 0; i < threadCount; ++i) + ioThreads.add(new SelectorThread("Selector-Thread-" + i)); + } + + /** @inheritDoc */ + @Override + public void serve() + { + if (!startListening()) + return; + if (!startThreads()) + return; + setServing(true); + joinSelector(); + invoker.shutdown(); + setServing(false); + stopListening(); + } + + /** + * Save the remote socket as a thead local for future use of client state. + */ + protected class Invocation implements Runnable + { + private final FrameBuffer frameBuffer; + private SelectorThread thread; + + public Invocation(final FrameBuffer frameBuffer, SelectorThread thread) + { + this.frameBuffer = frameBuffer; + this.thread = thread; + } + + public void run() + { + TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_; + SocketSessionManagementService.remoteSocket.set(socket.getSocketChannel().socket().getRemoteSocketAddress()); + frameBuffer.invoke(); + // this is how we let the same selector thread change the selection type. + thread.requestSelectInterestChange(frameBuffer); + } + } + + protected boolean startThreads() + { + stopped_ = false; + // start all the threads. + for (SelectorThread thread : ioThreads) + thread.start(); + return true; + } + + @Override + protected void joinSelector() + { + try + { + // wait till all done with stuff's + for (SelectorThread thread : ioThreads) + thread.join(); + } + catch (InterruptedException e) + { + LOGGER.error("Interrupted while joining threads!", e); + } + } + + /** + * Stop serving and shut everything down. + */ + @Override + public void stop() + { + stopListening(); + stopped_ = true; + for (SelectorThread thread : ioThreads) + thread.wakeupSelector(); + joinSelector(); + } + + /** + * IO Threads will perform expensive IO operations... + */ + protected class SelectorThread extends Thread + { + private final Selector selector; + private TNonblockingServerTransport serverTransport; + private Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>(); + + public SelectorThread(String name) + { + super(name); + try + { + this.selector = SelectorProvider.provider().openSelector(); + this.serverTransport = (TNonblockingServerTransport) serverTransport_; + this.serverTransport.registerSelector(selector); + } + catch (IOException ex) + { + throw new RuntimeException("Couldnt open the NIO selector", ex); + } + } + + public void run() + { + try + { + while (!stopped_) + { + select(); + } + } + catch (Throwable t) + { + LOGGER.error("Uncaught Exception: ", t); + } + } + + private void select() throws InterruptedException, IOException + { + // wait for new keys + selector.select(); + Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); + while (keyIterator.hasNext()) + { + SelectionKey key = keyIterator.next(); + keyIterator.remove(); + if (!key.isValid()) + { + // if invalid cleanup. + cleanupSelectionkey(key); + continue; + } + + if (key.isAcceptable()) + handleAccept(); + if (key.isReadable()) + handleRead(key); + else if (key.isWritable()) + handleWrite(key); + else + LOGGER.debug("Unexpected state " + key.interestOps()); + } + // process the changes which are inserted after completion. + processInterestChanges(); + } + + private void handleAccept() + { + SelectionKey clientKey = null; + TNonblockingTransport client = null; + try + { + // accept the connection + client = (TNonblockingTransport) serverTransport.accept(); + clientKey = client.registerSelector(selector, SelectionKey.OP_READ); + // add this key to the map + FrameBuffer frameBuffer = new FrameBuffer(client, clientKey); + clientKey.attach(frameBuffer); + } catch (TTransportException ex) + { + // ignore this might have been handled by the other threads. + // serverTransport.accept() as it returns null as nothing to accept. + return; + } + catch (IOException tte) + { + // something went wrong accepting. + LOGGER.warn("Exception trying to accept!", tte); + tte.printStackTrace(); + if (clientKey != null) + cleanupSelectionkey(clientKey); + if (client != null) + client.close(); + } + } + + private void handleRead(SelectionKey key) + { + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (!buffer.read()) + { + cleanupSelectionkey(key); + return; + } + + if (buffer.isFrameFullyRead()) + { + if (!requestInvoke(buffer, this)) + cleanupSelectionkey(key); + } + } + + private void handleWrite(SelectionKey key) + { + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (!buffer.write()) + cleanupSelectionkey(key); + } + + public void requestSelectInterestChange(FrameBuffer frameBuffer) + { + synchronized (selectInterestChanges) + { + selectInterestChanges.add(frameBuffer); + } + // Wake-up the selector, if it's currently blocked. + selector.wakeup(); + } + + private void processInterestChanges() + { + synchronized (selectInterestChanges) + { + for (FrameBuffer fb : selectInterestChanges) + fb.changeSelectInterests(); + selectInterestChanges.clear(); + } + } + + private void cleanupSelectionkey(SelectionKey key) + { + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (buffer != null) + buffer.close(); + // cancel the selection key + key.cancel(); + } + + public void wakeupSelector() + { + selector.wakeup(); + } + } + + protected boolean requestInvoke(FrameBuffer frameBuffer, SelectorThread thread) + { + try + { + Runnable invocation = new Invocation(frameBuffer, thread); + invoker.execute(invocation); + return true; + } + catch (RejectedExecutionException rx) + { + LOGGER.warn("ExecutorService rejected execution!", rx); + return false; + } + } + + @Override + protected void requestSelectInterestChange(FrameBuffer fb) + { + // Dont change the interest here, this has to be done by the selector + // thread because the method is not synchronized with the rest of the + // selectors threads. + } +} Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java ------------------------------------------------------------------------------ svn:eol-style = native Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java?rev=1152238&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java (added) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java Fri Jul 29 14:29:49 2011 @@ -0,0 +1,22 @@ +package org.apache.cassandra.thrift; + +import org.apache.cassandra.service.SocketSessionManagementService; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.transport.TNonblockingSocket; + +public class CustomTNonBlockingServer extends TNonblockingServer +{ + public CustomTNonBlockingServer(Args args) + { + super(args); + } + + @Override + protected boolean requestInvoke(FrameBuffer frameBuffer) + { + TNonblockingSocket socket = (TNonblockingSocket) frameBuffer.trans_; + SocketSessionManagementService.remoteSocket.set(socket.getSocketChannel().socket().getRemoteSocketAddress()); + frameBuffer.invoke(); + return true; + } +} Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java ------------------------------------------------------------------------------ svn:eol-style = native Added: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java?rev=1152238&view=auto ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java (added) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java Fri Jul 29 14:29:49 2011 @@ -0,0 +1,71 @@ +package org.apache.cassandra.thrift; + +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; + +import org.apache.cassandra.service.SocketSessionManagementService; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TCustomNonblockingServerSocket extends TNonblockingServerSocket +{ + private static final Logger logger = LoggerFactory.getLogger(TCustomNonblockingServerSocket.class); + private final boolean keepAlive; + private final Integer sendBufferSize; + private final Integer recvBufferSize; + + public TCustomNonblockingServerSocket(InetSocketAddress bindAddr, boolean keepAlive, Integer sendBufferSize, Integer recvBufferSize) throws TTransportException + { + super(bindAddr); + this.keepAlive = keepAlive; + this.sendBufferSize = sendBufferSize; + this.recvBufferSize = recvBufferSize; + } + + @Override + protected TNonblockingSocket acceptImpl() throws TTransportException + { + TNonblockingSocket tsocket = super.acceptImpl(); + if (tsocket == null || tsocket.getSocketChannel() == null) + return tsocket; + Socket socket = tsocket.getSocketChannel().socket(); + // clean up the old information. + SocketSessionManagementService.instance.remove(socket.getRemoteSocketAddress()); + try + { + socket.setKeepAlive(this.keepAlive); + } catch (SocketException se) + { + logger.warn("Failed to set keep-alive on Thrift socket.", se); + } + + if (this.sendBufferSize != null) + { + try + { + socket.setSendBufferSize(this.sendBufferSize.intValue()); + } + catch (SocketException se) + { + logger.warn("Failed to set send buffer size on Thrift socket.", se); + } + } + + if (this.recvBufferSize != null) + { + try + { + socket.setReceiveBufferSize(this.recvBufferSize.intValue()); + } + catch (SocketException se) + { + logger.warn("Failed to set receive buffer size on Thrift socket.", se); + } + } + return tsocket; + } +} \ No newline at end of file Propchange: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/thrift/TCustomNonblockingServerSocket.java ------------------------------------------------------------------------------ svn:eol-style = native