Author: gdusbabek
Date: Fri Apr 8 00:47:28 2011
New Revision: 1090075
URL: http://svn.apache.org/viewvc?rev=1090075&view=rev
Log:
thrift 0.6 tweaks. patch by gdusbabek, reviewed by tjake. CASSANDRA-2412
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1090075&r1=1090074&r2=1090075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Fri Apr 8 00:47:28 2011
@@ -814,7 +814,7 @@ public final class CFMetaData
for (org.apache.cassandra.thrift.ColumnDef cdef :
def.getColumn_metadata())
{
org.apache.cassandra.db.migration.avro.ColumnDef tdef = new
org.apache.cassandra.db.migration.avro.ColumnDef();
- tdef.name = ByteBufferUtil.clone(cdef.BufferForName());
+ tdef.name = ByteBufferUtil.clone(cdef.bufferForName());
tdef.validation_class = cdef.getValidation_class();
tdef.index_name = cdef.getIndex_name();
tdef.index_type = cdef.getIndex_type() == null ? null :
org.apache.cassandra.db.migration.avro.IndexType.valueOf(cdef.getIndex_type().name());
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=1090075&r1=1090074&r2=1090075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Fri Apr 8 00:47:28 2011
@@ -22,6 +22,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
+import org.apache.thrift.server.TThreadPoolServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,21 +133,19 @@ public class CassandraDaemon extends org
}
// ThreadPool Server
- CustomTThreadPoolServer.Options options = new
CustomTThreadPoolServer.Options();
- options.minWorkerThreads = DatabaseDescriptor.getRpcMinThreads();
- options.maxWorkerThreads = DatabaseDescriptor.getRpcMaxThreads();
+ TThreadPoolServer.Args args = new TThreadPoolServer.Args(tServerSocket)
+ .minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+ .maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+ .inputTransportFactory(inTransportFactory)
+ .outputTransportFactory(outTransportFactory)
+ .inputProtocolFactory(tProtocolFactory)
+ .outputProtocolFactory(tProtocolFactory)
+ .processor(processor);
ExecutorService executorService = new
CleaningThreadPool(cassandraServer.clientState,
- options.minWorkerThreads,
- options.maxWorkerThreads);
- serverEngine = new CustomTThreadPoolServer(new
TProcessorFactory(processor),
- tServerSocket,
- inTransportFactory,
- outTransportFactory,
- tProtocolFactory,
- tProtocolFactory,
- options,
- executorService);
+ args.minWorkerThreads,
+ args.maxWorkerThreads);
+ serverEngine = new CustomTThreadPoolServer(args, executorService);
}
public void run()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1090075&r1=1090074&r2=1090075&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Fri Apr 8 00:47:28 2011
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.thrift.server.TThreadPoolServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,38 +56,18 @@ public class CustomTThreadPoolServer ext
private volatile boolean stopped_;
// Server options
- private Options options_;
+ private TThreadPoolServer.Args args;
//Track and Limit the number of connected clients
private final AtomicInteger activeClients = new AtomicInteger(0);
- // Customizable server options
- public static class Options
- {
- public int minWorkerThreads = 5;
- public int maxWorkerThreads = Integer.MAX_VALUE;
- public int stopTimeoutVal = 60;
- public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
- }
-
-
- public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
- TServerSocket tServerSocket,
- TTransportFactory inTransportFactory,
- TTransportFactory outTransportFactory,
- TProtocolFactory tProtocolFactory,
- TProtocolFactory tProtocolFactory2,
- Options options,
- ExecutorService executorService)
- {
-
- super(tProcessorFactory, tServerSocket, inTransportFactory,
outTransportFactory,
- tProtocolFactory, tProtocolFactory2);
- options_ = options;
+
+ public CustomTThreadPoolServer(TThreadPoolServer.Args args, ExecutorService executorService) {
+ super(args);
executorService_ = executorService;
+ this.args = args;
}
-
-
+
public void serve()
{
try
@@ -103,7 +84,7 @@ public class CustomTThreadPoolServer ext
while (!stopped_)
{
// block until we are under max clients
- while (activeClients.get() >= options_.maxWorkerThreads)
+ while (activeClients.get() >= args.maxWorkerThreads)
{
try
{
@@ -132,8 +113,8 @@ public class CustomTThreadPoolServer ext
}
}
- if (activeClients.get() >= options_.maxWorkerThreads)
- LOGGER.warn("Maximum number of clients " +
options_.maxWorkerThreads + " reached");
+ if (activeClients.get() >= args.maxWorkerThreads)
+ LOGGER.warn("Maximum number of clients " +
args.maxWorkerThreads + " reached");
}
executorService_.shutdown();
@@ -142,7 +123,7 @@ public class CustomTThreadPoolServer ext
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
- long timeoutMS =
options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+ long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0)
{