Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 6e5dd2329 -> eecc034b6 refs/heads/trunk cfb7aceac -> 776251874
ninja-fix stress: rate limit not honoured for warmup, and 'auto' rate mode optional with thread ranges Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eecc034b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eecc034b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eecc034b Branch: refs/heads/cassandra-2.1 Commit: eecc034b686b4f2998fbc8045cbc4c7a1e4f0902 Parents: 6e5dd23 Author: Benedict Elliott Smith <[email protected]> Authored: Sat Sep 20 08:44:37 2014 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Sat Sep 20 08:44:37 2014 +0100 ---------------------------------------------------------------------- .../apache/cassandra/stress/StressAction.java | 34 +++++++++++--------- .../cassandra/stress/settings/SettingsRate.java | 24 +++++++------- 2 files changed, 31 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/eecc034b/tools/stress/src/org/apache/cassandra/stress/StressAction.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index da32284..b50637f 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -56,19 +56,24 @@ public class StressAction implements Runnable // creating keyspace and column families settings.maybeCreateKeyspaces(); - // TODO: warmup should + // TODO: warmup should operate configurably over op/pk/row, and be of configurable length if (!settings.command.noWarmup) warmup(settings.command.getFactory(settings)); output.println("Sleeping 2s..."); Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + // TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution + RateLimiter rateLimiter = null; + if (settings.rate.opRateTargetPerSecond > 0) + rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond); + boolean success; - if (settings.rate.auto) - success = runAuto(); + if (settings.rate.minThreads > 0) + success = runMulti(settings.rate.auto, rateLimiter); else success = null != run(settings.command.getFactory(settings), settings.rate.threadCount, settings.command.count, - settings.command.duration, settings.command.durationUnits, output); + settings.command.duration, rateLimiter, settings.command.durationUnits, output); if (success) output.println("END"); @@ -89,16 +94,18 @@ public class StressAction implements Runnable // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance; // so warm up all the nodes we're speaking to only. output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations)); - run(single, 20, iterations, 0, null, warmupOutput); + run(single, 20, iterations, 0, null, null, warmupOutput); } } // TODO : permit varying more than just thread count // TODO : vary thread count based on percentage improvement of previous increment, not by fixed amounts - private boolean runAuto() + private boolean runMulti(boolean auto, RateLimiter rateLimiter) { + if (settings.command.targetUncertainty >= 0) + output.println("WARNING: uncertainty mode (err<) results in uneven workload between thread runs, so should be used for high level analysis only"); int prevThreadCount = -1; - int threadCount = settings.rate.minAutoThreads; + int threadCount = settings.rate.minThreads; List<StressMetrics> results = new ArrayList<>(); List<String> runIds = new ArrayList<>(); do @@ -106,7 +113,7 @@ public class StressAction implements Runnable output.println(String.format("Running with %d threadCount", threadCount)); StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count, - settings.command.duration, settings.command.durationUnits, output); + settings.command.duration, rateLimiter, settings.command.durationUnits, output); if (result == null) return false; results.add(result); @@ -122,7 +129,7 @@ public class StressAction implements Runnable else threadCount *= 1.5; - if (!results.isEmpty() && threadCount > settings.rate.maxAutoThreads) + if (!results.isEmpty() && threadCount > settings.rate.maxThreads) break; if (settings.command.type.updates) @@ -139,7 +146,7 @@ public class StressAction implements Runnable } } // run until we have not improved throughput significantly for previous three runs - } while (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty)); + } while (!auto || (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty))); // summarise all results StressMetrics.summarise(runIds, results, output); @@ -163,7 +170,7 @@ public class StressAction implements Runnable return improvement / count; } - private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount, long duration, TimeUnit durationUnits, PrintStream output) + private StressMetrics run(OpDistributionFactory operations, int threadCount, long opCount, long duration, RateLimiter rateLimiter, TimeUnit durationUnits, PrintStream output) { output.println(String.format("Running %s with %d threads %s", operations.desc(), @@ -177,11 +184,6 @@ public class StressAction implements Runnable else workManager = new FixedWorkManager(opCount); - RateLimiter rateLimiter = null; - // TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution - if (settings.rate.opRateTargetPerSecond > 0) - rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond); - final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings); final CountDownLatch done = new CountDownLatch(threadCount); http://git-wip-us.apache.org/repos/asf/cassandra/blob/eecc034b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java index a91f073..0486678 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsRate.java @@ -30,8 +30,8 @@ public class SettingsRate implements Serializable { public final boolean auto; - public final int minAutoThreads; - public final int maxAutoThreads; + public final int minThreads; + public final int maxThreads; public final int threadCount; public final int opRateTargetPerSecond; @@ -41,15 +41,15 @@ public class SettingsRate implements Serializable threadCount = Integer.parseInt(options.threads.value()); String rateOpt = options.rate.value(); opRateTargetPerSecond = Integer.parseInt(rateOpt.substring(0, rateOpt.length() - 2)); - minAutoThreads = -1; - maxAutoThreads = -1; + minThreads = -1; + maxThreads = -1; } public SettingsRate(AutoOptions auto) { - this.auto = true; - this.minAutoThreads = Integer.parseInt(auto.minThreads.value()); - this.maxAutoThreads = Integer.parseInt(auto.maxThreads.value()); + this.auto = auto.auto.setByUser(); + this.minThreads = Integer.parseInt(auto.minThreads.value()); + this.maxThreads = Integer.parseInt(auto.maxThreads.value()); this.threadCount = -1; this.opRateTargetPerSecond = 0; } @@ -59,14 +59,14 @@ public class SettingsRate implements Serializable private static final class AutoOptions extends GroupedOptions { - final OptionSimple auto = new OptionSimple("auto", "", null, "test with increasing number of threadCount until performance plateaus", false); + final OptionSimple auto = new OptionSimple("auto", "", null, "stop increasing threads once throughput saturates", false); final OptionSimple minThreads = new OptionSimple("threads>=", "[0-9]+", "4", "run at least this many clients concurrently", false); final OptionSimple maxThreads = new OptionSimple("threads<=", "[0-9]+", "1000", "run at most this many clients concurrently", false); @Override public List<? extends Option> options() { - return Arrays.asList(auto, minThreads, maxThreads); + return Arrays.asList(minThreads, maxThreads, auto); } } @@ -96,11 +96,13 @@ public class SettingsRate implements Serializable if (command.count > 0) { ThreadOptions options = new ThreadOptions(); - options.accept("threads=50"); + options.accept("threads=200"); return new SettingsRate(options); } } - return new SettingsRate(new AutoOptions()); + AutoOptions options = new AutoOptions(); + options.accept("auto"); + return new SettingsRate(options); } GroupedOptions options = GroupedOptions.select(params, new AutoOptions(), new ThreadOptions()); if (options == null)
