Updated Branches: refs/heads/trunk 074f4befa -> 6b83663ca
merge from 1.1 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b83663c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b83663c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b83663c Branch: refs/heads/trunk Commit: 6b83663ca494123d87477f69205633497e4bef1d Parents: 074f4be 8264eb2 Author: Pavel Yaskevich <[email protected]> Authored: Fri Oct 5 14:59:13 2012 -0700 Committer: Pavel Yaskevich <[email protected]> Committed: Fri Oct 5 14:59:13 2012 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 5 +- .../cassandra/config/DatabaseDescriptor.java | 2 - .../apache/cassandra/thrift/CustomTHsHaServer.java | 39 ++++ .../cassandra/thrift/CustomTNonBlockingServer.java | 31 +++ .../cassandra/thrift/CustomTThreadPoolServer.java | 63 ++++++ .../cassandra/thrift/TServerCustomFactory.java | 75 +++++++ .../apache/cassandra/thrift/TServerFactory.java | 43 ++++ .../org/apache/cassandra/thrift/ThriftServer.java | 150 ++------------- 9 files changed, 274 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index cab1425,4d2fd27..4fffbf7 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -113,7 -19,7 +113,8 @@@ * Pluggable Thrift transport factories for CLI (CASSANDRA-4609) * Backport adding AlterKeyspace statement (CASSANDRA-4611) * (CQL3) Correcty accept upper-case data types (CASSANDRA-4770) + * Add binary protocol events for schema changes (CASSANDRA-4684) + * Add ability to use custom TServerFactory implementations (CASSANDRA-4608) Merged from 1.0: * Switch from NBHM to CHM in MessagingService's callback map, which prevents OOM in long-running instances (CASSANDRA-4708) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/conf/cassandra.yaml ---------------------------------------------------------------------- diff --cc conf/cassandra.yaml index f98f9f0,5e0be98..84c0ae5 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@@ -333,31 -287,39 +333,34 @@@ rpc_port: 916 # enable or disable keepalive on rpc connections rpc_keepalive: true - # Cassandra provides three options for the RPC Server: + # Cassandra provides three out-of-the-box options for the RPC Server: # -# sync -> One connection per thread in the rpc pool (see below). -# For a very large number of clients, memory will be your limiting -# factor; on a 64 bit JVM, 128KB is the minimum stack size per thread. -# Connection pooling is very, very strongly recommended. -# -# async -> Nonblocking server implementation with one thread to serve -# rpc connections. This is not recommended for high throughput use -# cases. Async has been tested to be about 50% slower than sync -# or hsha and is deprecated: it will be removed in the next major release. +# sync -> One thread per thrift connection. For a very large number of clients, memory +# will be your limiting factor. On a 64 bit JVM, 128KB is the minimum stack size +# per thread, and that will correspond to your use of virtual memory (but physical memory +# may be limited depending on use of stack space). # -# hsha -> Stands for "half synchronous, half asynchronous." The rpc thread pool -# (see below) is used to manage requests, but the threads are multiplexed -# across the different clients. +# hsha -> Stands for "half synchronous, half asynchronous." All thrift clients are handled +# asynchronously using a small number of threads that does not vary with the amount +# of thrift clients (and thus scales well to many clients). The rpc requests are still +# synchronous (one thread per active request). # # The default is sync because on Windows hsha is about 30% slower. 
On Linux, # sync/hsha performance is about the same, with hsha of course using less memory. + # + # Alternatively, you can provide your own RPC server by providing the fully-qualified class name + # of an o.a.c.t.TServerFactory that can create an instance of it. rpc_server_type: sync -# Uncomment rpc_min|max|thread to set request pool size. -# You would primarily set max for the sync server to safeguard against -# misbehaved clients; if you do hit the max, Cassandra will block until one -# disconnects before accepting more. The defaults for sync are min of 16 and max -# unlimited. -# -# For the Hsha server, the min and max both default to quadruple the number of -# CPU cores. +# Uncomment rpc_min|max_threads to set request pool size limits. +# +# Regardless of your choice of RPC server (see above), the number of maximum requests in the +# RPC thread pool dictates how many concurrent requests are possible (but if you are using the sync +# RPC server, it also dictates the number of clients that can be connected at all). # -# This configuration is ignored by the async server. +# The default is unlimited and thus provides no protection against clients overwhelming the server. You are +# encouraged to set a maximum that makes sense for you in production, but do keep in mind that +# rpc_max_threads represents the maximum number of client requests this server may execute concurrently. 
# # rpc_min_threads: 16 # rpc_max_threads: 2048 http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index cc8f07f,7ed6170..02a91e8 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -390,13 -381,14 +390,11 @@@ public class DatabaseDescripto if (conf.stream_throughput_outbound_megabits_per_sec == null) conf.stream_throughput_outbound_megabits_per_sec = 400; - if (!ThriftServer.rpc_server_types.contains(conf.rpc_server_type.toLowerCase())) - throw new ConfigurationException("Unknown rpc_server_type: " + conf.rpc_server_type); if (conf.rpc_min_threads == null) - conf.rpc_min_threads = conf.rpc_server_type.toLowerCase().equals("hsha") - ? Runtime.getRuntime().availableProcessors() * 4 - : 16; + conf.rpc_min_threads = 16; + if (conf.rpc_max_threads == null) - conf.rpc_max_threads = conf.rpc_server_type.toLowerCase().equals("hsha") - ? Runtime.getRuntime().availableProcessors() * 4 - : Integer.MAX_VALUE; + conf.rpc_max_threads = Integer.MAX_VALUE; /* data file and commit log directories. they get created later, when they're needed. */ if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java index c6517a2,6ade5ca..436dbb3 --- a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java +++ b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java @@@ -7,17 -9,20 +7,18 @@@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - +package org.apache.cassandra.thrift; import java.io.IOException; + import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java index 52e9f66,479fba8..b6f76a7 --- a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java +++ b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java @@@ -7,19 -9,26 +7,24 @@@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - +package org.apache.cassandra.thrift; + import java.net.InetSocketAddress; + import org.apache.cassandra.service.SocketSessionManagementService; import org.apache.thrift.server.TNonblockingServer; + import org.apache.thrift.server.TServer; + import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingSocket; + import org.apache.thrift.transport.TTransportException; public class CustomTNonBlockingServer extends TNonblockingServer { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java index ebffa6d,fc07c60..00f0444 --- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java +++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java @@@ -1,29 -1,34 +1,37 @@@ /* * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ - package org.apache.cassandra.thrift; + import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.util.concurrent.ExecutorService; ++import java.util.concurrent.SynchronousQueue; ++import java.util.concurrent.ThreadPoolExecutor; ++import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; ++import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; ++import org.apache.cassandra.concurrent.NamedThreadFactory; + import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.service.AbstractCassandraDaemon; ++import org.apache.cassandra.service.ClientState; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TProtocol; @@@ -217,4 -223,32 +226,58 @@@ public class CustomTThreadPoolServer ex } } } + + public static class Factory implements TServerFactory + { + public TServer buildTServer(Args args) + { + final InetSocketAddress addr = args.addr; + TServerTransport serverTransport; + try + { + serverTransport = new 
TCustomServerSocket(addr, args.keepAlive, args.sendBufferSize, args.recvBufferSize); + } + catch (TTransportException e) + { + throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e); + } + // ThreadPool Server and will be invocation per connection basis... + TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport) + .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads()) + .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads()) + .inputTransportFactory(args.inTransportFactory) + .outputTransportFactory(args.outTransportFactory) + .inputProtocolFactory(args.tProtocolFactory) + .outputProtocolFactory(args.tProtocolFactory) + .processor(args.processor); - ExecutorService executorService = new AbstractCassandraDaemon.CleaningThreadPool(args.cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads); ++ ExecutorService executorService = new CleaningThreadPool(args.cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads); + return new CustomTThreadPoolServer(serverArgs, executorService); + } + } ++ ++ /** ++ * A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool ++ * interface (for integration with Avro), and performs ClientState cleanup. ++ * ++ * (Note that the tasks being executed perform their own while-command-process ++ * loop until the client disconnects.) 
++ */ ++ private static class CleaningThreadPool extends ThreadPoolExecutor ++ { ++ private final ThreadLocal<ClientState> state; ++ ++ public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread, int maxWorkerThreads) ++ { ++ super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("Thrift")); ++ this.state = state; ++ } ++ ++ @Override ++ protected void afterExecute(Runnable r, Throwable t) ++ { ++ super.afterExecute(r, t); ++ DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t); ++ state.get().logout(); ++ } ++ } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/thrift/TServerCustomFactory.java index 0000000,50e4fac..208f664 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java +++ b/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java @@@ -1,0 -1,75 +1,75 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.cassandra.thrift; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.thrift.server.TServer; + + /** + * Helper implementation to create a thrift TServer based on one of the common types we support (sync, async, hsha), + * or a custom type by setting the fully qualified java class name in the rpc_server_type setting. + */ + public class TServerCustomFactory implements TServerFactory + { + private static Logger logger = LoggerFactory.getLogger(TServerCustomFactory.class); + private final String serverType; + + public TServerCustomFactory(String serverType) + { + assert serverType != null; + this.serverType = serverType; + } + + public TServer buildTServer(TServerFactory.Args args) + { + TServer server; - if (CassandraDaemon.SYNC.equalsIgnoreCase(serverType)) ++ if (ThriftServer.SYNC.equalsIgnoreCase(serverType)) + { + server = new CustomTThreadPoolServer.Factory().buildTServer(args); + logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", args.addr.getHostName(), args.addr.getPort())); + } - else if(CassandraDaemon.ASYNC.equalsIgnoreCase(serverType)) ++ else if(ThriftServer.ASYNC.equalsIgnoreCase(serverType)) + { + server = new CustomTNonBlockingServer.Factory().buildTServer(args); + logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", args.addr.getHostName(), args.addr.getPort())); + } - else if(CassandraDaemon.HSHA.equalsIgnoreCase(serverType)) ++ else if(ThriftServer.HSHA.equalsIgnoreCase(serverType)) + { + server = new CustomTHsHaServer.Factory().buildTServer(args); + logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", args.addr.getHostName(), args.addr.getPort())); + } + else + { + TServerFactory serverFactory; + try + { + serverFactory = (TServerFactory) Class.forName(serverType).newInstance(); + } + catch (Exception e) + { + throw new RuntimeException("Failed to instantiate server factory:" + serverType, 
e); + } + server = serverFactory.buildTServer(args); + logger.info(String.format("Using custom thrift server %s on %s : %s", server.getClass().getName(), args.addr.getHostName(), args.addr.getPort())); + } + return server; + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b83663c/src/java/org/apache/cassandra/thrift/ThriftServer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/thrift/ThriftServer.java index b104cfd,0000000..3ff28e5 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/thrift/ThriftServer.java +++ b/src/java/org/apache/cassandra/thrift/ThriftServer.java @@@ -1,235 -1,0 +1,121 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.thrift; + +import java.net.InetAddress; +import java.net.InetSocketAddress; - import java.util.Arrays; - import java.util.List; - import java.util.concurrent.ExecutorService; - import java.util.concurrent.SynchronousQueue; - import java.util.concurrent.ThreadPoolExecutor; - import java.util.concurrent.TimeUnit; + - import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; - import org.apache.cassandra.concurrent.NamedThreadFactory; - import org.apache.cassandra.service.CassandraDaemon; - import org.apache.thrift.server.TNonblockingServer; - import org.apache.thrift.server.TThreadPoolServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + - import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; ++import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.config.DatabaseDescriptor; - import org.apache.cassandra.service.ClientState; - import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.TServer; +import org.apache.thrift.transport.TFramedTransport; - import org.apache.thrift.transport.TNonblockingServerTransport; - import org.apache.thrift.transport.TServerTransport; - import org.apache.thrift.transport.TTransportException; - import org.apache.thrift.transport.TTransportFactory; + +public class ThriftServer implements CassandraDaemon.Server +{ - private static final Logger logger = LoggerFactory.getLogger(ThriftServer.class); - private final static String SYNC = "sync"; - private final static String ASYNC = "async"; - private final static String HSHA = "hsha"; - public final static List<String> rpc_server_types = Arrays.asList(SYNC, ASYNC, HSHA); ++ protected static CassandraDaemon instance; ++ private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class); ++ final static String SYNC = "sync"; ++ final static String ASYNC = "async"; ++ final static String HSHA = "hsha"; + + private final InetAddress address; + private final 
int port; + private volatile ThriftServerThread server; + + public ThriftServer(InetAddress address, int port) + { + this.address = address; + this.port = port; + } + + public void start() + { + if (server == null) + { + server = new ThriftServerThread(address, port); + server.start(); + } + } + + public void stop() + { + if (server != null) + { + server.stopServer(); + try + { + server.join(); + } + catch (InterruptedException e) + { + logger.error("Interrupted while waiting thrift server to stop", e); + } + server = null; + } + } + + public boolean isRunning() + { + return server != null; + } + + /** + * Simple class to run the thrift connection accepting code in separate + * thread of control. + */ + private static class ThriftServerThread extends Thread + { + private TServer serverEngine; + + public ThriftServerThread(InetAddress listenAddr, int listenPort) + { + // now we start listening for clients - final CassandraServer cassandraServer = new CassandraServer(); - Cassandra.Processor processor = new Cassandra.Processor(cassandraServer); - - // Transport + logger.info(String.format("Binding thrift service to %s:%s", listenAddr, listenPort)); + - // Protocol factory - TProtocolFactory tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength()); - - // Transport factory ++ TServerFactory.Args args = new TServerFactory.Args(); ++ args.tProtocolFactory = new TBinaryProtocol.Factory(true, true, DatabaseDescriptor.getThriftMaxMessageLength()); ++ args.addr = new InetSocketAddress(listenAddr, listenPort); ++ args.cassandraServer = new CassandraServer(); ++ args.processor = new Cassandra.Processor(args.cassandraServer); ++ args.keepAlive = DatabaseDescriptor.getRpcKeepAlive(); ++ args.sendBufferSize = DatabaseDescriptor.getRpcSendBufferSize(); ++ args.recvBufferSize = DatabaseDescriptor.getRpcRecvBufferSize(); + int tFramedTransportSize = DatabaseDescriptor.getThriftFramedTransportSize(); - TTransportFactory 
inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); - TTransportFactory outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); - logger.info("Using TFramedTransport with a max frame size of {} bytes.", tFramedTransportSize); + - if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC)) - { - TServerTransport serverTransport; - try - { - serverTransport = new TCustomServerSocket(new InetSocketAddress(listenAddr, listenPort), - DatabaseDescriptor.getRpcKeepAlive(), - DatabaseDescriptor.getRpcSendBufferSize(), - DatabaseDescriptor.getRpcRecvBufferSize()); - } - catch (TTransportException e) - { - throw new RuntimeException(String.format("Unable to create thrift socket to %s:%s", listenAddr, listenPort), e); - } - // ThreadPool Server and will be invocation per connection basis... - TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport) - .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads()) - .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads()) - .inputTransportFactory(inTransportFactory) - .outputTransportFactory(outTransportFactory) - .inputProtocolFactory(tProtocolFactory) - .outputProtocolFactory(tProtocolFactory) - .processor(processor); - ExecutorService executorService = new CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads); - serverEngine = new CustomTThreadPoolServer(serverArgs, executorService); - logger.info(String.format("Using synchronous/threadpool thrift server on %s : %s", listenAddr, listenPort)); - } - else - { - TNonblockingServerTransport serverTransport; - try - { - serverTransport = new TCustomNonblockingServerSocket(new InetSocketAddress(listenAddr, listenPort), - DatabaseDescriptor.getRpcKeepAlive(), - DatabaseDescriptor.getRpcSendBufferSize(), - DatabaseDescriptor.getRpcRecvBufferSize()); - } - catch (TTransportException e) - { - throw new RuntimeException(String.format("Unable to create thrift socket to 
%s:%s", listenAddr, listenPort), e); - } - - if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC)) - { - // This is single threaded hence the invocation will be all - // in one thread. - TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory) - .outputTransportFactory(outTransportFactory) - .inputProtocolFactory(tProtocolFactory) - .outputProtocolFactory(tProtocolFactory) - .processor(processor); - logger.info(String.format("Using non-blocking/asynchronous thrift server on %s : %s", listenAddr, listenPort)); - serverEngine = new CustomTNonBlockingServer(serverArgs); - } - else if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA)) - { - // This is NIO selector service but the invocation will be Multi-Threaded with the Executor service. - ExecutorService executorService = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(), - DatabaseDescriptor.getRpcMaxThreads(), - 60L, - TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), - new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL"); - TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory) - .outputTransportFactory(outTransportFactory) - .inputProtocolFactory(tProtocolFactory) - .outputProtocolFactory(tProtocolFactory) - .processor(processor); - logger.info(String.format("Using custom half-sync/half-async thrift server on %s : %s", listenAddr, listenPort)); - // Check for available processors in the system which will be equal to the IO Threads. 
- serverEngine = new CustomTHsHaServer(serverArgs, executorService, Runtime.getRuntime().availableProcessors()); - } - } ++ logger.info("Using TFramedTransport with a max frame size of {} bytes.", tFramedTransportSize); ++ args.inTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); ++ args.outTransportFactory = new TFramedTransport.Factory(tFramedTransportSize); ++ serverEngine = new TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args); + } + + public void run() + { + logger.info("Listening for thrift clients..."); + serverEngine.serve(); + } + + public void stopServer() + { + logger.info("Stop listening to thrift clients"); + serverEngine.stop(); + } + } - - /** - * A subclass of Java's ThreadPoolExecutor which implements Jetty's ThreadPool - * interface (for integration with Avro), and performs ClientState cleanup. - * - * (Note that the tasks being executed perform their own while-command-process - * loop until the client disconnects.) - */ - private static class CleaningThreadPool extends ThreadPoolExecutor - { - private final ThreadLocal<ClientState> state; - public CleaningThreadPool(ThreadLocal<ClientState> state, int minWorkerThread, int maxWorkerThreads) - { - super(minWorkerThread, maxWorkerThreads, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new NamedThreadFactory("Thrift")); - this.state = state; - } - - @Override - protected void afterExecute(Runnable r, Throwable t) - { - super.afterExecute(r, t); - DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t); - state.get().logout(); - } - } +}
