Author: jbellis
Date: Thu Sep  1 03:14:12 2011
New Revision: 1163898

URL: http://svn.apache.org/viewvc?rev=1163898&view=rev
Log:
Properly throw timeouts, decrement the count of waiters on timeout, fix 
off-by-one in taskCount
patch by Stu Hood; reviewed by Ryan King for CASSANDRA-3096

Modified:
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep  1 03:14:12 2011
@@ -42,7 +42,7 @@
  * Add "install" command to cassandra.bat (CASSANDRA-292)
  * clean up KSMetadata, CFMetadata from unnecessary
    Thrift<->Avro conversion methods (CASSANDRA-3032)
- * Add timeouts to client request schedulers (CASSANDRA-3079)
+ * Add timeouts to client request schedulers (CASSANDRA-3079, 3096)
  * Cli to use hashes rather than array of hashes for strategy options 
(CASSANDRA-3081)
  * LeveledCompactionStrategy (CASSANDRA-1608, 3085, 3110, 3087)
  * Improvements of the CLI `describe` command (CASSANDRA-2630)

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
 Thu Sep  1 03:14:12 2011
@@ -43,7 +43,6 @@ public class RoundRobinScheduler impleme
 
     //Map of queue id to weighted queue
     private final NonBlockingHashMap<String, WeightedQueue> queues;
-    private static boolean started = false;
 
     private final Semaphore taskCount;
 
@@ -56,12 +55,12 @@ public class RoundRobinScheduler impleme
 
     public RoundRobinScheduler(RequestSchedulerOptions options)
     {
-        assert !started;
-
         defaultWeight = options.default_weight;
         weights = options.weights;
 
-        taskCount = new Semaphore(options.throttle_limit);
+        // the task count is acquired for the first time _after_ releasing a 
thread, so we pre-decrement
+        taskCount = new Semaphore(options.throttle_limit - 1);
+
         queues = new NonBlockingHashMap<String, WeightedQueue>();
         Runnable runnable = new Runnable()
         {
@@ -75,7 +74,6 @@ public class RoundRobinScheduler impleme
         };
         Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
         scheduler.start();
-        started = true;
         logger.info("Started the RoundRobin Request Scheduler");
     }
 
@@ -86,7 +84,21 @@ public class RoundRobinScheduler impleme
         try
         {
             queueSize.release();
-            weightedQueue.put(t, timeoutMS);
+            try
+            {
+                weightedQueue.put(t, timeoutMS);
+                // the scheduler will release us when a slot is available
+            }
+            catch (TimeoutException e)
+            {
+                queueSize.acquireUninterruptibly();
+                throw e;
+            }
+            catch (InterruptedException e)
+            {
+                queueSize.acquireUninterruptibly();
+                throw e;
+            }
         }
         catch (InterruptedException e)
         {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java?rev=1163898&r1=1163897&r2=1163898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/WeightedQueue.java 
Thu Sep  1 03:14:12 2011
@@ -60,7 +60,8 @@ class WeightedQueue implements WeightedQ
 
     public void put(Thread t, long timeoutMS) throws InterruptedException, 
TimeoutException
     {
-        queue.offer(new WeightedQueue.Entry(t), timeoutMS, 
TimeUnit.MILLISECONDS);
+        if (!queue.offer(new WeightedQueue.Entry(t), timeoutMS, 
TimeUnit.MILLISECONDS))
+            throw new TimeoutException("Failed to acquire request scheduler 
slot for '" + key + "'");
     }
 
     public Thread poll()


Reply via email to