Updated Branches:
  refs/heads/cassandra-1.2 19d2782c7 -> 2f72f8bc5

Add a rate limit option to stress
patch by jasobrown, reviewed by driftx for CASSANDRA-5004


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f72f8bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f72f8bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f72f8bc

Branch: refs/heads/cassandra-1.2
Commit: 2f72f8bc57bb96fa4434fb395373a73b259dcce5
Parents: 19d2782
Author: Jason Brown <[email protected]>
Authored: Thu May 30 14:10:11 2013 -0700
Committer: Jason Brown <[email protected]>
Committed: Thu May 30 14:13:16 2013 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../src/org/apache/cassandra/stress/Session.java   |   12 ++++++++++--
 .../org/apache/cassandra/stress/StressAction.java  |   10 ++++++++--
 3 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f72f8bc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b8e01f..9d53d17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * cqlsh: add custom prompt support (CASSANDRA-5539)
  * Reuse prepared statements in hot auth queries (CASSANDRA-5594)
  * cqlsh: add vertical output option (see EXPAND) (CASSANDRA-5597)
+ * Add a rate limit option to stress (CASSANDRA-5004)
 Merged from 1.1:
  * Remove buggy thrift max message length option (CASSANDRA-5529)
  * Fix NPE in Pig's widerow mode (CASSANDRA-5488)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f72f8bc/tools/stress/src/org/apache/cassandra/stress/Session.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java
index d16ee78..d527278 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Session.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Session.java
@@ -23,10 +23,8 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Histogram;
 import org.apache.cassandra.cli.transport.FramedTransportFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.EncryptionOptions;
@@ -116,6 +114,7 @@ public class Session implements Serializable
         availableOptions.addOption("alg", SSL_ALGORITHM,         true, "SSL: algorithm (default: SunX509)");
         availableOptions.addOption("st", SSL_STORE_TYPE,         true, "SSL: type of store");
         availableOptions.addOption("ciphers", SSL_CIPHER_SUITES, true, "SSL: comma-separated list of encryption suites to use");
+        availableOptions.addOption("th",  "throttle",            true,   "Throttle the total number of operations per second to a maximum amount.");
     }
 
     private int numKeys          = 1000 * 1000;
@@ -141,6 +140,7 @@ public class Session implements Serializable
     private boolean use_prepared  = false;
     private boolean trace         = false;
     public boolean use_native_protocol = false;
+    private double maxOpsPerSecond = Double.MAX_VALUE;
 
     private final String outFileName;
 
@@ -277,6 +277,9 @@ public class Session implements Serializable
             if (cmd.hasOption("g"))
                 keysPerCall = Integer.parseInt(cmd.getOptionValue("g"));
 
+            if (cmd.hasOption("th"))
+                maxOpsPerSecond = Double.parseDouble(cmd.getOptionValue("th"));
+
             if (cmd.hasOption("e"))
                 consistencyLevel = ConsistencyLevel.valueOf(cmd.getOptionValue("e").toUpperCase());
 
@@ -502,6 +505,11 @@ public class Session implements Serializable
         return threads;
     }
 
+    public double getMaxOpsPerSecond()
+    {
+        return maxOpsPerSecond;
+    }
+
     public float getSkipKeys()
     {
         return skipKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f72f8bc/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 60e8cbd..9adf92f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -21,6 +21,7 @@ import java.io.PrintStream;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 
+import com.google.common.util.concurrent.RateLimiter;
 import com.yammer.metrics.stats.Snapshot;
 import org.apache.cassandra.stress.operations.*;
 import org.apache.cassandra.stress.util.CassandraClient;
@@ -67,13 +68,14 @@ public class StressAction extends Thread
 
         int itemsPerThread = client.getKeysPerThread();
         int modulo = client.getNumKeys() % threadCount;
+        RateLimiter rateLimiter = RateLimiter.create(client.getMaxOpsPerSecond());
 
         // creating required type of the threads for the test
         for (int i = 0; i < threadCount; i++) {
             if (i == threadCount - 1)
                 itemsPerThread += modulo; // last one is going to handle N + modulo items
 
-            consumers[i] = new Consumer(itemsPerThread);
+            consumers[i] = new Consumer(itemsPerThread, rateLimiter);
         }
 
         Producer producer = new Producer();
@@ -209,12 +211,14 @@ public class StressAction extends Thread
     private class Consumer extends Thread
     {
         private final int items;
+        private final RateLimiter rateLimiter;
         private volatile boolean stop = false;
         private volatile int returnCode = StressAction.SUCCESS;
 
-        public Consumer(int toConsume)
+        public Consumer(int toConsume, RateLimiter rateLimiter)
         {
             items = toConsume;
+            this.rateLimiter = rateLimiter;
         }
 
         public void run()
@@ -230,6 +234,7 @@ public class StressAction extends Thread
 
                     try
                     {
+                        rateLimiter.acquire();
                         operations.take().run(connection); // running job
                     }
                     catch (Exception e)
@@ -258,6 +263,7 @@ public class StressAction extends Thread
 
                     try
                     {
+                        rateLimiter.acquire();
                         operations.take().run(connection); // running job
                     }
                     catch (Exception e)

Reply via email to