Author: jbellis
Date: Wed Aug  3 18:58:21 2011
New Revision: 1153610

URL: http://svn.apache.org/viewvc?rev=1153610&view=rev
Log:
add scheduler JMX metrics
patch by stuhood; reviewed by Nirmal Ranganathan for CASSANDRA-2962

Modified:
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1153610&r1=1153609&r2=1153610&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Aug  3 18:58:21 2011
@@ -25,6 +25,7 @@
  * add paging to get_count (CASSANDRA-2894)
  * fix "short reads" in [multi]get (CASSANDRA-2643)
  * add optional compression for sstables (CASSANDRA-47)
+ * add scheduler JMX metrics (CASSANDRA-2962)
 
 
 0.8.3

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1153610&r1=1153609&r2=1153610&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
 Wed Aug  3 18:58:21 2011
@@ -23,13 +23,11 @@ package org.apache.cassandra.scheduler;
 
 import java.util.Map;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.SynchronousQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.RequestSchedulerOptions;
-import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
@@ -42,16 +40,17 @@ public class RoundRobinScheduler impleme
 {
     private static final Logger logger = 
LoggerFactory.getLogger(RoundRobinScheduler.class);
 
-    //The Pair is the weighted queue - the left is the weight and the right is 
the queue
-    private final NonBlockingHashMap<String, Pair<Integer, 
SynchronousQueue<Thread>>> queues;
+    //Map of queue id to weighted queue
+    private final NonBlockingHashMap<String, WeightedQueue> queues;
     private static boolean started = false;
 
     private final Semaphore taskCount;
 
+    // Tracks the count of threads available in all queues
     // Used by the the scheduler thread so we don't need to busy-wait until 
there is a request to process
     private final Semaphore queueSize = new Semaphore(0, false);
 
-    private Integer defaultWeight;
+    private int defaultWeight;
     private Map<String, Integer> weights;
 
     public RoundRobinScheduler(RequestSchedulerOptions options)
@@ -62,7 +61,7 @@ public class RoundRobinScheduler impleme
         weights = options.weights;
 
         taskCount = new Semaphore(options.throttle_limit);
-        queues = new NonBlockingHashMap<String, Pair<Integer, 
SynchronousQueue<Thread>>>();
+        queues = new NonBlockingHashMap<String, WeightedQueue>();
         Runnable runnable = new Runnable()
         {
             public void run()
@@ -75,18 +74,18 @@ public class RoundRobinScheduler impleme
         };
         Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
         scheduler.start();
-        logger.info("Started the RoundRobin Request Scheduler");
         started = true;
+        logger.info("Started the RoundRobin Request Scheduler");
     }
 
     public void queue(Thread t, String id)
     {
-        Pair<Integer, SynchronousQueue<Thread>> weightedQueue = 
getWeightedQueue(id);
+        WeightedQueue weightedQueue = getWeightedQueue(id);
 
         try
         {
             queueSize.release();
-            weightedQueue.right.put(t);
+            weightedQueue.put(t);
         }
         catch (InterruptedException e)
         {
@@ -101,19 +100,14 @@ public class RoundRobinScheduler impleme
 
     private void schedule()
     {
-        int weight;
-        SynchronousQueue<Thread> queue;
-        Thread t;
-
         queueSize.acquireUninterruptibly();
-        for (Map.Entry<String,Pair<Integer, SynchronousQueue<Thread>>> request 
: queues.entrySet())
+        for (Map.Entry<String,WeightedQueue> request : queues.entrySet())
         {
-            weight = request.getValue().left;
-            queue = request.getValue().right;
+            WeightedQueue queue = request.getValue();
             //Using the weight, process that many requests at a time (for that 
scheduler id)
-            for (int i=0; i<weight; i++)
+            for (int i=0; i<queue.weight; i++)
             {
-                t = queue.poll();
+                Thread t = queue.poll();
                 if (t == null)
                     break;
                 else
@@ -130,18 +124,21 @@ public class RoundRobinScheduler impleme
      * Get the Queue for the respective id, if one is not available 
      * create a new queue for that corresponding id and return it
      */
-    private Pair<Integer, SynchronousQueue<Thread>> getWeightedQueue(String id)
+    private WeightedQueue getWeightedQueue(String id)
     {
-        Pair<Integer, SynchronousQueue<Thread>> weightedQueue = queues.get(id);
+        WeightedQueue weightedQueue = queues.get(id);
         if (weightedQueue != null)
             // queue existed
             return weightedQueue;
 
-        Pair<Integer, SynchronousQueue<Thread>> maybenew = new 
Pair(getWeight(id), new SynchronousQueue<Thread>(true));
+        WeightedQueue maybenew = new WeightedQueue(id, getWeight(id));
         weightedQueue = queues.putIfAbsent(id, maybenew);
         if (weightedQueue == null)
-            // created new queue
+        {
+            // created new queue: register for monitoring
+            maybenew.register();
             return maybenew;
+        }
 
         // another thread created the queue
         return weightedQueue;


Reply via email to