Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 ef6fa3733 -> 6bbfb5574
cassandra-stress prints per operation statistics, plus misc other stress updates patch by anthony cozzie, reviewed by benedict for CASSANDRA-8769 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6bbfb557 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6bbfb557 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6bbfb557 Branch: refs/heads/cassandra-2.1 Commit: 6bbfb5574e0487d490eb5872c70853c4c56d2940 Parents: ef6fa37 Author: Benedict Elliott Smith <[email protected]> Authored: Tue Mar 10 14:19:23 2015 +0000 Committer: Benedict Elliott Smith <[email protected]> Committed: Tue Mar 10 14:19:23 2015 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/stress/Operation.java | 2 +- .../apache/cassandra/stress/StressAction.java | 46 +++++---- .../apache/cassandra/stress/StressMetrics.java | 83 ++++++++++----- .../apache/cassandra/stress/StressProfile.java | 15 ++- .../stress/operations/FixedOpDistribution.java | 10 ++ .../stress/operations/OpDistribution.java | 2 + .../operations/OpDistributionFactory.java | 6 +- .../operations/SampledOpDistribution.java | 17 ++++ .../SampledOpDistributionFactory.java | 10 +- .../cassandra/stress/settings/Command.java | 27 +++-- .../cassandra/stress/settings/Option.java | 1 + .../stress/settings/OptionAnyProbabilities.java | 5 + .../stress/settings/OptionDistribution.java | 17 ++-- .../cassandra/stress/settings/OptionMulti.java | 15 ++- .../settings/OptionRatioDistribution.java | 5 + .../stress/settings/OptionReplication.java | 2 +- .../stress/settings/SettingsCommand.java | 48 ++++++--- .../settings/SettingsCommandPreDefined.java | 12 ++- .../stress/settings/SettingsCommandUser.java | 5 + .../org/apache/cassandra/stress/util/Timer.java | 10 +- .../apache/cassandra/stress/util/Timing.java | 75 +++++++++----- .../cassandra/stress/util/TimingInterval.java | 102 +++++++++++++------ 23 files changed, 355 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b2ac1aa..af5206b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.4 + * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769) * Add SimpleDate (cql date) and Time (cql time) types (CASSANDRA-7523) * Use long for key count in cfstats (CASSANDRA-8913) * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/Operation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java index 05045f8..f4ac5ee 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Operation.java +++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java @@ -176,7 +176,7 @@ public abstract class Operation } } - timer.stop(run.partitionCount(), run.rowCount()); + timer.stop(run.partitionCount(), run.rowCount(), !success); if (!success) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressAction.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index 1433742..f906a55 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -30,10 +30,11 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.stress.operations.OpDistribution; import org.apache.cassandra.stress.operations.OpDistributionFactory; +import org.apache.cassandra.stress.settings.SettingsCommand; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.JavaDriverClient; import org.apache.cassandra.stress.util.ThriftClient; -import org.apache.cassandra.stress.util.Timer; +import org.apache.cassandra.stress.util.TimingInterval; import org.apache.cassandra.transport.SimpleClient; public class StressAction implements Runnable @@ -53,13 +54,14 @@ public class StressAction implements Runnable // creating keyspace and column families settings.maybeCreateKeyspaces(); - // TODO: warmup should operate configurably over op/pk/row, and be of configurable length - if (!settings.command.noWarmup) - warmup(settings.command.getFactory(settings)); - output.println("Sleeping 2s..."); Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); + if (!settings.command.noWarmup) + warmup(settings.command.getFactory(settings)); + if (settings.command.truncate == SettingsCommand.TruncateWhen.ONCE) + settings.command.truncateTables(settings); + // TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution RateLimiter rateLimiter = null; if (settings.rate.opRateTargetPerSecond > 0) @@ -86,12 +88,19 @@ public class StressAction implements Runnable // warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } ); int iterations = 50000 * settings.node.nodes.size(); + int threads = 20; + + if (settings.rate.maxThreads > 0) + threads = Math.min(threads, settings.rate.maxThreads); + if (settings.rate.threadCount > 0) + threads = Math.min(threads, settings.rate.threadCount); + for (OpDistributionFactory single : operations.each()) { // we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance; // so warm up all the nodes we're speaking to only. output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations)); - run(single, 20, iterations, 0, null, null, warmupOutput); + run(single, threads, iterations, 0, null, null, warmupOutput); } } @@ -109,6 +118,9 @@ public class StressAction implements Runnable { output.println(String.format("Running with %d threadCount", threadCount)); + if (settings.command.truncate == SettingsCommand.TruncateWhen.ALWAYS) + settings.command.truncateTables(settings); + StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count, settings.command.duration, rateLimiter, settings.command.durationUnits, output); if (result == null) @@ -146,7 +158,7 @@ public class StressAction implements Runnable } while (!auto || (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty))); // summarise all results - StressMetrics.summarise(runIds, results, output); + StressMetrics.summarise(runIds, results, output, settings.samples.historyCount); return true; } @@ -187,8 +199,8 @@ public class StressAction implements Runnable final Consumer[] consumers = new Consumer[threadCount]; for (int i = 0; i < threadCount; i++) { - Timer timer = metrics.getTiming().newTimer(settings.samples.liveCount / threadCount); - consumers[i] = new Consumer(operations, done, workManager, timer, metrics, rateLimiter); + consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter, + settings.samples.liveCount / threadCount); } // starting worker threadCount @@ -240,28 +252,27 @@ public class StressAction implements Runnable private final OpDistribution operations; private final StressMetrics metrics; - private final Timer timer; private final RateLimiter rateLimiter; private volatile boolean success = true; private final WorkManager workManager; private final CountDownLatch done; - public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, Timer timer, StressMetrics metrics, RateLimiter rateLimiter) + public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, StressMetrics metrics, + RateLimiter rateLimiter, int sampleCount) { this.done = done; this.rateLimiter = rateLimiter; this.workManager = workManager; this.metrics = metrics; - this.timer = timer; - this.operations = operations.get(timer); + this.operations = operations.get(metrics.getTiming(), sampleCount); } public void run() { - timer.init(); + operations.initTimers(); + try { - SimpleClient sclient = null; ThriftClient tclient = null; JavaDriverClient jclient = null; @@ -324,11 +335,8 @@ public class StressAction implements Runnable finally { done.countDown(); - timer.close(); + operations.closeTimers(); } - } - } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java index d1cc0d4..46ca488 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java @@ -24,18 +24,16 @@ package org.apache.cassandra.stress; import java.io.PrintStream; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadFactory; +import org.apache.cassandra.stress.util.*; import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.stress.settings.StressSettings; -import org.apache.cassandra.stress.util.JmxCollector; -import org.apache.cassandra.stress.util.Timing; -import org.apache.cassandra.stress.util.TimingInterval; -import org.apache.cassandra.stress.util.Uncertainty; public class StressMetrics { @@ -51,10 +49,12 @@ public class StressMetrics private final Timing timing; private final Callable<JmxCollector.GcStats> gcStatsCollector; private volatile JmxCollector.GcStats totalGcStats; + private final StressSettings settings; public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings) { this.output = output; + this.settings = settings; Callable<JmxCollector.GcStats> gcStatsCollector; totalGcStats = new JmxCollector.GcStats(0); try @@ -93,7 +93,9 @@ public class StressMetrics { try { - long sleep = timing.getHistory().endMillis() + logIntervalMillis - System.currentTimeMillis(); + long sleepNanos = timing.getHistory().endNanos() - System.nanoTime(); + long sleep = (sleepNanos / 1000000) + logIntervalMillis; + if (sleep < logIntervalMillis >>> 3) // if had a major hiccup, sleep full interval Thread.sleep(logIntervalMillis); @@ -154,9 +156,19 @@ public class StressMetrics { Timing.TimingResult<JmxCollector.GcStats> result = timing.snap(gcStatsCollector); totalGcStats = JmxCollector.GcStats.aggregate(Arrays.asList(totalGcStats, result.extra)); - if (result.timing.partitionCount != 0) - printRow("", result.timing, timing.getHistory(), result.extra, rowRateUncertainty, output); - rowRateUncertainty.update(result.timing.adjustedRowRate()); + TimingInterval current = result.intervals.combine(settings.samples.reportCount); + TimingInterval history = timing.getHistory().combine(settings.samples.historyCount); + rowRateUncertainty.update(current.adjustedRowRate()); + if (current.partitionCount != 0) + { + if (result.intervals.intervals().size() > 1) + { + for (Map.Entry<String, TimingInterval> type : result.intervals.intervals().entrySet()) + printRow("", type.getKey(), type.getValue(), timing.getHistory().get(type.getKey()), result.extra, rowRateUncertainty, output); + } + + printRow("", "total", current, history, result.extra, rowRateUncertainty, output); + } if (timing.done()) stop = true; } @@ -164,19 +176,19 @@ public class StressMetrics // PRINT FORMATTING - public static final String HEADFORMAT = "%-10s,%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%8s,%8s,%8s,%8s"; - public static final String ROWFORMAT = "%-10d,%10.0f,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f"; + public static final String HEADFORMAT = "%-10s%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%7s,%8s,%8s,%8s,%8s"; + public static final String ROWFORMAT = "%-10s%10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7d,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f"; private static void printHeader(String prefix, PrintStream output) { - output.println(prefix + String.format(HEADFORMAT, "total ops","adj row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "gc: #", "max ms", "sum ms", "sdv ms", "mb")); + output.println(prefix + String.format(HEADFORMAT, "type,", "total ops","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "errors", "gc: #", "max ms", "sum ms", "sdv ms", "mb")); } - private static void printRow(String prefix, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output) + private static void printRow(String prefix, String type, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output) { output.println(prefix + String.format(ROWFORMAT, + type + ",", total.operationCount, - interval.adjustedRowRate(), interval.opRate(), interval.partitionRate(), interval.rowRate(), @@ -188,6 +200,7 @@ public class StressMetrics interval.maxLatency(), total.runTime() / 1000f, opRateUncertainty.getUncertainty(), + interval.errorCount, gcStats.count, gcStats.maxms, gcStats.summs, @@ -200,16 +213,20 @@ public class StressMetrics { output.println("\n"); output.println("Results:"); - TimingInterval history = timing.getHistory(); - output.println(String.format("op rate : %.0f", history.opRate())); - output.println(String.format("partition rate : %.0f", history.partitionRate())); - output.println(String.format("row rate : %.0f", history.rowRate())); - output.println(String.format("latency mean : %.1f", history.meanLatency())); - output.println(String.format("latency median : %.1f", history.medianLatency())); - output.println(String.format("latency 95th percentile : %.1f", history.rankLatency(.95f))); - output.println(String.format("latency 99th percentile : %.1f", history.rankLatency(0.99f))); - output.println(String.format("latency 99.9th percentile : %.1f", history.rankLatency(0.999f))); - output.println(String.format("latency max : %.1f", history.maxLatency())); + + TimingIntervals opHistory = timing.getHistory(); + TimingInterval history = opHistory.combine(settings.samples.historyCount); + output.println(String.format("op rate : %.0f %s", history.opRate(), opHistory.opRates())); + output.println(String.format("partition rate : %.0f %s", history.partitionRate(), opHistory.partitionRates())); + output.println(String.format("row rate : %.0f %s", history.rowRate(), opHistory.rowRates())); + output.println(String.format("latency mean : %.1f %s", history.meanLatency(), opHistory.meanLatencies())); + output.println(String.format("latency median : %.1f %s", history.medianLatency(), opHistory.medianLatencies())); + output.println(String.format("latency 95th percentile : %.1f %s", history.rankLatency(.95f), opHistory.rankLatencies(0.95f))); + output.println(String.format("latency 99th percentile : %.1f %s", history.rankLatency(0.99f), opHistory.rankLatencies(0.99f))); + output.println(String.format("latency 99.9th percentile : %.1f %s", history.rankLatency(0.999f), opHistory.rankLatencies(0.999f))); + output.println(String.format("latency max : %.1f %s", history.maxLatency(), opHistory.maxLatencies())); + output.println(String.format("Total partitions : %d %s", history.partitionCount, opHistory.partitionCounts())); + output.println(String.format("Total errors : %d %s", history.errorCount, opHistory.errorCounts())); output.println(String.format("total gc count : %.0f", totalGcStats.count)); output.println(String.format("total gc mb : %.0f", totalGcStats.bytes / (1 << 20))); output.println(String.format("total gc time (s) : %.0f", totalGcStats.summs / 1000)); @@ -219,7 +236,7 @@ public class StressMetrics history.runTime(), "HH:mm:ss", true)); } - public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out) + public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out, int historySampleCount) { int idLen = 0; for (String id : ids) @@ -227,13 +244,27 @@ public class StressMetrics String formatstr = "%" + idLen + "s, "; printHeader(String.format(formatstr, "id"), out); for (int i = 0 ; i < ids.size() ; i++) + { + for (Map.Entry<String, TimingInterval> type : summarise.get(i).timing.getHistory().intervals().entrySet()) + { + printRow(String.format(formatstr, ids.get(i)), + type.getKey(), + type.getValue(), + type.getValue(), + summarise.get(i).totalGcStats, + summarise.get(i).rowRateUncertainty, + out); + } + TimingInterval hist = summarise.get(i).timing.getHistory().combine(historySampleCount); printRow(String.format(formatstr, ids.get(i)), - summarise.get(i).timing.getHistory(), - summarise.get(i).timing.getHistory(), + "total", + hist, + hist, summarise.get(i).totalGcStats, summarise.get(i).rowRateUncertainty, out ); + } } public Timing getTiming() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressProfile.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java index 687b3ae..6c73214 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java @@ -43,10 +43,7 @@ import org.apache.cassandra.stress.generate.*; import org.apache.cassandra.stress.generate.values.*; import org.apache.cassandra.stress.operations.userdefined.SchemaInsert; import org.apache.cassandra.stress.operations.userdefined.SchemaQuery; -import org.apache.cassandra.stress.settings.OptionDistribution; -import org.apache.cassandra.stress.settings.OptionRatioDistribution; -import org.apache.cassandra.stress.settings.StressSettings; -import org.apache.cassandra.stress.settings.ValidationType; +import org.apache.cassandra.stress.settings.*; import org.apache.cassandra.stress.util.JavaDriverClient; import org.apache.cassandra.stress.util.ThriftClient; import org.apache.cassandra.stress.util.Timer; @@ -188,6 +185,16 @@ public class StressProfile implements Serializable maybeLoadSchemaInfo(settings); } + public void truncateTable(StressSettings settings) + { + JavaDriverClient client = settings.getJavaDriverClient(false); + assert settings.command.truncate != SettingsCommand.TruncateWhen.NEVER; + String cql = String.format("TRUNCATE %s.%s", keyspaceName, tableName); + client.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ONE); + System.out.println(String.format("Truncated %s.%s. Sleeping %ss for propagation.", + keyspaceName, tableName, settings.node.nodes.size())); + Uninterruptibles.sleepUninterruptibly(settings.node.nodes.size(), TimeUnit.SECONDS); + } private void maybeLoadSchemaInfo(StressSettings settings) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java index 533b630..f2616cf 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java @@ -36,4 +36,14 @@ public class FixedOpDistribution implements OpDistribution { return operation; } + + public void initTimers() + { + operation.timer.init(); + } + + public void closeTimers() + { + operation.timer.close(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java index 0fc15a6..e09300a 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java @@ -28,4 +28,6 @@ public interface OpDistribution Operation next(); + public void initTimers(); + public void closeTimers(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java index afbae7d..7e13fcd 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java @@ -21,13 +21,11 @@ package org.apache.cassandra.stress.operations; */ -import org.apache.cassandra.stress.util.Timer; +import org.apache.cassandra.stress.util.Timing; public interface OpDistributionFactory { - - public OpDistribution get(Timer timer); + public OpDistribution get(Timing timing, int sampleCount); public String desc(); Iterable<OpDistributionFactory> each(); - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java index 432e991..9698421 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java @@ -25,6 +25,7 @@ import org.apache.commons.math3.distribution.EnumeratedDistribution; import org.apache.cassandra.stress.Operation; import org.apache.cassandra.stress.generate.Distribution; +import org.apache.commons.math3.util.Pair; public class SampledOpDistribution implements OpDistribution { @@ -50,4 +51,20 @@ public class SampledOpDistribution implements OpDistribution remaining--; return cur; } + + public void initTimers() + { + for (Pair<Operation, Double> op : operations.getPmf()) + { + op.getFirst().timer.init(); + } + } + + public void closeTimers() + { + for (Pair<Operation, Double> op : operations.getPmf()) + { + op.getFirst().timer.close(); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java index 9713e93..10191a6 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.cassandra.stress.util.Timing; import org.apache.commons.math3.distribution.EnumeratedDistribution; import org.apache.commons.math3.util.Pair; @@ -49,12 +50,13 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF protected abstract Operation get(Timer timer, PartitionGenerator generator, T key); protected abstract PartitionGenerator newGenerator(); - public OpDistribution get(Timer timer) + public OpDistribution get(Timing timing, int sampleCount) { PartitionGenerator generator = newGenerator(); List<Pair<Operation, Double>> operations = new ArrayList<>(); for (Map.Entry<T, Double> ratio : ratios.entrySet()) - operations.add(new Pair<>(get(timer, generator, ratio.getKey()), ratio.getValue())); + operations.add(new Pair<>(get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey()), + ratio.getValue())); return new SampledOpDistribution(new EnumeratedDistribution<>(operations), clustering.get()); } @@ -73,9 +75,9 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF { out.add(new OpDistributionFactory() { - public OpDistribution get(Timer timer) + public OpDistribution get(Timing timing, int sampleCount) { - return new FixedOpDistribution(SampledOpDistributionFactory.this.get(timer, newGenerator(), ratio.getKey())); + return new FixedOpDistribution(SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey())); } public String desc() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/Command.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java index 9a93e34..c47c5d2 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java @@ -31,38 +31,37 @@ import com.google.common.collect.ImmutableList; public enum Command { - READ(false, "standard1", "Super1", + READ(false, "standard1", "Multiple concurrent reads - the cluster must first be populated by a write test", CommandCategory.BASIC ), - WRITE(true, "standard1", "Super1", + WRITE(true, "standard1", "insert", "Multiple concurrent writes against the cluster", CommandCategory.BASIC ), - MIXED(true, null, null, + MIXED(true, null, "Interleaving of any basic commands, with configurable ratio and distribution - the cluster must first be populated by a write test", CommandCategory.MIXED ), - COUNTER_WRITE(true, "counter1", "SuperCounter1", + COUNTER_WRITE(true, "counter1", "counter_add", "Multiple concurrent updates of counters.", CommandCategory.BASIC ), - COUNTER_READ(false, "counter1", "SuperCounter1", + COUNTER_READ(false, "counter1", "counter_get", "Multiple concurrent reads of counters. The cluster must first be populated by a counterwrite test.", CommandCategory.BASIC ), - USER(true, null, null, + USER(true, null, "Interleaving of user provided queries, with configurable ratio and distribution", CommandCategory.USER ), - HELP(false, null, null, "-?", "Print help for a command or option", null), - PRINT(false, null, null, "Inspect the output of a distribution definition", null), - LEGACY(false, null, null, "Legacy support mode", null) - + HELP(false, null, "-?", "Print help for a command or option", null), + PRINT(false, null, "Inspect the output of a distribution definition", null), + LEGACY(false, null, "Legacy support mode", null) ; private static final Map<String, Command> LOOKUP; @@ -87,17 +86,15 @@ public enum Command public final List<String> names; public final String description; public final String table; - public final String supertable; - Command(boolean updates, String table, String supertable, String description, CommandCategory category) + Command(boolean updates, String table, String description, CommandCategory category) { - this(updates, table, supertable, null, description, category); + this(updates, table, null, description, category); } - Command(boolean updates, String table, String supertable, String extra, String description, CommandCategory category) + Command(boolean updates, String table, String extra, String description, CommandCategory category) { this.table = table; - this.supertable = supertable; this.updates = updates; this.category = category; List<String> names = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/Option.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Option.java b/tools/stress/src/org/apache/cassandra/stress/settings/Option.java index a9e669c..b9e402e 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/Option.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/Option.java @@ -32,6 +32,7 @@ abstract class Option abstract String longDisplay(); abstract List<String> multiLineDisplay(); abstract boolean setByUser(); + abstract boolean present(); public int hashCode() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java index 9c2f367..28d8f96 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java @@ -73,6 +73,11 @@ public final class OptionAnyProbabilities extends OptionMulti { return !options.isEmpty(); } + + boolean present() + { + return setByUser(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java index 45e832a..7186efb 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java @@ -134,6 +134,11 @@ public class OptionDistribution extends Option return spec != null; } + boolean present() + { + return setByUser() || defaultSpec != null; + } + @Override public String shortDisplay() { @@ -209,7 +214,7 @@ public class OptionDistribution extends Option stdev = ((max - min) / 2d) / stdevsToEdge; } return new GaussianFactory(min, max, mean, stdev); - } catch (Exception _) + } catch (Exception ignore) { throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params); } @@ -233,7 +238,7 @@ public class OptionDistribution extends Option // over entire range, but this results in overly skewed distribution, so take sqrt final double mean = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min))); return new ExpFactory(min, max, mean); - } catch (Exception _) + } catch (Exception ignore) { throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params); } @@ -258,7 +263,7 @@ public class OptionDistribution extends Option // over entire range, but this results in overly skewed distribution, so take sqrt final double scale = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min))); return new ExtremeFactory(min, max, shape, scale); - } catch (Exception _) + } catch (Exception ignore) { throw new IllegalArgumentException("Invalid parameter list for extreme (Weibull) distribution: " + params); } @@ -284,7 +289,7 @@ public class OptionDistribution extends Option // over entire range, but this results in overly skewed distribution, so take sqrt final double scale = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min))); return new QuantizedExtremeFactory(min, max, shape, scale, quantas); - } catch (Exception _) + } catch (Exception ignore) { throw new IllegalArgumentException("Invalid parameter list for quantized extreme (Weibull) distribution: " + params); } @@ -305,7 +310,7 @@ public class OptionDistribution extends Option final long min = parseLong(bounds[0]); final long max = parseLong(bounds[1]); return new UniformFactory(min, max); - } catch (Exception _) + } catch (Exception ignore) { throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params); } @@ -324,7 +329,7 @@ public class OptionDistribution extends Option { final long key = parseLong(params.get(0)); return new FixedFactory(key); - } catch (Exception _) + } catch (Exception ignore) { throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java index 6d11012..ad89a5b 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java @@ -182,6 +182,11 @@ abstract class OptionMulti extends Option { return !options.isEmpty(); } + + boolean present() + { + return !options.isEmpty(); + } } List<Option> optionsSetByUser() @@ -197,7 +202,7 @@ abstract class OptionMulti extends Option { List<Option> r = new ArrayList<>(); for (Option option : delegate.options()) - if (!option.setByUser() && option.happy()) + if (!option.setByUser() && option.present()) r.add(option); return r; } @@ -210,4 +215,12 @@ abstract class OptionMulti extends Option return false; } + boolean present() + { + for (Option option : delegate.options()) + if (option.present()) + return true; + return false; + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java index 416f045..67cd1a9 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java @@ -124,6 +124,11 @@ public class OptionRatioDistribution extends Option return delegate.setByUser(); } + boolean present() + { + return delegate.present(); + } + @Override public String shortDisplay() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java index 8b65587..8d1a6fc 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java @@ -81,7 +81,7 @@ class OptionReplication extends OptionMulti throw new IllegalArgumentException(clazz + " is not a replication strategy"); strategy = fullname; break; - } catch (Exception _) + } catch (Exception ignore) { } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java index e8b45ec..60b4c09 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java @@ -27,18 +27,27 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.Uninterruptibles; + import org.apache.cassandra.stress.operations.OpDistributionFactory; +import org.apache.cassandra.stress.util.JavaDriverClient; import org.apache.cassandra.thrift.ConsistencyLevel; // Generic command settings - common to read/write/etc public abstract class SettingsCommand implements Serializable { + public static enum TruncateWhen + { + NEVER, ONCE, ALWAYS + } + public final Command type; public final long count; public final long duration; public final TimeUnit durationUnits; public final boolean noWarmup; + public final TruncateWhen truncate; public final ConsistencyLevel consistencyLevel; public final double targetUncertainty; public final int minimumUncertaintyMeasurements; @@ -60,6 +69,8 @@ public abstract class SettingsCommand implements Serializable this.type = type; this.consistencyLevel = ConsistencyLevel.valueOf(options.consistencyLevel.value().toUpperCase()); this.noWarmup = options.noWarmup.setByUser(); + this.truncate = TruncateWhen.valueOf(options.truncate.value().toUpperCase()); + if (count != null) { this.count = OptionDistribution.parseLong(count.count.value()); @@ -107,7 +118,8 @@ public abstract class SettingsCommand implements Serializable static abstract class Options extends GroupedOptions { final OptionSimple noWarmup = new OptionSimple("no-warmup", "", null, "Do not warmup the process", false); - final OptionSimple consistencyLevel = new OptionSimple("cl=", "ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|SERIAL|LOCAL_SERIAL|LOCAL_ONE", "ONE", "Consistency level to use", false); + final OptionSimple truncate = new OptionSimple("truncate=", "never|once|always", "never", "Truncate the table: never, before performing any work, or before each iteration", false); + final OptionSimple consistencyLevel = new OptionSimple("cl=", "ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY", "ONE", "Consistency level to use", false); } static class Count extends Options @@ -116,7 +128,7 @@ public abstract class SettingsCommand implements Serializable @Override public List<? extends Option> options() { - return Arrays.asList(count, noWarmup, consistencyLevel); + return Arrays.asList(count, noWarmup, truncate, consistencyLevel); } } @@ -126,7 +138,7 @@ public abstract class SettingsCommand implements Serializable @Override public List<? extends Option> options() { - return Arrays.asList(duration, noWarmup, consistencyLevel); + return Arrays.asList(duration, noWarmup, truncate, consistencyLevel); } } @@ -138,10 +150,26 @@ public abstract class SettingsCommand implements Serializable @Override public List<? extends Option> options() { - return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, noWarmup, consistencyLevel); + return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, noWarmup, truncate, consistencyLevel); } } + public abstract void truncateTables(StressSettings settings); + + protected void truncateTables(StressSettings settings, String ks, String ... tables) + { + JavaDriverClient client = settings.getJavaDriverClient(false); + assert settings.command.truncate != SettingsCommand.TruncateWhen.NEVER; + for (String table : tables) + { + String cql = String.format("TRUNCATE %s.%s", ks, table); + client.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ONE); + } + System.out.println(String.format("Truncated %s.%s. Sleeping %ss for propagation.", + ks, Arrays.toString(tables), settings.node.nodes.size())); + Uninterruptibles.sleepUninterruptibly(settings.node.nodes.size(), TimeUnit.SECONDS); + } + // CLI Utility Methods static SettingsCommand get(Map<String, String[]> clArgs) @@ -171,18 +199,6 @@ public abstract class SettingsCommand implements Serializable return null; } -/* static SettingsCommand build(Command type, String[] params) - { - GroupedOptions options = GroupedOptions.select(params, new Count(), new Duration(), new Uncertainty()); - if (options == null) - { - printHelp(type); - System.out.println("Invalid " + type + " options provided, see output for valid options"); - System.exit(1); - } - return new SettingsCommand(type, options); - }*/ - static void printHelp(Command type) { printHelp(type.toString().toLowerCase()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java index ee1958b..83f444c 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java @@ -37,7 +37,7 @@ import org.apache.cassandra.stress.operations.FixedOpDistribution; import org.apache.cassandra.stress.operations.OpDistribution; import org.apache.cassandra.stress.operations.OpDistributionFactory; import org.apache.cassandra.stress.operations.predefined.PredefinedOperation; -import org.apache.cassandra.stress.util.Timer; +import org.apache.cassandra.stress.util.Timing; // Settings unique to the mixed command type public class SettingsCommandPreDefined extends SettingsCommand @@ -51,9 +51,10 @@ public class SettingsCommandPreDefined extends SettingsCommand final SeedManager seeds = new SeedManager(settings); return new OpDistributionFactory() { - public OpDistribution get(Timer timer) + public OpDistribution get(Timing timing, int sampleCount) { - return new FixedOpDistribution(PredefinedOperation.operation(type, timer, newGenerator(settings), seeds, settings, add)); + return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString(), sampleCount), + newGenerator(settings), seeds, settings, add)); } public String desc() @@ -108,6 +109,11 @@ public class SettingsCommandPreDefined extends SettingsCommand } + public void truncateTables(StressSettings settings) + { + truncateTables(settings, settings.schema.keyspace, "standard1", "counter1", "counter3"); + } + // CLI utility methods public static SettingsCommandPreDefined build(Command type, String[] params) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java index d5b221c..d4e43cf 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java @@ -89,6 +89,11 @@ public class SettingsCommandUser extends SettingsCommand }; } + public void truncateTables(StressSettings settings) + { + profile.truncateTable(settings); + } + static final class Options extends GroupedOptions { final SettingsCommand.Options parent; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/Timer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java index ff625a8..88e8020 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java @@ -39,6 +39,7 @@ public final class Timer private int opCount; // aggregate info + private long errorCount; private long partitionCount; private long rowCount; private long total; @@ -78,7 +79,7 @@ public final class Timer return finalReport == null; } - public void stop(long partitionCount, long rowCount) + public void stop(long partitionCount, long rowCount, boolean error) { maybeReport(); long now = System.nanoTime(); @@ -94,6 +95,8 @@ public final class Timer opCount += 1; this.partitionCount += partitionCount; this.rowCount += rowCount; + if (error) + this.errorCount++; upToDateAsOf = now; } @@ -108,14 +111,15 @@ public final class Timer ( new SampleOfLongs(Arrays.copyOf(sample, index(opCount)), p(opCount)), new SampleOfLongs(Arrays.copyOfRange(sample, index(opCount), Math.min(opCount, sample.length)), p(opCount) - 1) ); - final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, partitionCount, rowCount, total, opCount, - SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE)); + final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, partitionCount, + rowCount, total, opCount, errorCount, SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE)); // reset counters opCount = 0; partitionCount = 0; rowCount = 0; total = 0; max = 0; + errorCount = 0; lastSnap = upToDateAsOf; return report; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/Timing.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java index 9464b19..403bee0 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java @@ -21,9 +21,7 @@ package org.apache.cassandra.stress.util; */ -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -36,9 +34,10 @@ import java.util.concurrent.TimeUnit; // metrics calculated over the interval are accurate public class Timing { - - private final CopyOnWriteArrayList<Timer> timers = new CopyOnWriteArrayList<>(); - private volatile TimingInterval history; + // concurrency: this should be ok as the consumers are created serially by StressAction.run / warmup + // Probably the CopyOnWriteArrayList could be changed to an ordinary list as well. + private final Map<String, List<Timer>> timers = new TreeMap<>(); + private volatile TimingIntervals history; private final int historySampleCount; private final int reportSampleCount; private boolean done; @@ -54,22 +53,31 @@ public class Timing public static class TimingResult<E> { public final E extra; - public final TimingInterval timing; - public TimingResult(E extra, TimingInterval timing) + public final TimingIntervals intervals; + public TimingResult(E extra, TimingIntervals intervals) { this.extra = extra; - this.timing = timing; + this.intervals = intervals; } } public <E> TimingResult<E> snap(Callable<E> call) throws InterruptedException { - final Timer[] timers = this.timers.toArray(new Timer[0]); - final CountDownLatch ready = new CountDownLatch(timers.length); - for (int i = 0 ; i < timers.length ; i++) + // Count up total # of timers + int timerCount = 0; + for (List<Timer> timersForOperation : timers.values()) { - final Timer timer = timers[i]; - timer.requestReport(ready); + timerCount += timersForOperation.size(); + } + final CountDownLatch ready = new CountDownLatch(timerCount); + + // request reports + for (List <Timer> timersForOperation : timers.values()) + { + for(Timer timer : timersForOperation) + { + timer.requestReport(ready); + } } E extra; @@ -86,34 +94,48 @@ public class Timing // TODO fail gracefully after timeout if a thread is stuck if (!ready.await(5L, TimeUnit.MINUTES)) + { throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck. Check GC/Heap size"); + } boolean done = true; + // reports have been filled in by timer threadCount, so merge - List<TimingInterval> intervals = new ArrayList<>(); - for (Timer timer : timers) + Map<String, TimingInterval> intervals = new TreeMap<>(); + for (Map.Entry<String, List<Timer>> entry : timers.entrySet()) { - intervals.add(timer.report); - done &= !timer.running(); + List<TimingInterval> operationIntervals = new ArrayList<>(); + for (Timer timer : entry.getValue()) + { + operationIntervals.add(timer.report); + done &= !timer.running(); + } + + intervals.put(entry.getKey(), TimingInterval.merge(operationIntervals, reportSampleCount, + history.get(entry.getKey()).endNanos())); } + TimingIntervals result = new TimingIntervals(intervals); this.done = done; - TimingResult<E> result = new TimingResult<>(extra, TimingInterval.merge(intervals, reportSampleCount, history.endNanos())); - history = TimingInterval.merge(Arrays.asList(result.timing, history), historySampleCount, history.startNanos()); - return result; + history = history.merge(result, historySampleCount, history.startNanos()); + return new TimingResult<>(extra, result); } - // build a new timer and add it to the set of running timers - public Timer newTimer(int sampleCount) + // build a new timer and add it to the set of running timers. + public Timer newTimer(String opType, int sampleCount) { final Timer timer = new Timer(sampleCount); - timers.add(timer); + + if (!timers.containsKey(opType)) + timers.put(opType, new ArrayList<Timer>()); + + timers.get(opType).add(timer); return timer; } public void start() { - history = new TimingInterval(System.nanoTime()); + history = new TimingIntervals(timers.keySet()); } public boolean done() @@ -121,9 +143,8 @@ public class Timing return done; } - public TimingInterval getHistory() + public TimingIntervals getHistory() { return history; } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java index 065ea52..6be71c8 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java @@ -20,10 +20,7 @@ package org.apache.cassandra.stress.util; * */ - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.*; import java.util.concurrent.ThreadLocalRandom; // represents measurements taken over an interval of time @@ -42,18 +39,28 @@ public final class TimingInterval public final long partitionCount; public final long rowCount; public final long operationCount; + public final long errorCount; final SampleOfLongs sample; + public String toString() + { + return String.format("Start: %d end: %d maxLatency: %d pauseLength: %d pauseStart: %d totalLatency: %d" + + " pCount: %d rcount: %d opCount: %d errors: %d", start, end, maxLatency, pauseLength, + pauseStart, totalLatency, partitionCount, rowCount, operationCount, errorCount); + } + TimingInterval(long time) { start = end = time; maxLatency = totalLatency = 0; - partitionCount = rowCount = operationCount = 0; + partitionCount = rowCount = operationCount = errorCount = 0; pauseStart = pauseLength = 0; sample = new SampleOfLongs(new long[0], 1d); } - TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long partitionCount, long rowCount, long totalLatency, long operationCount, SampleOfLongs sample) + + TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long partitionCount, + long rowCount, long totalLatency, long operationCount, long errorCount, SampleOfLongs sample) { this.start = start; this.end = Math.max(end, start); @@ -61,6 +68,7 @@ public final class TimingInterval this.partitionCount = partitionCount; this.rowCount = rowCount; this.totalLatency = totalLatency; + this.errorCount = errorCount; this.operationCount = operationCount; this.pauseStart = pauseStart; this.pauseLength = pauseLength; @@ -68,33 +76,41 @@ public final class TimingInterval } // merge multiple timer intervals together - static TimingInterval merge(List<TimingInterval> intervals, int maxSamples, long start) + static TimingInterval merge(Iterable<TimingInterval> intervals, int maxSamples, long start) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - long operationCount = 0, partitionCount = 0, rowCount = 0; + long operationCount = 0, partitionCount = 0, rowCount = 0, errorCount = 0; long maxLatency = 0, totalLatency = 0; List<SampleOfLongs> latencies = new ArrayList<>(); long end = 0; long pauseStart = 0, pauseEnd = Long.MAX_VALUE; for (TimingInterval interval : intervals) { - end = Math.max(end, interval.end); - operationCount += interval.operationCount; - maxLatency = Math.max(interval.maxLatency, maxLatency); - totalLatency += interval.totalLatency; - partitionCount += interval.partitionCount; - rowCount += interval.rowCount; - latencies.addAll(Arrays.asList(interval.sample)); - if (interval.pauseLength > 0) + if (interval != null) { - pauseStart = Math.max(pauseStart, interval.pauseStart); - pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength); + end = Math.max(end, interval.end); + operationCount += interval.operationCount; + maxLatency = Math.max(interval.maxLatency, maxLatency); + totalLatency += interval.totalLatency; + partitionCount += interval.partitionCount; + rowCount += interval.rowCount; + errorCount += interval.errorCount; + latencies.addAll(Arrays.asList(interval.sample)); + if (interval.pauseLength > 0) + { + pauseStart = Math.max(pauseStart, interval.pauseStart); + pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength); + } } } - if (pauseEnd < pauseStart) + + if (pauseEnd < pauseStart || pauseStart <= 0) + { pauseEnd = pauseStart = 0; - return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, partitionCount, rowCount, totalLatency, operationCount, - SampleOfLongs.merge(rnd, latencies, maxSamples)); + } + + return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, partitionCount, rowCount, + totalLatency, operationCount, errorCount, SampleOfLongs.merge(rnd, latencies, maxSamples)); } @@ -128,11 +144,6 @@ public final class TimingInterval return maxLatency * 0.000001d; } - public long runTime() - { - return (end - start) / 1000000; - } - public double medianLatency() { return sample.medianLatency(); @@ -144,19 +155,48 @@ public final class TimingInterval return sample.rankLatency(rank); } - public final long endNanos() + public long runTime() { - return end; + return (end - start) / 1000000; } - public final long endMillis() + public final long endNanos() { - return end / 1000000; + return end; } public long startNanos() { return start; } -} + + public static enum TimingParameter + { + OPRATE, ROWRATE, ADJROWRATE, PARTITIONRATE, MEANLATENCY, MAXLATENCY, MEDIANLATENCY, RANKLATENCY, + ERRORCOUNT, PARTITIONCOUNT + } + + String getStringValue(TimingParameter value) + { + return getStringValue(value, Float.NaN); + } + + String getStringValue(TimingParameter value, float rank) + { + switch (value) + { + case OPRATE: return String.format("%.0f", opRate()); + case ROWRATE: return String.format("%.0f", rowRate()); + case ADJROWRATE: return String.format("%.0f", adjustedRowRate()); + case PARTITIONRATE: return String.format("%.0f", partitionRate()); + case MEANLATENCY: return String.format("%.1f", meanLatency()); + case MAXLATENCY: return String.format("%.1f", maxLatency()); + case MEDIANLATENCY: return String.format("%.1f", medianLatency()); + case RANKLATENCY: return String.format("%.1f", rankLatency(rank)); + case ERRORCOUNT: return String.format("%d", errorCount); + case PARTITIONCOUNT: return String.format("%d", partitionCount); + default: throw new IllegalStateException(); + } + } + }
