Author: jbellis
Date: Sun Jun 6 05:06:11 2010
New Revision: 951820
URL: http://svn.apache.org/viewvc?rev=951820&view=rev
Log:
merge from 0.6
Added:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jun 6 05:06:11 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-951249
+/cassandra/branches/cassandra-0.6:922689-951819
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5:888872-915439
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jun 6 05:06:11 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-951249
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-951819
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jun 6 05:06:11 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-951249
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-951819
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jun 6 05:06:11 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-951249
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-951819
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jun 6 05:06:11 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-951249
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-951819
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jun 6 05:06:11 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-951249
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-951819
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=951820&r1=951819&r2=951820&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
Sun Jun 6 05:06:11 2010
@@ -23,19 +23,20 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.InetAddress;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.config.ConfigurationException;
+import org.apache.thrift.server.TServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.log4j.PropertyConfigurator;
-
import org.apache.cassandra.utils.Mx4jTool;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.migration.Migration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
@@ -60,7 +61,7 @@ import org.apache.cassandra.db.Compactio
public class CassandraDaemon
{
private static Logger logger =
LoggerFactory.getLogger(CassandraDaemon.class);
- private TThreadPoolServer serverEngine;
+ private TServer serverEngine;
private void setup() throws IOException, TTransportException
{
@@ -122,7 +123,7 @@ public class CassandraDaemon
StorageService.instance.initServer();
// now we start listening for clients
- CassandraServer cassandraServer = new CassandraServer();
+ final CassandraServer cassandraServer = new CassandraServer();
Cassandra.Processor processor = new
Cassandra.Processor(cassandraServer);
// Transport
@@ -147,16 +148,34 @@ public class CassandraDaemon
outTransportFactory = new TTransportFactory();
}
+
// ThreadPool Server
- TThreadPoolServer.Options options = new TThreadPoolServer.Options();
+ CustomTThreadPoolServer.Options options = new
CustomTThreadPoolServer.Options();
options.minWorkerThreads = 64;
- serverEngine = new TThreadPoolServer(new TProcessorFactory(processor),
+
+ SynchronousQueue<Runnable> executorQueue = new
SynchronousQueue<Runnable>();
+
+ ExecutorService executorService = new
ThreadPoolExecutor(options.minWorkerThreads,
+
options.maxWorkerThreads,
+ 60,
+
TimeUnit.SECONDS,
+ executorQueue)
+ {
+ @Override
+ protected void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r, t);
+ cassandraServer.logout();
+ }
+ };
+ serverEngine = new CustomTThreadPoolServer(new
TProcessorFactory(processor),
tServerSocket,
inTransportFactory,
outTransportFactory,
tProtocolFactory,
tProtocolFactory,
- options);
+ options,
+ executorService);
}
/** hook for JSVC */
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=951820&r1=951819&r2=951820&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Sun Jun 6 05:06:11 2010
@@ -604,6 +604,15 @@ public class CassandraServer implements
return level;
}
+ public void logout()
+ {
+ keySpace.remove();
+ loginDone.remove();
+
+ if (logger.isDebugEnabled())
+ logger.debug("logout complete");
+ }
+
protected void checkKeyspaceAndLoginAuthorized(AccessLevel level) throws
InvalidRequestException
{
if (keySpace.get() == null)
@@ -851,7 +860,6 @@ public class CassandraServer implements
cf_def.key_cache_size);
}
- @Override
public void truncate(String keyspace, String cfname) throws
InvalidRequestException, UnavailableException, TException
{
logger.debug("truncating {} in {}", cfname, keyspace);
@@ -883,7 +891,6 @@ public class CassandraServer implements
keySpace.set(keyspace);
}
- @Override
public Map<String, List<String>> check_schema_agreement() throws
TException, InvalidRequestException
{
logger.debug("checking schema agreement");
Added:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=951820&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Sun Jun 6 05:06:11 2010
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.thrift;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Slightly modified version of the Apache Thrift TThreadPoolServer.
+ *
+ * This allows passing an executor so you have more control over the actual
+ * behaviour of the tasks being run.
+ *
+ * Newer versions of Thrift should make this obsolete.
+ */
+public class CustomTThreadPoolServer extends TServer {
+
+private static final Logger LOGGER =
LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());
+
+// Executor service for handling client connections
+private ExecutorService executorService_;
+
+// Flag for stopping the server
+private volatile boolean stopped_;
+
+// Server options
+private Options options_;
+
+// Customizable server options
+public static class Options {
+ public int minWorkerThreads = 5;
+ public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int stopTimeoutVal = 60;
+ public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+}
+
+
+public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
+ TServerSocket tServerSocket,
+ TTransportFactory inTransportFactory,
+ TTransportFactory outTransportFactory,
+ TProtocolFactory tProtocolFactory,
+ TProtocolFactory tProtocolFactory2,
+ Options options,
+ ExecutorService executorService) {
+
+ super(tProcessorFactory, tServerSocket, inTransportFactory,
outTransportFactory,
+ tProtocolFactory, tProtocolFactory2);
+ options_ = options;
+ executorService_ = executorService;
+}
+
+
+public void serve() {
+ try {
+ serverTransport_.listen();
+ } catch (TTransportException ttx) {
+ LOGGER.error("Error occurred during listening.", ttx);
+ return;
+ }
+
+ stopped_ = false;
+ while (!stopped_) {
+ int failureCount = 0;
+ try {
+ TTransport client = serverTransport_.accept();
+ WorkerProcess wp = new WorkerProcess(client);
+ executorService_.execute(wp);
+ } catch (TTransportException ttx) {
+ if (!stopped_) {
+ ++failureCount;
+ LOGGER.warn("Transport error occurred during acceptance of
message.", ttx);
+ }
+ }
+ }
+
+ executorService_.shutdown();
+
+ // Loop until awaitTermination finally does return without an interrupted
+ // exception. If we don't do this, then we'll shut down prematurely. We
want
+ // to let the executorService clear its task queue, closing client
sockets
+ // appropriately.
+ long timeoutMS =
options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+ long now = System.currentTimeMillis();
+ while (timeoutMS >= 0) {
+ try {
+ executorService_.awaitTermination(timeoutMS,
TimeUnit.MILLISECONDS);
+ break;
+ } catch (InterruptedException ix) {
+ long newnow = System.currentTimeMillis();
+ timeoutMS -= (newnow - now);
+ now = newnow;
+ }
+ }
+}
+
+public void stop() {
+ stopped_ = true;
+ serverTransport_.interrupt();
+}
+
+private class WorkerProcess implements Runnable {
+
+ /**
+ * Client that this services.
+ */
+ private TTransport client_;
+
+ /**
+ * Creates a worker for the given client transport.
+ *
+ * @param client Transport to process
+ */
+ private WorkerProcess(TTransport client) {
+ client_ = client;
+ }
+
+ /**
+ * Loops on processing a client forever
+ */
+ public void run() {
+ TProcessor processor = null;
+ TTransport inputTransport = null;
+ TTransport outputTransport = null;
+ TProtocol inputProtocol = null;
+ TProtocol outputProtocol = null;
+ try {
+ processor = processorFactory_.getProcessor(client_);
+ inputTransport = inputTransportFactory_.getTransport(client_);
+ outputTransport = outputTransportFactory_.getTransport(client_);
+ inputProtocol =
inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol =
outputProtocolFactory_.getProtocol(outputTransport);
+ // we check stopped_ first to make sure we're not supposed to
be shutting
+ // down. this is necessary for graceful shutdown.
+ while (!stopped_ && processor.process(inputProtocol,
outputProtocol)) {}
+ } catch (TTransportException ttx) {
+ // Assume the client died and continue silently
+ } catch (TException tx) {
+ LOGGER.error("Thrift error occurred during processing of
message.", tx);
+ } catch (Exception x) {
+ LOGGER.error("Error occurred during processing of message.", x);
+ }
+
+ if (inputTransport != null) {
+ inputTransport.close();
+ }
+
+ if (outputTransport != null) {
+ outputTransport.close();
+ }
+ }
+}
+}