Updated Branches: refs/heads/cassandra-1.2 19d2782c7 -> 2f72f8bc5
Add a rate limit option to stress patch by jasobrown, reviewed by driftx for CASSANDRA-5004 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f72f8bc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f72f8bc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f72f8bc Branch: refs/heads/cassandra-1.2 Commit: 2f72f8bc57bb96fa4434fb395373a73b259dcce5 Parents: 19d2782 Author: Jason Brown <[email protected]> Authored: Thu May 30 14:10:11 2013 -0700 Committer: Jason Brown <[email protected]> Committed: Thu May 30 14:13:16 2013 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../src/org/apache/cassandra/stress/Session.java | 12 ++++++++++-- .../org/apache/cassandra/stress/StressAction.java | 10 ++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f72f8bc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2b8e01f..9d53d17 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ * cqlsh: add custom prompt support (CASSANDRA-5539) * Reuse prepared statements in hot auth queries (CASSANDRA-5594) * cqlsh: add vertical output option (see EXPAND) (CASSANDRA-5597) + * Add a rate limit option to stress (CASSANDRA-5004) Merged from 1.1: * Remove buggy thrift max message length option (CASSANDRA-5529) * Fix NPE in Pig's widerow mode (CASSANDRA-5488) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f72f8bc/tools/stress/src/org/apache/cassandra/stress/Session.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java index d16ee78..d527278 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Session.java +++ b/tools/stress/src/org/apache/cassandra/stress/Session.java @@ -23,10 +23,8 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.Histogram; import org.apache.cassandra.cli.transport.FramedTransportFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.EncryptionOptions; @@ -116,6 +114,7 @@ public class Session implements Serializable availableOptions.addOption("alg", SSL_ALGORITHM, true, "SSL: algorithm (default: SunX509)"); availableOptions.addOption("st", SSL_STORE_TYPE, true, "SSL: type of store"); availableOptions.addOption("ciphers", SSL_CIPHER_SUITES, true, "SSL: comma-separated list of encryption suites to use"); + availableOptions.addOption("th", "throttle", true, "Throttle the total number of operations per second to a maximum amount."); } private int numKeys = 1000 * 1000; @@ -141,6 +140,7 @@ public class Session implements Serializable private boolean use_prepared = false; private boolean trace = false; public boolean use_native_protocol = false; + private double maxOpsPerSecond = Double.MAX_VALUE; private final String outFileName; @@ -277,6 +277,9 @@ public class Session implements Serializable if (cmd.hasOption("g")) keysPerCall = Integer.parseInt(cmd.getOptionValue("g")); + if (cmd.hasOption("th")) + maxOpsPerSecond = Double.parseDouble(cmd.getOptionValue("th")); + if (cmd.hasOption("e")) consistencyLevel = ConsistencyLevel.valueOf(cmd.getOptionValue("e").toUpperCase()); @@ -502,6 +505,11 @@ public class Session implements Serializable return threads; } + public double getMaxOpsPerSecond() + { + return maxOpsPerSecond; + } + public float getSkipKeys() { return skipKeys; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f72f8bc/tools/stress/src/org/apache/cassandra/stress/StressAction.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index 60e8cbd..9adf92f 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -21,6 +21,7 @@ import java.io.PrintStream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; +import com.google.common.util.concurrent.RateLimiter; import com.yammer.metrics.stats.Snapshot; import org.apache.cassandra.stress.operations.*; import org.apache.cassandra.stress.util.CassandraClient; @@ -67,13 +68,14 @@ public class StressAction extends Thread int itemsPerThread = client.getKeysPerThread(); int modulo = client.getNumKeys() % threadCount; + RateLimiter rateLimiter = RateLimiter.create(client.getMaxOpsPerSecond()); // creating required type of the threads for the test for (int i = 0; i < threadCount; i++) { if (i == threadCount - 1) itemsPerThread += modulo; // last one is going to handle N + modulo items - consumers[i] = new Consumer(itemsPerThread); + consumers[i] = new Consumer(itemsPerThread, rateLimiter); } Producer producer = new Producer(); @@ -209,12 +211,14 @@ public class StressAction extends Thread private class Consumer extends Thread { private final int items; + private final RateLimiter rateLimiter; private volatile boolean stop = false; private volatile int returnCode = StressAction.SUCCESS; - public Consumer(int toConsume) + public Consumer(int toConsume, RateLimiter rateLimiter) { items = toConsume; + this.rateLimiter = rateLimiter; } public void run() @@ -230,6 +234,7 @@ public class StressAction extends Thread try { + rateLimiter.acquire(); operations.take().run(connection); // running job } catch (Exception e) @@ -258,6 +263,7 @@ public class StressAction extends Thread try { + rateLimiter.acquire(); operations.take().run(connection); // running job } catch (Exception e)
