Author: jbellis
Date: Fri Aug 26 03:54:32 2011
New Revision: 1161983
URL: http://svn.apache.org/viewvc?rev=1161983&view=rev
Log:
Add timeouts to client request schedulers
patch by Stu Hood; reviewed by Melvin Wang for CASSANDRA-3079
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Aug 26 03:54:32 2011
@@ -42,6 +42,7 @@
* Add "install" command to cassandra.bat (CASSANDRA-292)
* clean up KSMetadata, CFMetadata from unnecessary
Thrift<->Avro conversion methods (CASSANDRA-3032)
+ * Add timeouts to client request schedulers (CASSANDRA-3079)
0.8.5
Modified:
cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/scheduler/IRequestScheduler.java
Fri Aug 26 03:54:32 2011
@@ -20,6 +20,8 @@ package org.apache.cassandra.scheduler;
*
*/
+import java.util.concurrent.TimeoutException;
+
/**
* Implementors of IRequestScheduler must provide a constructor taking a
RequestSchedulerOptions object.
*/
@@ -30,8 +32,9 @@ public interface IRequestScheduler
*
* @param t Thread handling the request
* @param id Scheduling parameter, an id to distinguish profiles
(users/keyspace)
+ * @param timeoutMS The max time in milliseconds to spend blocking for a
slot
*/
- public void queue(Thread t, String id);
+ public void queue(Thread t, String id, long timeoutMS) throws
TimeoutException;
/**
* A convenience method for indicating when a particular request has
completed
Modified:
cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/NoScheduler.java
Fri Aug 26 03:54:32 2011
@@ -34,7 +34,7 @@ public class NoScheduler implements IReq
public NoScheduler() {}
- public void queue(Thread t, String id) {}
+ public void queue(Thread t, String id, long timeoutMS) {}
public void release() {}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
Fri Aug 26 03:54:32 2011
@@ -23,6 +23,7 @@ package org.apache.cassandra.scheduler;
import java.util.Map;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,14 +79,14 @@ public class RoundRobinScheduler impleme
logger.info("Started the RoundRobin Request Scheduler");
}
- public void queue(Thread t, String id)
+ public void queue(Thread t, String id, long timeoutMS) throws
TimeoutException
{
WeightedQueue weightedQueue = getWeightedQueue(id);
try
{
queueSize.release();
- weightedQueue.put(t);
+ weightedQueue.put(t, timeoutMS);
}
catch (InterruptedException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
Fri Aug 26 03:54:32 2011
@@ -22,6 +22,8 @@ package org.apache.cassandra.scheduler;
*/
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -56,9 +58,9 @@ class WeightedQueue implements WeightedQ
}
}
- public void put(Thread t) throws InterruptedException
+ public void put(Thread t, long timeoutMS) throws InterruptedException,
TimeoutException
{
- queue.put(new WeightedQueue.Entry(t));
+ queue.offer(new WeightedQueue.Entry(t), timeoutMS,
TimeUnit.MILLISECONDS);
}
public Thread poll()
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1161983&r1=1161982&r2=1161983&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Fri Aug 26 03:54:32 2011
@@ -122,9 +122,9 @@ public class CassandraServer implements
List<Row> rows;
try
{
+ schedule(DatabaseDescriptor.getRpcTimeout());
try
{
- schedule();
rows = StorageProxy.read(commands, consistency_level);
}
finally
@@ -625,23 +625,23 @@ public class CassandraServer implements
private void doInsert(ConsistencyLevel consistency_level, List<? extends
IMutation> mutations) throws UnavailableException, TimedOutException,
InvalidRequestException
{
ThriftValidation.validateConsistencyLevel(state().getKeyspace(),
consistency_level);
+ if (mutations.isEmpty())
+ return;
try
{
- schedule();
-
+ schedule(DatabaseDescriptor.getRpcTimeout());
try
{
- if (!mutations.isEmpty())
- StorageProxy.mutate(mutations, consistency_level);
+ StorageProxy.mutate(mutations, consistency_level);
}
- catch (TimeoutException e)
+ finally
{
- throw new TimedOutException();
+ release();
}
}
- finally
+ catch (TimeoutException e)
{
- release();
+ throw new TimedOutException();
}
}
@@ -686,9 +686,9 @@ public class CassandraServer implements
{
bounds = new Bounds(p.getToken(range.start_key),
p.getToken(range.end_key));
}
+ schedule(DatabaseDescriptor.getRpcTimeout());
try
{
- schedule();
rows = StorageProxy.getRangeSlice(new
RangeSliceCommand(keyspace, column_parent, predicate, bounds, range.count),
consistency_level);
}
finally
@@ -829,9 +829,9 @@ public class CassandraServer implements
/**
* Schedule the current thread for access to the required services
*/
- private void schedule()
+ private void schedule(long timeoutMS) throws TimeoutException
{
- requestScheduler.queue(Thread.currentThread(),
state().getSchedulingValue());
+ requestScheduler.queue(Thread.currentThread(),
state().getSchedulingValue(), timeoutMS);
}
/**
@@ -1085,8 +1085,15 @@ public class CassandraServer implements
state().hasColumnFamilyAccess(cfname, Permission.WRITE);
try
{
- schedule();
- StorageProxy.truncateBlocking(state().getKeyspace(), cfname);
+ schedule(DatabaseDescriptor.getRpcTimeout());
+ try
+ {
+ StorageProxy.truncateBlocking(state().getKeyspace(), cfname);
+ }
+ finally
+ {
+ release();
+ }
}
catch (TimeoutException e)
{
@@ -1096,10 +1103,6 @@ public class CassandraServer implements
{
throw (UnavailableException) new
UnavailableException().initCause(e);
}
- finally
- {
- release();
- }
}
public void set_keyspace(String keyspace) throws InvalidRequestException,
TException