Author: jbellis
Date: Thu Sep 23 21:42:01 2010
New Revision: 1000640
URL: http://svn.apache.org/viewvc?rev=1000640&view=rev
Log:
Add weighted request scheduler. patch by Jeremy Hanna; reviewed by Stu Hood
for CASSANDRA-1485
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Sep 23 21:42:01 2010
@@ -86,6 +86,7 @@
* remove Clock from the Thrift (and Avro) API (CASSANDRA-1501)
* Close intra-node sockets when connection is broken (CASSANDRA-1528)
* RPM packaging spec file (CASSANDRA-786)
+ * weighted request scheduler (CASSANDRA-1485)
0.7-beta1
Modified: cassandra/trunk/conf/cassandra.yaml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Sep 23 21:42:01 2010
@@ -210,9 +210,9 @@ dynamic_snitch: true
# not affect inter node communication.
# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place
# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of
-# client requests to a node with a sepearte queue for each
-# reques_scheduler_id. The requests are throttled based on the limit set
-# in throttle_limit in the requeset_scheduler_options
+# client requests to a node with a separate queue for each
+# request_scheduler_id. The scheduler is further customized by
+# request_scheduler_options as described below.
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
# Scheduler Options vary based on the type of scheduler
@@ -224,12 +224,23 @@ request_scheduler: org.apache.cassandra.
# running requests can complete.
# The value of 80 here is twice the number of
# concurrent_reads + concurrent_writes.
+# - default_weight -- default_weight is optional and allows for
+# overriding the default which is 1.
+# - weights -- Weights are optional and will default to 1 or the
+# overridden default_weight. The weight translates into how
+# many requests are handled during each turn of the
+# RoundRobin, based on the scheduler id.
+#
# request_scheduler_options:
# throttle_limit: 80
+# default_weight: 5
+# weights:
+# Keyspace1: 1
+# Keyspace2: 5
# request_scheduler_id -- An identifier based on which to perform
-# the request scheduling. The current supported option is "keyspace"
-request_scheduler_id: keyspace
+# the request scheduling. Currently the only valid option is keyspace.
+# request_scheduler_id: keyspace
# A ColumnFamily is the Cassandra concept closest to a relational table.
#
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu
Sep 23 21:42:01 2010
@@ -822,7 +822,7 @@ public class CassandraServer implements
*/
private void schedule()
{
- requestScheduler.queue(Thread.currentThread(),
clientState.getSchedulingId());
+ requestScheduler.queue(Thread.currentThread(),
clientState.getSchedulingValue());
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/RequestSchedulerOptions.java
Thu Sep 23 21:42:01 2010
@@ -1,4 +1,6 @@
package org.apache.cassandra.config;
+
+import java.util.Map;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -25,5 +27,10 @@ package org.apache.cassandra.config;
*/
public class RequestSchedulerOptions
{
- public Integer throttle_limit = 80;
+ public static final Integer DEFAULT_THROTTLE_LIMIT = 80;
+ public static final Integer DEFAULT_WEIGHT = 1;
+
+ public Integer throttle_limit = DEFAULT_THROTTLE_LIMIT;
+ public Integer default_weight = DEFAULT_WEIGHT;
+ public Map<String, Integer> weights;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
Thu Sep 23 21:42:01 2010
@@ -26,6 +26,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import org.apache.cassandra.config.RequestSchedulerOptions;
+import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,12 +34,15 @@ import org.slf4j.LoggerFactory;
/**
* A very basic Round Robin implementation of the RequestScheduler. It handles
* request groups identified on user/keyspace by placing them in separate
- * queues and servicing a request from each queue in a RoundRobin fashion.
+ * queues and servicing a request from each queue in a RoundRobin fashion.
+ * It optionally adds weights for each round.
*/
public class RoundRobinScheduler implements IRequestScheduler
{
private static final Logger logger =
LoggerFactory.getLogger(RoundRobinScheduler.class);
- private final NonBlockingHashMap<String, SynchronousQueue<Thread>> queues;
+
+ //The Pair is the weighted queue - the left is the weight and the right is
the queue
+ private final NonBlockingHashMap<String, Pair<Integer,
SynchronousQueue<Thread>>> queues;
private static boolean started = false;
private final Semaphore taskCount;
@@ -46,12 +50,18 @@ public class RoundRobinScheduler impleme
// Used by the scheduler thread so we don't need to busy-wait until
there is a request to process
private final Semaphore queueSize = new Semaphore(0, false);
+ private Integer defaultWeight;
+ private Map<String, Integer> weights;
+
public RoundRobinScheduler(RequestSchedulerOptions options)
{
assert !started;
+ defaultWeight = options.default_weight;
+ weights = options.weights;
+
taskCount = new Semaphore(options.throttle_limit);
- queues = new NonBlockingHashMap<String, SynchronousQueue<Thread>>();
+ queues = new NonBlockingHashMap<String, Pair<Integer,
SynchronousQueue<Thread>>>();
Runnable runnable = new Runnable()
{
public void run()
@@ -70,12 +80,12 @@ public class RoundRobinScheduler impleme
public void queue(Thread t, String id)
{
- SynchronousQueue<Thread> queue = getQueue(id);
+ Pair<Integer, SynchronousQueue<Thread>> weightedQueue =
getWeightedQueue(id);
try
{
queueSize.release();
- queue.put(t);
+ weightedQueue.right.put(t);
}
catch (InterruptedException e)
{
@@ -90,14 +100,26 @@ public class RoundRobinScheduler impleme
private void schedule()
{
+ int weight;
+ SynchronousQueue<Thread> queue;
+ Thread t;
+
queueSize.acquireUninterruptibly();
- for (SynchronousQueue<Thread> queue : queues.values())
+ for (Map.Entry<String,Pair<Integer, SynchronousQueue<Thread>>> request
: queues.entrySet())
{
- Thread t = queue.poll();
- if (t != null)
+ weight = request.getValue().left;
+ queue = request.getValue().right;
+ //Using the weight, process that many requests at a time (for that
scheduler id)
+ for (int i=0; i<weight; i++)
{
- taskCount.acquireUninterruptibly();
- queueSize.acquireUninterruptibly();
+ t = queue.poll();
+ if (t == null)
+ break;
+ else
+ {
+ taskCount.acquireUninterruptibly();
+ queueSize.acquireUninterruptibly();
+ }
}
}
queueSize.release();
@@ -107,25 +129,32 @@ public class RoundRobinScheduler impleme
* Get the Queue for the respective id, if one is not available
* create a new queue for that corresponding id and return it
*/
- private SynchronousQueue<Thread> getQueue(String id)
+ private Pair<Integer, SynchronousQueue<Thread>> getWeightedQueue(String id)
{
- SynchronousQueue<Thread> queue = queues.get(id);
- if (queue != null)
+ Pair<Integer, SynchronousQueue<Thread>> weightedQueue = queues.get(id);
+ if (weightedQueue != null)
// queue existed
- return queue;
+ return weightedQueue;
- SynchronousQueue<Thread> maybenew = new SynchronousQueue<Thread>(true);
- queue = queues.putIfAbsent(id, maybenew);
- if (queue == null)
+ Pair<Integer, SynchronousQueue<Thread>> maybenew = new
Pair(getWeight(id), new SynchronousQueue<Thread>(true));
+ weightedQueue = queues.putIfAbsent(id, maybenew);
+ if (weightedQueue == null)
// created new queue
return maybenew;
// another thread created the queue
- return queue;
+ return weightedQueue;
}
Semaphore getTaskCount()
{
return taskCount;
}
+
+ private int getWeight(String weightingVar)
+ {
+ return (weights != null && weights.containsKey(weightingVar))
+ ? weights.get(weightingVar)
+ : defaultWeight;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java Thu
Sep 23 21:42:01 2010
@@ -38,9 +38,6 @@ import org.apache.cassandra.thrift.Inval
public class ClientState
{
private static Logger logger = LoggerFactory.getLogger(ClientState.class);
-
- // true if the keyspace should be used as the scheduling id
- private final boolean SCHEDULE_ON_KEYSPACE =
DatabaseDescriptor.getRequestSchedulerId().equals(RequestSchedulerId.keyspace);
// Current user for the session
private final ThreadLocal<AuthenticatedUser> user = new
ThreadLocal<AuthenticatedUser>()
@@ -80,11 +77,16 @@ public class ClientState
updateKeyspaceAccess();
}
- public String getSchedulingId()
+ public String getSchedulingValue()
{
- if (SCHEDULE_ON_KEYSPACE)
- return keyspace.get();
- return "default";
+ String schedulingValue = "default";
+ switch(DatabaseDescriptor.getRequestSchedulerId())
+ {
+ case keyspace:
+ schedulingValue = keyspace.get();
+ break;
+ }
+ return schedulingValue;
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1000640&r1=1000639&r2=1000640&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Thu Sep 23 21:42:01 2010
@@ -615,7 +615,7 @@ public class CassandraServer implements
*/
private void schedule()
{
- requestScheduler.queue(Thread.currentThread(),
clientState.getSchedulingId());
+ requestScheduler.queue(Thread.currentThread(),
clientState.getSchedulingValue());
}
/**