Author: jbellis
Date: Thu Sep 1 03:14:12 2011
New Revision: 1163898
URL: http://svn.apache.org/viewvc?rev=1163898&view=rev
Log:
Properly throw timeouts, decrement the count of waiters on timeout, fix
off-by-one in taskCount
patch by Stu Hood; reviewed by Ryan King for CASSANDRA-3096
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep 1 03:14:12 2011
@@ -42,7 +42,7 @@
* Add "install" command to cassandra.bat (CASSANDRA-292)
* clean up KSMetadata, CFMetadata from unnecessary
Thrift<->Avro conversion methods (CASSANDRA-3032)
- * Add timeouts to client request schedulers (CASSANDRA-3079)
+ * Add timeouts to client request schedulers (CASSANDRA-3079, 3096)
* Cli to use hashes rather than array of hashes for strategy options
(CASSANDRA-3081)
* LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110, 3087)
* Improvements of the CLI `describe` command (CASSANDRA-2630)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java Thu Sep 1 03:14:12 2011
@@ -43,7 +43,6 @@ public class RoundRobinScheduler impleme
//Map of queue id to weighted queue
private final NonBlockingHashMap<String, WeightedQueue> queues;
- private static boolean started = false;
private final Semaphore taskCount;
@@ -56,12 +55,12 @@ public class RoundRobinScheduler impleme
public RoundRobinScheduler(RequestSchedulerOptions options)
{
- assert !started;
-
defaultWeight = options.default_weight;
weights = options.weights;
- taskCount = new Semaphore(options.throttle_limit);
+ // the task count is acquired for the first time _after_ releasing a thread, so we pre-decrement
+ taskCount = new Semaphore(options.throttle_limit - 1);
+
queues = new NonBlockingHashMap<String, WeightedQueue>();
Runnable runnable = new Runnable()
{
@@ -75,7 +74,6 @@ public class RoundRobinScheduler impleme
};
Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
scheduler.start();
- started = true;
logger.info("Started the RoundRobin Request Scheduler");
}
@@ -86,7 +84,21 @@ public class RoundRobinScheduler impleme
try
{
queueSize.release();
- weightedQueue.put(t, timeoutMS);
+ try
+ {
+ weightedQueue.put(t, timeoutMS);
+ // the scheduler will release us when a slot is available
+ }
+ catch (TimeoutException e)
+ {
+ queueSize.acquireUninterruptibly();
+ throw e;
+ }
+ catch (InterruptedException e)
+ {
+ queueSize.acquireUninterruptibly();
+ throw e;
+ }
}
catch (InterruptedException e)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java Thu Sep 1 03:14:12 2011
@@ -60,7 +60,8 @@ class WeightedQueue implements WeightedQ
public void put(Thread t, long timeoutMS) throws InterruptedException, TimeoutException
{
- queue.offer(new WeightedQueue.Entry(t), timeoutMS, TimeUnit.MILLISECONDS);
+ if (!queue.offer(new WeightedQueue.Entry(t), timeoutMS, TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Failed to acquire request scheduler slot for '" + key + "'");
}
public Thread poll()