Improve stress workload realism. Patch by benedict; reviewed by tjake for CASSANDRA-7519
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0580fb2b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0580fb2b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0580fb2b Branch: refs/heads/cassandra-2.1.0 Commit: 0580fb2b7707beaa69019a73a6c53d86fe088a0a Parents: c6a2c65 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Sun Sep 7 21:18:53 2014 +0700 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Sun Sep 7 21:19:58 2014 +0700 ---------------------------------------------------------------------- CHANGES.txt | 2 +- tools/cqlstress-counter-example.yaml | 20 +- tools/cqlstress-example.yaml | 25 +- tools/cqlstress-insanity-example.yaml | 20 +- .../org/apache/cassandra/stress/Operation.java | 9 +- .../apache/cassandra/stress/StressAction.java | 169 +++------- .../apache/cassandra/stress/StressMetrics.java | 26 +- .../apache/cassandra/stress/StressProfile.java | 75 ++++- .../org/apache/cassandra/stress/StressYaml.java | 12 +- .../stress/generate/DistributionInverted.java | 7 + .../stress/generate/DistributionQuantized.java | 90 +++++ .../cassandra/stress/generate/FasterRandom.java | 116 +++++++ .../cassandra/stress/generate/Partition.java | 327 +++++++++++++++---- .../stress/generate/PartitionGenerator.java | 28 +- .../stress/generate/RatioDistribution.java | 5 + .../apache/cassandra/stress/generate/Seed.java | 67 ++++ .../stress/generate/SeedGenerator.java | 29 -- .../cassandra/stress/generate/SeedManager.java | 249 ++++++++++++++ .../stress/generate/SeedRandomGenerator.java | 54 --- .../stress/generate/SeedSeriesGenerator.java | 42 --- .../stress/generate/values/Booleans.java | 2 +- .../cassandra/stress/generate/values/Bytes.java | 9 +- .../cassandra/stress/generate/values/Dates.java | 3 +- .../stress/generate/values/Doubles.java | 2 +- .../stress/generate/values/Floats.java | 2 +- .../stress/generate/values/Generator.java | 4 +- 
.../stress/generate/values/HexBytes.java | 2 +- .../stress/generate/values/HexStrings.java | 4 +- .../cassandra/stress/generate/values/Inets.java | 2 +- .../stress/generate/values/Integers.java | 2 +- .../cassandra/stress/generate/values/Lists.java | 2 +- .../cassandra/stress/generate/values/Longs.java | 2 +- .../cassandra/stress/generate/values/Sets.java | 2 +- .../stress/generate/values/Strings.java | 12 +- .../stress/generate/values/TimeUUIDs.java | 2 +- .../cassandra/stress/generate/values/UUIDs.java | 2 +- .../operations/predefined/CqlCounterAdder.java | 5 + .../operations/predefined/CqlInserter.java | 5 + .../predefined/PredefinedOperation.java | 2 +- .../predefined/ThriftCounterAdder.java | 5 + .../operations/predefined/ThriftInserter.java | 5 + .../operations/userdefined/SchemaInsert.java | 80 +++-- .../operations/userdefined/SchemaQuery.java | 87 ++++- .../operations/userdefined/SchemaStatement.java | 53 +-- .../cassandra/stress/settings/CliOption.java | 3 +- .../stress/settings/OptionDistribution.java | 72 +++- .../settings/OptionRatioDistribution.java | 40 ++- .../stress/settings/SettingsCommand.java | 14 +- .../settings/SettingsCommandPreDefined.java | 13 +- .../SettingsCommandPreDefinedMixed.java | 4 +- .../stress/settings/SettingsCommandUser.java | 16 +- .../stress/settings/SettingsErrors.java | 92 ++++++ .../stress/settings/SettingsInsert.java | 103 ++++++ .../cassandra/stress/settings/SettingsKey.java | 153 --------- .../stress/settings/SettingsPopulation.java | 176 ++++++++++ .../stress/settings/SettingsSchema.java | 17 +- .../stress/settings/StressSettings.java | 23 +- .../cassandra/stress/util/DynamicList.java | 259 +++++++++++++++ .../org/apache/cassandra/stress/util/Timer.java | 7 +- .../apache/cassandra/stress/util/Timing.java | 13 +- .../cassandra/stress/util/TimingInterval.java | 6 +- 61 files changed, 1955 insertions(+), 724 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 46836bf..e42d9c4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,7 +5,7 @@ * cqlsh: DESCRIBE support for frozen UDTs, tuples (CASSANDRA-7863) * Avoid exposing internal classes over JMX (CASSANDRA-7879) * Add null check for keys when freezing collection (CASSANDRA-7869) - + * Improve stress workload realism (CASSANDRA-7519) 2.1.0-rc7 * Add frozen keyword and require UDT to be frozen (CASSANDRA-7857) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-counter-example.yaml ---------------------------------------------------------------------- diff --git a/tools/cqlstress-counter-example.yaml b/tools/cqlstress-counter-example.yaml index cff14b6..f8f70ea 100644 --- a/tools/cqlstress-counter-example.yaml +++ b/tools/cqlstress-counter-example.yaml @@ -62,19 +62,17 @@ columnspec: population: fixed(1) insert: - partitions: fixed(1) # number of unique partitions to update in a single operation - # if perbatch < 1, multiple batches will be used but all partitions will - # occur in all batches (unless already finished); only the row counts will vary - pervisit: fixed(1)/1 # ratio of rows each partition should update in a single visit to the partition, - # as a proportion of the total possible for the partition - perbatch: fixed(1)/1 # number of rows each partition should update in a single batch statement, - # as a proportion of the proportion we are inserting this visit - # (i.e. 
compounds with (and capped by) pervisit) - batchtype: UNLOGGED # type of batch to use + partitions: fixed(1) # number of unique partitions to update in a single operation + # if batchcount > 1, multiple batches will be used but all partitions will + # occur in all batches (unless they finish early); only the row counts will vary + batchtype: LOGGED # type of batch to use + select: fixed(1)/1 # uniform chance any single generated CQL row will be visited in a partition; + # generated for each partition independently, each time we visit it # # A list of queries you wish to run against the schema # queries: - simple1: select * from counttest where name = ? - + simple1: + cql: select * from counttest where name = ? + fields: samerow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-example.yaml ---------------------------------------------------------------------- diff --git a/tools/cqlstress-example.yaml b/tools/cqlstress-example.yaml index d5c90a2..4dd5e4a 100644 --- a/tools/cqlstress-example.yaml +++ b/tools/cqlstress-example.yaml @@ -69,25 +69,26 @@ columnspec: size: uniform(1..10) population: uniform(1..1M) # the range of unique values to select for the field (default is 100Billion) - name: date - cluster: uniform(1..4) + cluster: uniform(20..40) - name: lval population: gaussian(1..1000) cluster: uniform(1..4) insert: - partitions: uniform(1..50) # number of unique partitions to update in a single operation - # if perbatch < 1, multiple batches will be used but all partitions will - # occur in all batches (unless already finished); only the row counts will vary - pervisit: uniform(1..10)/10 # ratio of rows each partition should update in a single visit to the partition, - # as a proportion of the total possible for the partition - perbatch: ~exp(1..3)/4 # number of rows each partition should update in a 
single batch statement, - # as a proportion of the proportion we are inserting this visit - # (i.e. compounds with (and capped by) pervisit) - batchtype: UNLOGGED # type of batch to use + partitions: uniform(1..50) # number of unique partitions to update in a single operation + # if batchcount > 1, multiple batches will be used but all partitions will + # occur in all batches (unless they finish early); only the row counts will vary + batchtype: LOGGED # type of batch to use + select: uniform(1..10)/10 # uniform chance any single generated CQL row will be visited in a partition; + # generated for each partition independently, each time we visit it # # A list of queries you wish to run against the schema # queries: - simple1: select * from typestest where name = ? and choice = ? LIMIT 100 - range1: select * from typestest where name = ? and choice = ? and date >= ? LIMIT 100 + simple1: + cql: select * from typestest where name = ? and choice = ? LIMIT 100 + fields: samerow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition) + range1: + cql: select * from typestest where name = ? and choice = ? and date >= ? 
LIMIT 100 + fields: multirow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/cqlstress-insanity-example.yaml ---------------------------------------------------------------------- diff --git a/tools/cqlstress-insanity-example.yaml b/tools/cqlstress-insanity-example.yaml index ef1bb3a..ea4f97f 100644 --- a/tools/cqlstress-insanity-example.yaml +++ b/tools/cqlstress-insanity-example.yaml @@ -74,19 +74,17 @@ columnspec: insert: - partitions: fixed(1) # number of unique partitions to update in a single operation - # if perbatch < 1, multiple batches will be used but all partitions will - # occur in all batches (unless already finished); only the row counts will vary - pervisit: uniform(1..10)/10 # ratio of rows each partition should update in a single visit to the partition, - # as a proportion of the total possible for the partition - perbatch: fixed(1)/1 # number of rows each partition should update in a single batch statement, - # as a proportion of the proportion we are inserting this visit - # (i.e. compounds with (and capped by) pervisit) - batchtype: UNLOGGED # type of batch to use + partitions: fixed(1) # number of unique partitions to update in a single operation + # if batchcount > 1, multiple batches will be used but all partitions will + # occur in all batches (unless they finish early); only the row counts will vary + batchtype: LOGGED # type of batch to use + select: fixed(1)/1 # uniform chance any single generated CQL row will be visited in a partition; + # generated for each partition independently, each time we visit it # # A list of queries you wish to run against the schema # queries: - simple1: select * from insanitytest where name = ? and choice = ? LIMIT 100 - + simple1: + cql: select * from insanitytest where name = ? and choice = ? 
LIMIT 100 + fields: samerow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/Operation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java index 7831074..5560240 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Operation.java +++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java @@ -61,6 +61,11 @@ public abstract class Operation this.partitions = partitions; } + public boolean isWrite() + { + return false; + } + /** * Run operation * @param client Cassandra Thrift client connection @@ -84,7 +89,7 @@ public abstract class Operation String exceptionMessage = null; int tries = 0; - for (; tries < settings.command.tries; tries++) + for (; tries < settings.errors.tries; tries++) { try { @@ -144,7 +149,7 @@ public abstract class Operation protected void error(String message) throws IOException { - if (!settings.command.ignoreErrors) + if (!settings.errors.ignore) throw new IOException(message); else if (settings.log.level.compareTo(SettingsLog.Level.MINIMAL) > 0) System.err.println(message); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressAction.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index 2105a72..e58bfa1 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -23,7 +23,6 @@ import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import 
java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -32,7 +31,6 @@ import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.stress.generate.Partition; -import org.apache.cassandra.stress.generate.SeedGenerator; import org.apache.cassandra.stress.operations.OpDistribution; import org.apache.cassandra.stress.operations.OpDistributionFactory; import org.apache.cassandra.stress.settings.*; @@ -58,6 +56,7 @@ public class StressAction implements Runnable // creating keyspace and column families settings.maybeCreateKeyspaces(); + // TODO: warmup should if (!settings.command.noWarmup) warmup(settings.command.getFactory(settings)); @@ -155,8 +154,8 @@ public class StressAction implements Runnable double improvement = 0; for (int i = results.size() - count ; i < results.size() ; i++) { - double prev = results.get(i - 1).getTiming().getHistory().realOpRate(); - double cur = results.get(i).getTiming().getHistory().realOpRate(); + double prev = results.get(i - 1).getTiming().getHistory().opRate(); + double cur = results.get(i).getTiming().getHistory().opRate(); improvement += (cur - prev) / prev; } return improvement / count; @@ -169,11 +168,11 @@ public class StressAction implements Runnable operations.desc(), threadCount, opCount > 0 ? 
" for " + opCount + " iterations" : "until stderr of mean < " + settings.command.targetUncertainty)); - final WorkQueue workQueue; + final WorkManager workManager; if (opCount < 0) - workQueue = new ContinuousWorkQueue(50); + workManager = new ContinuousWorkManager(); else - workQueue = FixedWorkQueue.build(opCount); + workManager = new FixedWorkManager(opCount); RateLimiter rateLimiter = null; // TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution @@ -185,7 +184,7 @@ public class StressAction implements Runnable final CountDownLatch done = new CountDownLatch(threadCount); final Consumer[] consumers = new Consumer[threadCount]; for (int i = 0; i < threadCount; i++) - consumers[i] = new Consumer(operations, done, workQueue, metrics, rateLimiter); + consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter); // starting worker threadCount for (int i = 0; i < threadCount; i++) @@ -201,14 +200,15 @@ public class StressAction implements Runnable settings.command.minimumUncertaintyMeasurements, settings.command.maximumUncertaintyMeasurements); } catch (InterruptedException e) { } - workQueue.stop(); + workManager.stop(); } try { done.await(); metrics.stop(); - } catch (InterruptedException e) {} + } + catch (InterruptedException e) {} if (metrics.wasCancelled()) return null; @@ -231,20 +231,18 @@ public class StressAction implements Runnable private final OpDistribution operations; private final StressMetrics metrics; private final Timer timer; - private final SeedGenerator seedGenerator; private final RateLimiter rateLimiter; private volatile boolean success = true; - private final WorkQueue workQueue; + private final WorkManager workManager; private final CountDownLatch done; - public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkQueue workQueue, StressMetrics metrics, RateLimiter rateLimiter) + public Consumer(OpDistributionFactory operations, CountDownLatch done, 
WorkManager workManager, StressMetrics metrics, RateLimiter rateLimiter) { this.done = done; this.rateLimiter = rateLimiter; - this.workQueue = workQueue; + this.workManager = workManager; this.metrics = metrics; this.timer = metrics.getTiming().newTimer(); - this.seedGenerator = settings.keys.newSeedGenerator(); this.operations = operations.get(timer); } @@ -275,42 +273,33 @@ public class StressAction implements Runnable } int maxBatchSize = operations.maxBatchSize(); - Work work = workQueue.poll(); Partition[] partitions = new Partition[maxBatchSize]; - int workDone = 0; - while (work != null) + while (true) { + // TODO: Operation should be able to ecapsulate much of this behaviour Operation op = operations.next(); op.generator.reset(); - int batchSize = Math.max(1, (int) op.partitionCount.next()); - int partitionCount = 0; + int batchSize = workManager.takePermits(Math.max(1, (int) op.partitionCount.next())); + if (batchSize < 0) + break; + + if (rateLimiter != null) + rateLimiter.acquire(batchSize); + + int partitionCount = 0; while (partitionCount < batchSize) { - int count = Math.min((work.count - workDone), batchSize - partitionCount); - for (int i = 0 ; i < count ; i++) - { - long seed = seedGenerator.next(work.offset + workDone + i); - partitions[partitionCount + i] = op.generator.generate(seed); - } - workDone += count; - partitionCount += count; - if (workDone == work.count) - { - workDone = 0; - work = workQueue.poll(); - if (work == null) - { - if (partitionCount == 0) - return; - break; - } - if (rateLimiter != null) - rateLimiter.acquire(work.count); - } + Partition p = op.generator.generate(op); + if (p == null) + break; + partitions[partitionCount++] = p; } + if (partitionCount == 0) + break; + op.setPartitions(Arrays.asList(partitions).subList(0, partitionCount)); try @@ -340,7 +329,7 @@ public class StressAction implements Runnable e.printStackTrace(output); success = false; - workQueue.stop(); + workManager.stop(); metrics.cancel(); return; } @@ 
-356,107 +345,58 @@ public class StressAction implements Runnable } - private interface WorkQueue + private interface WorkManager { - // null indicates consumer should terminate - Work poll(); + // -1 indicates consumer should terminate + int takePermits(int count); // signal all consumers to terminate void stop(); } - private static final class Work - { - // index of operations - final long offset; - - // how many operations to perform - final int count; - - public Work(long offset, int count) - { - this.offset = offset; - this.count = count; - } - } - - private static final class FixedWorkQueue implements WorkQueue + private static final class FixedWorkManager implements WorkManager { - final ArrayBlockingQueue<Work> work; - volatile boolean stop = false; + final AtomicLong permits; - public FixedWorkQueue(ArrayBlockingQueue<Work> work) + public FixedWorkManager(long permits) { - this.work = work; + this.permits = new AtomicLong(permits); } @Override - public Work poll() + public int takePermits(int count) { - if (stop) - return null; - return work.poll(); + while (true) + { + long cur = permits.get(); + if (cur == 0) + return -1; + count = (int) Math.min(count, cur); + long next = cur - count; + if (permits.compareAndSet(cur, next)) + return count; + } } @Override public void stop() { - stop = true; + permits.getAndSet(0); } - - static FixedWorkQueue build(long operations) - { - // target splitting into around 50-500k items, with a minimum size of 20 - if (operations > Integer.MAX_VALUE * (1L << 19)) - throw new IllegalStateException("Cannot currently support more than approx 2^50 operations for one stress run. This is a LOT."); - int batchSize = (int) (operations / (1 << 19)); - if (batchSize < 20) - batchSize = 20; - ArrayBlockingQueue<Work> work = new ArrayBlockingQueue<>( - (int) ((operations / batchSize) - + (operations % batchSize == 0 ? 
0 : 1)) - ); - long offset = 0; - while (offset < operations) - { - work.add(new Work(offset, (int) Math.min(batchSize, operations - offset))); - offset += batchSize; - } - return new FixedWorkQueue(work); - } - } - private static final class ContinuousWorkQueue implements WorkQueue + private static final class ContinuousWorkManager implements WorkManager { - final AtomicLong offset = new AtomicLong(); - final int batchSize; volatile boolean stop = false; - private ContinuousWorkQueue(int batchSize) - { - this.batchSize = batchSize; - } - @Override - public Work poll() + public int takePermits(int count) { if (stop) - return null; - return new Work(nextOffset(), batchSize); - } - - private long nextOffset() - { - final int inc = batchSize; - while (true) - { - final long cur = offset.get(); - if (offset.compareAndSet(cur, cur + inc)) - return cur; - } + return -1; + return count; } @Override @@ -466,5 +406,4 @@ public class StressAction implements Runnable } } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java index 7e5c1b6..a9edfc6 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java @@ -42,7 +42,7 @@ public class StressMetrics private final Thread thread; private volatile boolean stop = false; private volatile boolean cancelled = false; - private final Uncertainty opRateUncertainty = new Uncertainty(); + private final Uncertainty rowRateUncertainty = new Uncertainty(); private final CountDownLatch stopped = new CountDownLatch(1); private final Timing timing = new Timing(); @@ -68,6 +68,7 @@ public class StressMetrics Thread.sleep(logIntervalMillis); else Thread.sleep(sleep); + 
update(); } catch (InterruptedException e) { @@ -86,6 +87,7 @@ public class StressMetrics } finally { + rowRateUncertainty.wakeAll(); stopped.countDown(); } } @@ -99,7 +101,7 @@ public class StressMetrics public void waitUntilConverges(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException { - opRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements); + rowRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements); } public void cancel() @@ -107,7 +109,7 @@ public class StressMetrics cancelled = true; stop = true; thread.interrupt(); - opRateUncertainty.wakeAll(); + rowRateUncertainty.wakeAll(); } public void stop() throws InterruptedException @@ -120,8 +122,11 @@ public class StressMetrics private void update() throws InterruptedException { TimingInterval interval = timing.snapInterval(); - printRow("", interval, timing.getHistory(), opRateUncertainty, output); - opRateUncertainty.update(interval.adjustedOpRate()); + if (interval.partitionCount != 0) + printRow("", interval, timing.getHistory(), rowRateUncertainty, output); + rowRateUncertainty.update(interval.adjustedRowRate()); + if (timing.done()) + stop = true; } @@ -132,14 +137,15 @@ public class StressMetrics private static void printHeader(String prefix, PrintStream output) { - output.println(prefix + String.format(HEADFORMAT, "partitions","op/s", "pk/s", "row/s","mean","med",".95",".99",".999","max","time","stderr")); + output.println(prefix + String.format(HEADFORMAT, "total ops","adj row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr")); } private static void printRow(String prefix, TimingInterval interval, TimingInterval total, Uncertainty opRateUncertainty, PrintStream output) { output.println(prefix + String.format(ROWFORMAT, - total.partitionCount, - interval.realOpRate(), + total.operationCount, + interval.adjustedRowRate(), + interval.opRate(), interval.partitionRate(), interval.rowRate(), 
interval.meanLatency(), @@ -157,7 +163,7 @@ public class StressMetrics output.println("\n"); output.println("Results:"); TimingInterval history = timing.getHistory(); - output.println(String.format("op rate : %.0f", history.realOpRate())); + output.println(String.format("op rate : %.0f", history.opRate())); output.println(String.format("partition rate : %.0f", history.partitionRate())); output.println(String.format("row rate : %.0f", history.rowRate())); output.println(String.format("latency mean : %.1f", history.meanLatency())); @@ -181,7 +187,7 @@ public class StressMetrics printRow(String.format(formatstr, ids.get(i)), summarise.get(i).timing.getHistory(), summarise.get(i).timing.getHistory(), - summarise.get(i).opRateUncertainty, + summarise.get(i).rowRateUncertainty, out ); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java index 4e09775..de561f3 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java @@ -24,15 +24,18 @@ package org.apache.cassandra.stress; import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.AlreadyExistsException; +import com.google.common.base.Function; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.stress.generate.Distribution; import org.apache.cassandra.stress.generate.DistributionFactory; import org.apache.cassandra.stress.generate.PartitionGenerator; import 
org.apache.cassandra.stress.generate.RatioDistributionFactory; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.generate.values.Booleans; import org.apache.cassandra.stress.generate.values.Bytes; import org.apache.cassandra.stress.generate.values.Generator; @@ -88,7 +91,7 @@ public class StressProfile implements Serializable public String keyspaceName; public String tableName; private Map<String, GeneratorConfig> columnConfigs; - private Map<String, String> queries; + private Map<String, StressYaml.QueryDef> queries; private Map<String, String> insert; transient volatile TableMetadata tableMetaData; @@ -97,11 +100,11 @@ public class StressProfile implements Serializable transient volatile BatchStatement.Type batchType; transient volatile DistributionFactory partitions; - transient volatile RatioDistributionFactory pervisit; - transient volatile RatioDistributionFactory perbatch; + transient volatile RatioDistributionFactory selectchance; transient volatile PreparedStatement insertStatement; transient volatile Integer thriftInsertId; + transient volatile Map<String, SchemaQuery.ArgSelect> argSelects; transient volatile Map<String, PreparedStatement> queryStatements; transient volatile Map<String, Integer> thriftQueryIds; @@ -242,13 +245,18 @@ public class StressProfile implements Serializable ThriftClient tclient = settings.getThriftClient(); Map<String, PreparedStatement> stmts = new HashMap<>(); Map<String, Integer> tids = new HashMap<>(); - for (Map.Entry<String, String> e : queries.entrySet()) + Map<String, SchemaQuery.ArgSelect> args = new HashMap<>(); + for (Map.Entry<String, StressYaml.QueryDef> e : queries.entrySet()) { - stmts.put(e.getKey().toLowerCase(), jclient.prepare(e.getValue())); - tids.put(e.getKey().toLowerCase(), tclient.prepare_cql3_query(e.getValue(), Compression.NONE)); + stmts.put(e.getKey().toLowerCase(), jclient.prepare(e.getValue().cql)); + tids.put(e.getKey().toLowerCase(), 
tclient.prepare_cql3_query(e.getValue().cql, Compression.NONE)); + args.put(e.getKey().toLowerCase(), e.getValue().fields == null + ? SchemaQuery.ArgSelect.MULTIROW + : SchemaQuery.ArgSelect.valueOf(e.getValue().fields.toUpperCase())); } thriftQueryIds = tids; queryStatements = stmts; + argSelects = args; } catch (TException e) { @@ -260,7 +268,9 @@ public class StressProfile implements Serializable // TODO validation name = name.toLowerCase(); - return new SchemaQuery(timer, generator, settings, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL); + if (!queryStatements.containsKey(name)) + throw new IllegalArgumentException("No query defined with name " + name); + return new SchemaQuery(timer, generator, settings, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL, argSelects.get(name)); } public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, StressSettings settings) @@ -328,18 +338,37 @@ public class StressProfile implements Serializable insert = new HashMap<>(); lowerCase(insert); - partitions = OptionDistribution.get(!insert.containsKey("partitions") ? "fixed(1)" : insert.remove("partitions")); - pervisit = OptionRatioDistribution.get(!insert.containsKey("pervisit") ? "fixed(1)/1" : insert.remove("pervisit")); - perbatch = OptionRatioDistribution.get(!insert.containsKey("perbatch") ? "fixed(1)/1" : insert.remove("perbatch")); - batchType = !insert.containsKey("batchtype") ? BatchStatement.Type.LOGGED : BatchStatement.Type.valueOf(insert.remove("batchtype")); + partitions = select(settings.insert.batchsize, "partitions", "fixed(1)", insert, OptionDistribution.BUILDER); + selectchance = select(settings.insert.selectRatio, "select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER); + batchType = settings.insert.batchType != null + ? 
settings.insert.batchType + : !insert.containsKey("batchtype") + ? BatchStatement.Type.LOGGED + : BatchStatement.Type.valueOf(insert.remove("batchtype")); if (!insert.isEmpty()) throw new IllegalArgumentException("Unrecognised insert option(s): " + insert); + Distribution visits = settings.insert.visits.get(); + // these min/max are not absolutely accurate if selectchance < 1, but they're close enough to + // guarantee the vast majority of actions occur in these bounds + double minBatchSize = selectchance.get().min() * partitions.get().minValue() * generator.minRowCount * (1d / visits.maxValue()); + double maxBatchSize = selectchance.get().max() * partitions.get().maxValue() * generator.maxRowCount * (1d / visits.minValue()); + System.out.printf("Generating batches with [%d..%d] partitions and [%.0f..%.0f] rows (of [%.0f..%.0f] total rows in the partitions)\n", + partitions.get().minValue(), partitions.get().maxValue(), + minBatchSize, maxBatchSize, + partitions.get().minValue() * generator.minRowCount, + partitions.get().maxValue() * generator.maxRowCount); if (generator.maxRowCount > 100 * 1000 * 1000) System.err.printf("WARNING: You have defined a schema that permits very large partitions (%.0f max rows (>100M))\n", generator.maxRowCount); - if (perbatch.get().max() * pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount > 100000) + if (batchType == BatchStatement.Type.LOGGED && maxBatchSize > 65535) + { + System.err.printf("ERROR: You have defined a workload that generates batches with more than 65k rows (%.0f), but have required the use of LOGGED batches. There is a 65k row limit on a single batch.\n", + selectchance.get().max() * partitions.get().maxValue() * generator.maxRowCount); + System.exit(1); + } + if (maxBatchSize > 100000) System.err.printf("WARNING: You have defined a schema that permits very large batches (%.0f max rows (>100K)). 
This may OOM this stress client, or the server.\n", - perbatch.get().max() * pervisit.get().max() * partitions.get().maxValue() * generator.maxRowCount); + selectchance.get().max() * partitions.get().maxValue() * generator.maxRowCount); JavaDriverClient client = settings.getJavaDriverClient(); String query = sb.toString(); @@ -356,10 +385,20 @@ public class StressProfile implements Serializable } } - return new SchemaInsert(timer, generator, settings, partitions.get(), pervisit.get(), perbatch.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType); + return new SchemaInsert(timer, generator, settings, partitions.get(), selectchance.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType); + } + + private static <E> E select(E first, String key, String defValue, Map<String, String> map, Function<String, E> builder) + { + String val = map.remove(key); + if (first != null) + return first; + if (val != null) + return builder.apply(val); + return builder.apply(defValue); } - public PartitionGenerator newGenerator(StressSettings settings) + public PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds) { if (generatorFactory == null) { @@ -371,7 +410,7 @@ public class StressProfile implements Serializable } } - return generatorFactory.newGenerator(); + return generatorFactory.newGenerator(settings, seeds); } private class GeneratorFactory @@ -393,9 +432,9 @@ public class StressProfile implements Serializable valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName()))); } - PartitionGenerator newGenerator() + PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds) { - return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns)); + return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns), 
settings.generate.order, seeds); } List<Generator> get(List<ColumnInfo> columnInfos) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java index deea1fb..b6efc5e 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressYaml.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressYaml.java @@ -30,8 +30,14 @@ public class StressYaml public String table; public String table_definition; - public List<Map<String,Object>> columnspec; - public Map<String,String> queries; - public Map<String,String> insert; + public List<Map<String, Object>> columnspec; + public Map<String, QueryDef> queries; + public Map<String, String> insert; + + public static class QueryDef + { + public String cql; + public String fields; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java index 13fae0d..4062b58 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionInverted.java @@ -55,4 +55,11 @@ public class DistributionInverted extends Distribution wrapped.setSeed(seed); } + public static Distribution invert(Distribution distribution) + { + if (distribution instanceof DistributionInverted) + return ((DistributionInverted) distribution).wrapped; + return new DistributionInverted(distribution); + } + } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java new file mode 100644 index 0000000..9771134 --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/generate/DistributionQuantized.java @@ -0,0 +1,90 @@ +package org.apache.cassandra.stress.generate; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ *
+ */
+
+
+import java.util.Arrays;
+import java.util.Random;
+
+import org.apache.cassandra.stress.Stress;
+
+/**
+ * Wraps a delegate distribution and quantizes it: the delegate's range is split into a fixed number of
+ * quanta whose bounds are taken from the delegate's inverse cumulative probability at evenly spaced
+ * points, and each sample is spread uniformly over the quantum the delegate's sample falls into.
+ */
+public class DistributionQuantized extends Distribution
+{
+
+    final Distribution delegate;
+    // bounds[i] is the inclusive lower bound of quantum i, and bounds[i + 1] its exclusive upper bound
+    final long[] bounds;
+    // NOTE(review): this is never re-seeded by setSeed(), so next() is not reproducible per seed - confirm intended
+    final Random random = new Random();
+
+    /**
+     * @param delegate the distribution to quantize
+     * @param quantas the number of quanta to split the delegate's range into; must be at least 1
+     * @throws IllegalArgumentException if quantas is less than 1
+     */
+    public DistributionQuantized(Distribution delegate, int quantas)
+    {
+        if (quantas < 1)
+            throw new IllegalArgumentException("quantas must be >= 1, but was " + quantas);
+        this.delegate = delegate;
+        this.bounds = new long[quantas + 1];
+        bounds[0] = delegate.minValue();
+        bounds[quantas] = delegate.maxValue() + 1;
+        for (int i = 1 ; i < quantas ; i++)
+            bounds[i] = delegate.inverseCumProb(i / (double) quantas);
+    }
+
+    /**
+     * @return a value drawn uniformly from the quantum that the delegate's next sample falls into
+     */
+    @Override
+    public long next()
+    {
+        // quanta() reports a sample that equals a bound as belonging to the preceding quantum, which is -1
+        // when the sample equals bounds[0] (the delegate's minimum); clamp to a valid quantum, mirroring
+        // the guards in inverseCumProb(), so that we never index outside of bounds
+        int quanta = Math.min(bounds.length - 2, Math.max(0, quanta(delegate.next())));
+        return bounds[quanta] + (long) (random.nextDouble() * ((bounds[quanta + 1] - bounds[quanta])));
+    }
+
+    // not supported by this distribution
+    public double nextDouble()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Locates the quantum containing the delegate's value for cumProb, then interpolates linearly inside
+     * that quantum using the part of cumProb that exceeds the quantum's own starting probability.
+     */
+    @Override
+    public long inverseCumProb(double cumProb)
+    {
+        long val = delegate.inverseCumProb(cumProb);
+        int quanta = quanta(val);
+        if (quanta < 0)
+            return bounds[0];
+        if (quanta >= bounds.length - 1)
+            return bounds[bounds.length - 1] - 1;
+        cumProb -= (quanta / ((double) bounds.length - 1));
+        cumProb *= (double) bounds.length - 1;
+        return bounds[quanta] + (long) (cumProb * (bounds[quanta + 1] - bounds[quanta]));
+    }
+
+    // returns the index of the quantum for val: (insertion point - 1) when val is not itself a bound, and
+    // (matching index - 1) when it is, so the result is -1 for any val <= bounds[0]
+    int quanta(long val)
+    {
+        int i = Arrays.binarySearch(bounds, val);
+        if (i < 0)
+            return -2 -i;
+        return i - 1;
+    }
+
+    // forwards the seed to the delegate only
+    public void setSeed(long seed)
+    {
+        delegate.setSeed(seed);
+    }
+
+    public static void main(String[] args) throws Exception
+    {
+        Stress.main(new String[] { "print", "dist=qextreme(1..1M,2,2)"});
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java new file mode
100644 index 0000000..455fec4 --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java @@ -0,0 +1,116 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.stress.generate;
+
+import java.util.Random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+// based on http://en.wikipedia.org/wiki/Xorshift, but periodically we reseed with our stronger random generator
+// note it is also non-atomically updated, so expects to be used by a single thread
+public class FasterRandom implements RandomGenerator
+{
+    final Random random = new Random();
+
+    // current xorshift state
+    private long seed;
+    // number of nextLong() calls since the last rollover
+    private int reseed;
+
+    public void setSeed(int seed)
+    {
+        setSeed((long) seed);
+    }
+
+    // combines the first two ints into a single 64-bit seed; any further ints are ignored
+    public void setSeed(int[] ints)
+    {
+        if (ints.length > 1)
+            // mask the low word: a negative ints[1] would otherwise sign-extend and overwrite the high word
+            setSeed(((long) ints[0] << 32) | (ints[1] & 0xFFFFFFFFL));
+        else if (ints.length == 1)
+            setSeed(ints[0]);
+        else
+            // an empty array carries no seed material; use a fixed seed rather than failing on ints[0]
+            setSeed(0L);
+    }
+
+    public void setSeed(long seed)
+    {
+        this.seed = seed;
+        rollover();
+    }
+
+    // re-seeds the java.util.Random from the current state, and draws a fresh xorshift state from it
+    private void rollover()
+    {
+        this.reseed = 0;
+        random.setSeed(seed);
+        seed = random.nextLong();
+    }
+
+    public void nextBytes(byte[] bytes)
+    {
+        int i = 0;
+        while (i < bytes.length)
+        {
+            long next = nextLong();
+            // a long only carries eight bytes, so draw a new one once they are used up; shifting the
+            // same long any further would fill the rest of the array with zeros
+            for (int j = 0 ; j < 8 && i < bytes.length ; j++)
+            {
+                bytes[i++] = (byte) (next & 0xFF);
+                next >>>= 8;
+            }
+        }
+    }
+
+    public int nextInt()
+    {
+        return (int) nextLong();
+    }
+
+    // reduces a random int modulo i and takes its magnitude, so the result is in [0, i) for positive i
+    public int nextInt(int i)
+    {
+        return Math.abs((int) nextLong() % i);
+    }
+
+    public long nextLong()
+    {
+        // every 32nd call we roll over to a state drawn from the stronger generator
+        if (++this.reseed == 32)
+            rollover();
+
+        // xorshift is defined over unsigned words, so the right shifts must be logical (>>>); an
+        // arithmetic shift smears the sign bit across the top bits and discards state
+        long seed = this.seed;
+        seed ^= seed >>> 12;
+        seed ^= seed << 25;
+        seed ^= seed >>> 27;
+        this.seed = seed;
+        return seed * 2685821657736338717L;
+    }
+
+    public boolean nextBoolean()
+    {
+        return ((int) nextLong() & 1) == 1;
+    }
+
+    // builds a value in [0, 1) from the top 24 bits; reinterpreting raw bits as a float would instead
+    // produce arbitrary floats, including NaN, infinities and negative values
+    public float nextFloat()
+    {
+        return (nextLong() >>> 40) * 0x1.0p-24f;
+    }
+
+    // builds a value in [0, 1) from the top 53 bits, for the same reason as nextFloat()
+    public double nextDouble()
+    {
+        return (nextLong() >>> 11) * 0x1.0p-53;
+    }
+
+    // delegates to the java.util.Random, which is itself re-seeded on every rollover
+    public double nextGaussian()
+    {
+        return random.nextGaussian();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java index f05e95b..18f5732 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java @@ -23,24 +23,34 @@ package org.apache.cassandra.stress.generate; import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Deque; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.stress.generate.values.Generator; // a partition is re-used to reduce garbage generation, as is its internal RowIterator +// TODO: we should batch the generation of
clustering components so we can bound the time and size necessary to +// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches +// of a single component, and then generate the values within those batches as necessary. this will be difficult with +// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes +// that are extended/suffixed to generate each batch, so that we can sort the prefixes) public class Partition { private long idseed; + private Seed seed; private final Object[] partitionKey; private final PartitionGenerator generator; private final RowIterator iterator; @@ -55,31 +65,32 @@ public class Partition iterator = new SingleRowIterator(); } - void setSeed(long seed) + void setSeed(Seed seed) { long idseed = 0; for (int i = 0 ; i < partitionKey.length ; i++) { Generator generator = this.generator.partitionKey.get(i); // set the partition key seed based on the current work item we're processing - generator.setSeed(seed); + generator.setSeed(seed.seed); Object key = generator.generate(); partitionKey[i] = key; // then contribute this value to the data seed idseed = seed(key, generator.type, idseed); } + this.seed = seed; this.idseed = idseed; } - public RowIterator iterator(double useChance) + public RowIterator iterator(double useChance, boolean isWrite) { - iterator.reset(useChance, 0); + iterator.reset(useChance, 0, 1, isWrite); return iterator; } - public RowIterator iterator(int targetCount) + public RowIterator iterator(int targetCount, boolean isWrite) { - iterator.reset(Double.NaN, targetCount); + iterator.reset(Double.NaN, targetCount, 1, isWrite); return iterator; } @@ -87,12 +98,12 @@ public class Partition { boolean done; - void reset(double useChance, int targetCount) + void reset(double useChance, int targetCount, int batches, boolean isWrite) { done = false; } - public Iterable<Row> batch(double ratio) + public 
Iterable<Row> next() { if (done) return Collections.emptyList(); @@ -110,6 +121,12 @@ public class Partition { return done; } + + public void markWriteFinished() + { + assert done; + generator.seeds.markFinished(seed); + } } public abstract class RowIterator @@ -117,10 +134,10 @@ public class Partition // we reuse the row object to save garbage final Row row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]); - public abstract Iterable<Row> batch(double ratio); - abstract void reset(double useChance, int targetCount); - + public abstract Iterable<Row> next(); public abstract boolean done(); + public abstract void markWriteFinished(); + abstract void reset(double useChance, int targetCount, int batches, boolean isWrite); public Partition partition() { @@ -128,31 +145,40 @@ public class Partition } } - // permits iterating a random subset of the procedurally generated rows in this partition; this is the only mechanism for visiting rows + // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows. // we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level, // and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children; - // if we do, we generate all possible values the children can take, and repeat the process. So at any one time we are using space proportional + // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional // to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition. 
+ // TODO : guarantee at least one row is always returned + // TODO : support first/last row, and constraining reads to rows we know are populated class MultiRowIterator extends RowIterator { // probability any single row will be generated in this iteration double useChance; - double expectedRowCount; - // the current seed in use at any given level; used to save recalculating it for each row, so we only need to recalc - // from prior row + // the seed used to generate the current values for the clustering components at each depth; + // used to save recalculating it for each row, so we only need to recalc from prior row. final long[] clusteringSeeds = new long[generator.clusteringComponents.size()]; // the components remaining to be visited for each level of the current stack - final Queue<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()]; + final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()]; // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level // so that we know with what chance we reached there, and we adjust our roll at that level by that amount - double[] chancemodifier = new double[generator.clusteringComponents.size()]; - double[] rollmodifier = new double[generator.clusteringComponents.size()]; + final double[] chancemodifier = new double[generator.clusteringComponents.size()]; + final double[] rollmodifier = new double[generator.clusteringComponents.size()]; + + // track where in the partition we are, and where we are limited to + final int[] position = new int[generator.clusteringComponents.size()]; + final int[] limit = new int[position.length]; + int batchSize; + boolean returnedOne; + boolean forceReturnOne; - // reusable set for generating unique clustering components + // reusable collections for generating unique and sorted clustering components final Set<Object> unique = new HashSet<>(); + final List<Comparable> 
tosort = new ArrayList<>(); final Random random = new Random(); MultiRowIterator() @@ -163,126 +189,262 @@ public class Partition chancemodifier[0] = generator.clusteringChildAverages[0]; } - void reset(double useChance, int targetCount) + // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit + // count to decide how much we should return in one iteration + void reset(double useChance, int targetCount, int batches, boolean isWrite) { + if (this.useChance < 1d) + { + // we clear our prior roll-modifiers if the use chance was previously less-than zero + Arrays.fill(rollmodifier, 1d); + Arrays.fill(chancemodifier, 1d); + } + + // set the seed for the first clustering component generator.clusteringComponents.get(0).setSeed(idseed); + int[] position = seed.position; + + // calculate how many first clustering components we'll generate, and how many total rows this predicts int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next(); - this.expectedRowCount = firstComponentCount * generator.clusteringChildAverages[0]; + int expectedRowCount; + + if (!isWrite && position != null) + { + expectedRowCount = 0; + for (int i = 0 ; i < position.length ; i++) + { + expectedRowCount += position[i] * generator.clusteringChildAverages[i]; + limit[i] = position[i]; + } + } + else + { + expectedRowCount = firstComponentCount * generator.clusteringChildAverages[0]; + if (isWrite) + batches *= seed.visits; + Arrays.fill(limit, Integer.MAX_VALUE); + } + + batchSize = Math.max(1, expectedRowCount / batches); if (Double.isNaN(useChance)) - useChance = Math.max(0d, Math.min(1d, targetCount / expectedRowCount)); + useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount)); + // clear any remnants of the last iteration, wire up our constants, and fill in the first clustering components + this.useChance = useChance; + this.returnedOne = false; for (Queue<?> q : 
clusteringComponents) q.clear(); - - this.useChance = useChance; clusteringSeeds[0] = idseed; - clusteringComponents[0].add(this); fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0)); - advance(0, 1f); + + // seek to our start position + seek(isWrite ? position : null); } - void fill(int component) + // generate the clustering components for the provided depth; requires preceding components + // to have been generated and their seeds populated into clusteringSeeds + void fill(int depth) { - long seed = clusteringSeeds[component - 1]; - Generator gen = generator.clusteringComponents.get(component); + long seed = clusteringSeeds[depth - 1]; + Generator gen = generator.clusteringComponents.get(depth); gen.setSeed(seed); - clusteringSeeds[component] = seed(clusteringComponents[component - 1].peek(), generator.clusteringComponents.get(component - 1).type, seed); - fill(clusteringComponents[component], (int) gen.clusteringDistribution.next(), gen); + clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed); + fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen); } + // generate the clustering components into the queue void fill(Queue<Object> queue, int count, Generator generator) { if (count == 1) { queue.add(generator.generate()); + return; } - else + + switch (Partition.this.generator.order) { - unique.clear(); - for (int i = 0 ; i < count ; i++) - { - Object next = generator.generate(); - if (unique.add(next)) - queue.add(next); - } + case SORTED: + if (Comparable.class.isAssignableFrom(generator.clazz)) + { + tosort.clear(); + for (int i = 0 ; i < count ; i++) + tosort.add((Comparable) generator.generate()); + Collections.sort(tosort); + // sorting places equal values next to each other, so keep a value only when it is greater than its predecessor + for (int i = 0 ; i < count ; i++) + if (i == 0 || tosort.get(i - 1).compareTo(tosort.get(i)) < 0) + queue.add(tosort.get(i)); + break; + } + // values that are not Comparable cannot be sorted, so we deliberately fall through to ARBITRARY + case ARBITRARY: + unique.clear(); + for (int i = 0 ; i < count ; i++)
+ { + Object next = generator.generate(); + if (unique.add(next)) + queue.add(next); + } + break; + case SHUFFLED: + unique.clear(); + tosort.clear(); + for (int i = 0 ; i < count ; i++) + { + Object next = generator.generate(); + if (unique.add(next)) + tosort.add(new RandomOrder(next)); + } + Collections.sort(tosort); + for (Object o : tosort) + queue.add(((RandomOrder)o).value); + break; + default: + throw new IllegalStateException(); + } + } + + // seek to the provided position (or the first entry if null) + private void seek(int[] position) + { + if (position == null) + { + this.position[0] = -1; + clusteringComponents[0].addFirst(this); + advance(0); + return; + } + + assert position.length == clusteringComponents.length; + for (int i = 0 ; i < position.length ; i++) + { + if (i != 0) + fill(i); + for (int c = position[i] ; c > 0 ; c--) + clusteringComponents[i].poll(); + row.row[i] = clusteringComponents[i].peek(); } + System.arraycopy(position, 0, this.position, 0, position.length); } - private boolean advance(double continueChance) + // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int) + // to move the iterator to the next item + void advance() { - // we always start at the leaf level + // we are always at the leaf level when this method is invoked + // so we calculate the seed for generating the row by combining the seed that generated the clustering components int depth = clusteringComponents.length - 1; - // fill the row with the position we *were* at (unless pre-start) + long parentSeed = clusteringSeeds[depth]; + long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed); + + // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver for (int i = clusteringSeeds.length ; i < row.row.length ; i++) { Generator gen = generator.valueComponents.get(i - clusteringSeeds.length); - long seed 
= clusteringSeeds[depth]; - seed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, seed); - gen.setSeed(seed); + gen.setSeed(rowSeed); row.row[i] = gen.generate(); } - clusteringComponents[depth].poll(); + returnedOne = true; + forceReturnOne = false; - return advance(depth, continueChance); + // then we advance the leaf level + advance(depth); } - private boolean advance(int depth, double continueChance) + private void advance(int depth) { // advance the leaf component clusteringComponents[depth].poll(); + position[depth]++; while (true) { if (clusteringComponents[depth].isEmpty()) { + // if we've run out of clustering components at this level, ascend if (depth == 0) - return false; + return; depth--; clusteringComponents[depth].poll(); + position[depth]++; continue; } - // the chance of descending is the uniform use chance, multiplied by the number of children + if (depth == 0 && !returnedOne && clusteringComponents[0].size() == 1) + forceReturnOne = true; + + // the chance of descending is the uniform usechance, multiplied by the number of children // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children // then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the // chance with which we reached this depth, i.e. 
if we already beat 50/50 odds, we double our // chance of beating this next roll double thischance = useChance * chancemodifier[depth]; - if (thischance > 0.999f || thischance >= random.nextDouble()) + if (forceReturnOne || thischance > 0.999f || thischance >= random.nextDouble()) { + // if we're descending, we fill in our clustering component and increase our depth row.row[depth] = clusteringComponents[depth].peek(); depth++; if (depth == clusteringComponents.length) break; - rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance); - chancemodifier[depth] = generator.clusteringChildAverages[depth] * rollmodifier[depth]; + // if we haven't reached the leaf, we update our probability statistics, fill in all of + // this level's clustering components, and repeat + if (useChance < 1d) + { + rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance); + chancemodifier[depth] = generator.clusteringChildAverages[depth] * rollmodifier[depth]; + } + position[depth] = 0; fill(depth); continue; } + // if we don't descend, we remove the clustering suffix we've skipped and continue clusteringComponents[depth].poll(); + position[depth]++; } - - return continueChance >= 1.0d || continueChance >= random.nextDouble(); } - public Iterable<Row> batch(final double ratio) + public Iterable<Row> next() { - final double continueChance = 1d - (Math.pow(ratio, expectedRowCount * useChance)); + final int[] limit = position.clone(); + int remainingSize = batchSize; + for (int i = 0 ; i < limit.length && remainingSize > 0 ; i++) + { + limit[i] += remainingSize / generator.clusteringChildAverages[i]; + remainingSize %= generator.clusteringChildAverages[i]; + } + assert remainingSize == 0; + for (int i = limit.length - 1 ; i > 0 ; i--) + { + if (limit[i] > generator.clusteringChildAverages[i]) + { + limit[i - 1] += limit[i] / generator.clusteringChildAverages[i]; + limit[i] %= generator.clusteringChildAverages[i]; + } + } + for (int i = 0 ; i < limit.length ; i++) 
+ { + if (limit[i] < this.limit[i]) + break; + limit[i] = Math.min(limit[i], this.limit[i]); + } return new Iterable<Row>() { public Iterator<Row> iterator() { return new Iterator<Row>() { - boolean hasNext = true; + public boolean hasNext() { - return hasNext; + if (done()) + return false; + for (int i = 0 ; i < position.length ; i++) + if (position[i] < limit[i]) + return true; + return false; } public Row next() { - hasNext = advance(continueChance); + advance(); return row; } @@ -300,26 +462,37 @@ public class Partition return clusteringComponents[0].isEmpty(); } + public void markWriteFinished() + { + if (done()) + generator.seeds.markFinished(seed); + else + generator.seeds.markVisited(seed, position.clone()); + } + public Partition partition() { return Partition.this; } } - public String getKeyAsString() + private static class RandomOrder implements Comparable<RandomOrder> { - StringBuilder sb = new StringBuilder(); - int i = 0; - for (Object key : partitionKey) + final int order = ThreadLocalRandom.current().nextInt(); + final Object value; + private RandomOrder(Object value) { - if (i > 0) - sb.append("|"); - AbstractType type = generator.partitionKey.get(i++).type; - sb.append(type.getString(type.decompose(key))); + this.value = value; + } + + public int compareTo(RandomOrder that) + { + return Integer.compare(this.order, that.order); } - return sb.toString(); } + // calculate a new seed based on the combination of a parent seed and the generated child, to generate + // any children of this child static long seed(Object object, AbstractType type, long seed) { if (object instanceof ByteBuffer) @@ -355,6 +528,20 @@ public class Partition return partitionKey[i]; } + public String getKeyAsString() + { + StringBuilder sb = new StringBuilder(); + int i = 0; + for (Object key : partitionKey) + { + if (i > 0) + sb.append("|"); + AbstractType type = generator.partitionKey.get(i++).type; + sb.append(type.getString(type.decompose(key))); + } + return sb.toString(); 
+ } + // used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now public ByteBuffer getToken() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java index d05350d..128d2f5 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java @@ -30,18 +30,27 @@ import java.util.NoSuchElementException; import com.google.common.collect.Iterables; +import org.apache.cassandra.stress.Operation; import org.apache.cassandra.stress.generate.values.Generator; public class PartitionGenerator { + public static enum Order + { + ARBITRARY, SHUFFLED, SORTED + } + public final double maxRowCount; + public final double minRowCount; final List<Generator> partitionKey; final List<Generator> clusteringComponents; final List<Generator> valueComponents; final int[] clusteringChildAverages; private final Map<String, Integer> indexMap; + final Order order; + final SeedManager seeds; final List<Partition> recyclable = new ArrayList<>(); int partitionsInUse = 0; @@ -51,18 +60,25 @@ public class PartitionGenerator partitionsInUse = 0; } - public PartitionGenerator(List<Generator> partitionKey, List<Generator> clusteringComponents, List<Generator> valueComponents) + public PartitionGenerator(List<Generator> partitionKey, List<Generator> clusteringComponents, List<Generator> valueComponents, Order order, SeedManager seeds) { this.partitionKey = partitionKey; this.clusteringComponents = clusteringComponents; this.valueComponents = valueComponents; + this.order = order; + this.seeds = seeds; this.clusteringChildAverages = 
new int[clusteringComponents.size()]; for (int i = clusteringChildAverages.length - 1 ; i >= 0 ; i--) clusteringChildAverages[i] = (int) (i < (clusteringChildAverages.length - 1) ? clusteringComponents.get(i + 1).clusteringDistribution.average() * clusteringChildAverages[i + 1] : 1); double maxRowCount = 1d; + double minRowCount = 1d; for (Generator component : clusteringComponents) + { maxRowCount *= component.clusteringDistribution.maxValue(); + minRowCount *= component.clusteringDistribution.minValue(); + } this.maxRowCount = maxRowCount; + this.minRowCount = minRowCount; this.indexMap = new HashMap<>(); int i = 0; for (Generator generator : partitionKey) @@ -72,6 +88,11 @@ public class PartitionGenerator indexMap.put(generator.name, i++); } + public boolean permitNulls(int index) + { + return !(index < 0 || index < clusteringComponents.size()); + } + public int indexOf(String name) { Integer i = indexMap.get(name); @@ -80,11 +101,14 @@ public class PartitionGenerator return i; } - public Partition generate(long seed) + public Partition generate(Operation op) { if (recyclable.size() <= partitionsInUse || recyclable.get(partitionsInUse) == null) recyclable.add(new Partition(this)); + Seed seed = seeds.next(op); + if (seed == null) + return null; Partition partition = recyclable.get(partitionsInUse++); partition.setSeed(seed); return partition; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java index 37ad4c5..c71945a 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/RatioDistribution.java @@ -39,6 +39,11 @@ public class RatioDistribution 
return Math.max(0f, Math.min(1f, distribution.nextDouble() / divisor)); } + public double min() + { + return Math.min(1d, distribution.minValue() / divisor); + } + public double max() { return Math.min(1d, distribution.maxValue() / divisor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java new file mode 100644 index 0000000..f427608 --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java @@ -0,0 +1,67 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.stress.generate; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.apache.cassandra.stress.util.DynamicList; + +public class Seed implements Comparable<Seed> +{ + + public final long seed; + final int visits; + + DynamicList.Node poolNode; + volatile int[] position; + volatile State state = State.HELD; + + private static final AtomicReferenceFieldUpdater<Seed, Seed.State> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(Seed.class, State.class, "state"); + + public int compareTo(Seed that) + { + return Long.compare(this.seed, that.seed); + } + + static enum State + { + HELD, AVAILABLE + } + + Seed(long seed, int visits) + { + this.seed = seed; + this.visits = visits; + } + + boolean take() + { + return stateUpdater.compareAndSet(this, State.AVAILABLE, State.HELD); + } + + void yield() + { + state = State.AVAILABLE; + } + + public int[] position() + { + return position; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java deleted file mode 100644 index d579223..0000000 --- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedGenerator.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.cassandra.stress.generate; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -public interface SeedGenerator -{ - - long next(long workIndex); - -}