Author: jbellis
Date: Thu Sep 23 21:42:01 2010
New Revision: 1000640

URL: http://svn.apache.org/viewvc?rev=1000640&view=rev
Log:
Add weighted request scheduler.  patch by Jeremy Hanna; reviewed by Stu Hood 
for CASSANDRA-1485

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    
cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java
    
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep 23 21:42:01 2010
@@ -86,6 +86,7 @@
  * remove Clock from the Thrift (and Avro) API (CASSANDRA-1501)
  * Close intra-node sockets when connection is broken (CASSANDRA-1528)
  * RPM packaging spec file (CASSANDRA-786)
+ * weighted request scheduler (CASSANDRA-1485)
 
 
 0.7-beta1

Modified: cassandra/trunk/conf/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Sep 23 21:42:01 2010
@@ -210,9 +210,9 @@ dynamic_snitch: true
 # not affect inter node communication.
 # org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place
 # org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of
-# client requests to a node with a sepearte queue for each
-# reques_scheduler_id. The requests are throttled based on the limit set
-# in throttle_limit in the requeset_scheduler_options
+# client requests to a node with a separate queue for each
+# request_scheduler_id. The scheduler is further customized by
+# request_scheduler_options as described below.
 request_scheduler: org.apache.cassandra.scheduler.NoScheduler
 
 # Scheduler Options vary based on the type of scheduler
@@ -224,12 +224,23 @@ request_scheduler: org.apache.cassandra.
 #                      running requests can complete.
 #                      The value of 80 here is twice the number of
 #                      concurrent_reads + concurrent_writes.
+#  - default_weight -- optional; the weight used for any scheduler id that
+#                      has no entry in weights. Defaults to 1.
+#  - weights -- optional map from scheduler id to weight. The weight is the
+#               maximum number of requests taken from that id's queue on
+#               each turn of the round robin; ids not listed here use
+#               default_weight.
+#
 # request_scheduler_options:
 #    throttle_limit: 80
+#    default_weight: 5
+#    weights:
+#      Keyspace1: 1
+#      Keyspace2: 5
 
 # request_scheduler_id -- An identifer based on which to perform
-# the request scheduling. The current supported option is "keyspace"
-request_scheduler_id: keyspace
+# the request scheduling. Currently the only valid option is keyspace.
+# request_scheduler_id: keyspace
 
 # A ColumnFamily is the Cassandra concept closest to a relational table. 
 #

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu Sep 23 21:42:01 2010
@@ -822,7 +822,7 @@ public class CassandraServer implements 
      */
     private void schedule()
     {
-        requestScheduler.queue(Thread.currentThread(), clientState.getSchedulingId());
+        requestScheduler.queue(Thread.currentThread(), clientState.getSchedulingValue());
     }
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java Thu Sep 23 21:42:01 2010
@@ -1,4 +1,6 @@
 package org.apache.cassandra.config;
+
+import java.util.Map;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -25,5 +27,10 @@ package org.apache.cassandra.config;
  */
 public class RequestSchedulerOptions
 {
-    public Integer throttle_limit = 80;
+    public static final Integer DEFAULT_THROTTLE_LIMIT = 80; // initial value of throttle_limit
+    public static final Integer DEFAULT_WEIGHT = 1; // initial value of default_weight
+
+    public Integer throttle_limit = DEFAULT_THROTTLE_LIMIT; // RoundRobinScheduler sizes its task semaphore with this
+    public Integer default_weight = DEFAULT_WEIGHT; // weight for scheduler ids that have no entry in weights
+    public Map<String, Integer> weights; // scheduler id -> weight; stays null unless configured
 }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java Thu Sep 23 21:42:01 2010
@@ -26,6 +26,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.SynchronousQueue;
 
 import org.apache.cassandra.config.RequestSchedulerOptions;
+import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,12 +34,15 @@ import org.slf4j.LoggerFactory;
 /**
  * A very basic Round Robin implementation of the RequestScheduler. It handles 
  * request groups identified on user/keyspace by placing them in separate 
- * queues and servicing a request from each queue in a RoundRobin fashion. 
+ * queues and servicing a request from each queue in a RoundRobin fashion.
+ * It optionally adds weights for each round.
  */
 public class RoundRobinScheduler implements IRequestScheduler
 {
     private static final Logger logger = 
LoggerFactory.getLogger(RoundRobinScheduler.class);
-    private final NonBlockingHashMap<String, SynchronousQueue<Thread>> queues;
+
+    //The Pair is the weighted queue - the left is the weight and the right is 
the queue
+    private final NonBlockingHashMap<String, Pair<Integer, 
SynchronousQueue<Thread>>> queues;
     private static boolean started = false;
 
     private final Semaphore taskCount;
@@ -46,12 +50,18 @@ public class RoundRobinScheduler impleme
     // Used by the the scheduler thread so we don't need to busy-wait until 
there is a request to process
     private final Semaphore queueSize = new Semaphore(0, false);
 
+    private Integer defaultWeight;
+    private Map<String, Integer> weights;
+
     public RoundRobinScheduler(RequestSchedulerOptions options)
     {
         assert !started;
 
+        defaultWeight = options.default_weight;
+        weights = options.weights;
+
         taskCount = new Semaphore(options.throttle_limit);
-        queues = new NonBlockingHashMap<String, SynchronousQueue<Thread>>();
+        queues = new NonBlockingHashMap<String, Pair<Integer, 
SynchronousQueue<Thread>>>();
         Runnable runnable = new Runnable()
         {
             public void run()
@@ -70,12 +80,12 @@ public class RoundRobinScheduler impleme
 
     public void queue(Thread t, String id)
     {
-        SynchronousQueue<Thread> queue = getQueue(id);
+        Pair<Integer, SynchronousQueue<Thread>> weightedQueue = 
getWeightedQueue(id);
 
         try
         {
             queueSize.release();
-            queue.put(t);
+            weightedQueue.right.put(t);
         }
         catch (InterruptedException e)
         {
@@ -90,14 +100,26 @@ public class RoundRobinScheduler impleme
 
     private void schedule()
     {
+        int weight;
+        SynchronousQueue<Thread> queue;
+        Thread t;
+
         queueSize.acquireUninterruptibly();
-        for (SynchronousQueue<Thread> queue : queues.values())
+        for (Map.Entry<String,Pair<Integer, SynchronousQueue<Thread>>> request 
: queues.entrySet())
         {
-            Thread t = queue.poll();
-            if (t != null)
+            weight = request.getValue().left;
+            queue = request.getValue().right;
+            //Using the weight, process that many requests at a time (for that 
scheduler id)
+            for (int i=0; i<weight; i++)
             {
-                taskCount.acquireUninterruptibly();
-                queueSize.acquireUninterruptibly();
+                t = queue.poll();
+                if (t == null)
+                    break;
+                else
+                {
+                    taskCount.acquireUninterruptibly();
+                    queueSize.acquireUninterruptibly();
+                }
             }
         }
         queueSize.release();
@@ -107,25 +129,32 @@ public class RoundRobinScheduler impleme
      * Get the Queue for the respective id, if one is not available 
      * create a new queue for that corresponding id and return it
      */
-    private SynchronousQueue<Thread> getQueue(String id)
+    private Pair<Integer, SynchronousQueue<Thread>> getWeightedQueue(String id)
     {
-        SynchronousQueue<Thread> queue = queues.get(id);
-        if (queue != null)
+        Pair<Integer, SynchronousQueue<Thread>> weightedQueue = queues.get(id);
+        if (weightedQueue != null)
             // queue existed
-            return queue;
+            return weightedQueue;
 
-        SynchronousQueue<Thread> maybenew = new SynchronousQueue<Thread>(true);
-        queue = queues.putIfAbsent(id, maybenew);
-        if (queue == null)
+        Pair<Integer, SynchronousQueue<Thread>> maybenew = new Pair<Integer, SynchronousQueue<Thread>>(getWeight(id), new SynchronousQueue<Thread>(true)); // weight is fixed when the id's queue is first created
+        weightedQueue = queues.putIfAbsent(id, maybenew);
+        if (weightedQueue == null)
             // created new queue
             return maybenew;
 
         // another thread created the queue
-        return queue;
+        return weightedQueue;
     }
 
     Semaphore getTaskCount()
     {
         return taskCount;
     }
+
+    private int getWeight(String weightingVar)
+    {
+        // missing, null or non-positive weights fall back to default_weight (a weight <= 0 would never be polled)
+        Integer weight = (weights == null) ? null : weights.get(weightingVar);
+        return (weight != null && weight > 0) ? weight : defaultWeight;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java Thu Sep 23 21:42:01 2010
@@ -38,9 +38,6 @@ import org.apache.cassandra.thrift.Inval
 public class ClientState
 {
     private static Logger logger = LoggerFactory.getLogger(ClientState.class);
-    
-    // true if the keyspace should be used as the scheduling id
-    private final boolean SCHEDULE_ON_KEYSPACE = 
DatabaseDescriptor.getRequestSchedulerId().equals(RequestSchedulerId.keyspace);
 
     // Current user for the session
     private final ThreadLocal<AuthenticatedUser> user = new 
ThreadLocal<AuthenticatedUser>()
@@ -80,11 +77,16 @@ public class ClientState
         updateKeyspaceAccess();
     }
 
-    public String getSchedulingId()
+    public String getSchedulingValue()
     {
-        if (SCHEDULE_ON_KEYSPACE)
-            return keyspace.get();
-        return "default";
+        // only the keyspace scheduler id is distinguished; any other id yields "default"
+        switch (DatabaseDescriptor.getRequestSchedulerId())
+        {
+            case keyspace:
+                return keyspace.get();
+            default:
+                return "default";
+        }
     }
 
     /**

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Thu Sep 23 21:42:01 2010
@@ -615,7 +615,7 @@ public class CassandraServer implements 
      */
     private void schedule()
     {
-        requestScheduler.queue(Thread.currentThread(), clientState.getSchedulingId());
+        requestScheduler.queue(Thread.currentThread(), clientState.getSchedulingValue());
     }
 
     /**


Reply via email to