cassandra-stress simultaneous inserts over same seed patch by benedict; reviewed by rstupp CASSANDRA-7964
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c579a01 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c579a01 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c579a01 Branch: refs/heads/trunk Commit: 6c579a0102fa3e67215fef5d9f8aa97191e3a216 Parents: cdba5aa Author: Benedict Elliott Smith <bened...@apache.org> Authored: Fri Dec 12 14:09:37 2014 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Fri Dec 12 14:09:37 2014 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/stress/Operation.java | 86 ++- .../apache/cassandra/stress/StressAction.java | 96 +--- .../apache/cassandra/stress/StressMetrics.java | 1 - .../apache/cassandra/stress/StressProfile.java | 65 +-- .../apache/cassandra/stress/StressServer.java | 3 +- .../cassandra/stress/generate/Partition.java | 554 ------------------- .../stress/generate/PartitionGenerator.java | 39 +- .../apache/cassandra/stress/generate/Seed.java | 52 +- .../cassandra/stress/generate/SeedManager.java | 66 ++- .../cassandra/stress/generate/values/Bytes.java | 7 +- .../stress/generate/values/GeneratorConfig.java | 9 +- .../stress/generate/values/Strings.java | 2 - .../stress/generate/values/TimeUUIDs.java | 4 +- .../stress/operations/FixedOpDistribution.java | 7 - .../stress/operations/OpDistribution.java | 1 - .../operations/SampledOpDistribution.java | 9 - .../operations/predefined/CqlCounterAdder.java | 5 +- .../operations/predefined/CqlCounterGetter.java | 5 +- .../operations/predefined/CqlInserter.java | 6 +- .../operations/predefined/CqlOperation.java | 32 +- .../stress/operations/predefined/CqlReader.java | 7 +- .../predefined/PredefinedOperation.java | 40 +- .../predefined/ThriftCounterAdder.java | 9 +- .../predefined/ThriftCounterGetter.java | 5 +- .../operations/predefined/ThriftInserter.java | 11 +- .../operations/predefined/ThriftReader.java | 7 +- .../operations/userdefined/SchemaInsert.java | 52 +- .../operations/userdefined/SchemaQuery.java | 55 +- .../operations/userdefined/SchemaStatement.java | 16 +- .../cassandra/stress/settings/Command.java | 5 +- .../stress/settings/OptionAnyProbabilities.java | 8 +- .../stress/settings/OptionDistribution.java | 4 +- .../settings/OptionEnumProbabilities.java | 2 +- .../cassandra/stress/settings/OptionMulti.java | 7 +- .../settings/OptionRatioDistribution.java | 15 +- .../stress/settings/SettingsColumn.java | 12 +- .../stress/settings/SettingsCommand.java | 1 - .../settings/SettingsCommandPreDefined.java | 7 +- .../SettingsCommandPreDefinedMixed.java | 6 +- .../stress/settings/SettingsCommandUser.java | 14 +- .../stress/settings/SettingsErrors.java | 3 - .../cassandra/stress/settings/SettingsNode.java | 7 +- .../stress/settings/SettingsSchema.java | 1 - .../stress/settings/StressSettings.java | 3 - .../cassandra/stress/util/DynamicList.java | 15 +- .../cassandra/stress/util/JavaDriverClient.java | 4 +- .../stress/util/SmartThriftClient.java | 3 +- .../org/apache/cassandra/stress/util/Timer.java | 1 - .../apache/cassandra/stress/util/Timing.java | 1 - .../cassandra/stress/util/TimingInterval.java | 1 - 51 files changed, 323 insertions(+), 1049 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fa42e85..25140a8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.3 + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964) * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926) * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383) * Make read "defrag" async to reclaim memtables (CASSANDRA-8459) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/Operation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java index 5560240..edf3a54 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Operation.java +++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java @@ -19,13 +19,13 @@ package org.apache.cassandra.stress; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; -import org.apache.cassandra.stress.generate.Distribution; -import org.apache.cassandra.stress.generate.Partition; -import org.apache.cassandra.stress.generate.PartitionGenerator; -import org.apache.cassandra.stress.settings.*; +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.stress.generate.*; +import org.apache.cassandra.stress.settings.SettingsLog; +import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.JavaDriverClient; import org.apache.cassandra.stress.util.ThriftClient; import org.apache.cassandra.stress.util.Timer; @@ -36,17 +36,42 @@ public abstract class Operation { public final StressSettings settings; public final Timer timer; - public final PartitionGenerator generator; - public final Distribution partitionCount; + protected final DataSpec spec; + + private final List<PartitionIterator> partitionCache = new ArrayList<>(); + protected List<PartitionIterator> partitions; - protected List<Partition> partitions; + public static final class DataSpec + { + public final PartitionGenerator partitionGenerator; + final SeedManager seedManager; + final Distribution partitionCount; + final RatioDistribution useRatio; + final Integer targetCount; + + public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, Integer targetCount) + { + this(partitionGenerator, seedManager, partitionCount, null, targetCount); + } + public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio) + { + this(partitionGenerator, seedManager, partitionCount, useRatio, null); + } + private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, Integer targetCount) + { + this.partitionGenerator = partitionGenerator; + this.seedManager = seedManager; + this.partitionCount = partitionCount; + this.useRatio = useRatio; + this.targetCount = targetCount; + } + } - public Operation(Timer timer, PartitionGenerator generator, StressSettings settings, Distribution partitionCount) + public Operation(Timer timer, StressSettings settings, DataSpec spec) { - this.generator = generator; this.timer = timer; this.settings = settings; - this.partitionCount = partitionCount; + this.spec = spec; } public static interface RunOp @@ -56,9 +81,42 @@ public abstract class Operation public int rowCount(); } - protected void setPartitions(List<Partition> partitions) + boolean ready(WorkManager permits, RateLimiter rateLimiter) { - this.partitions = partitions; + int partitionCount = (int) spec.partitionCount.next(); + if (partitionCount <= 0) + return false; + partitionCount = permits.takePermits(partitionCount); + if (partitionCount <= 0) + return false; + + int i = 0; + boolean success = true; + for (; i < partitionCount && success ; i++) + { + if (i >= partitionCache.size()) + partitionCache.add(PartitionIterator.get(spec.partitionGenerator, spec.seedManager)); + + success = false; + while (!success) + { + Seed seed = spec.seedManager.next(this); + if (seed == null) + break; + + if (spec.useRatio == null) + success = partitionCache.get(i).reset(seed, spec.targetCount, this); + else + success = partitionCache.get(i).reset(seed, spec.useRatio.next(), this); + } + } + partitionCount = i; + + if (rateLimiter != null) + rateLimiter.acquire(partitionCount); + + partitions = partitionCache.subList(0, partitionCount); + return !partitions.isEmpty(); } public boolean isWrite() @@ -135,7 +193,7 @@ public abstract class Operation private String key() { List<String> keys = new ArrayList<>(); - for (Partition partition : partitions) + for (PartitionIterator partition : partitions) keys.add(partition.getKeyAsString()); return keys.toString(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressAction.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index 68e0004..1433742 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -21,19 +21,16 @@ import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; -import org.apache.cassandra.stress.generate.Partition; import org.apache.cassandra.stress.operations.OpDistribution; import org.apache.cassandra.stress.operations.OpDistributionFactory; -import org.apache.cassandra.stress.settings.*; +import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.JavaDriverClient; import org.apache.cassandra.stress.util.ThriftClient; import org.apache.cassandra.stress.util.Timer; @@ -180,9 +177,9 @@ public class StressAction implements Runnable : "until stderr of mean < " + settings.command.targetUncertainty)); final WorkManager workManager; if (opCount < 0) - workManager = new ContinuousWorkManager(); + workManager = new WorkManager.ContinuousWorkManager(); else - workManager = new FixedWorkManager(opCount); + workManager = new WorkManager.FixedWorkManager(opCount); final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings); @@ -285,36 +282,12 @@ public class StressAction implements Runnable throw new IllegalStateException(); } - int maxBatchSize = operations.maxBatchSize(); - Partition[] partitions = new Partition[maxBatchSize]; while (true) { - - // TODO: Operation should be able to ecapsulate much of this behaviour Operation op = operations.next(); - op.generator.reset(); - - int batchSize = workManager.takePermits(Math.max(1, (int) op.partitionCount.next())); - if (batchSize < 0) - break; - - if (rateLimiter != null) - rateLimiter.acquire(batchSize); - - int partitionCount = 0; - while (partitionCount < batchSize) - { - Partition p = op.generator.generate(op); - if (p == null) - break; - partitions[partitionCount++] = p; - } - - if (partitionCount == 0) + if (!op.ready(workManager, rateLimiter)) break; - op.setPartitions(Arrays.asList(partitions).subList(0, partitionCount)); - try { switch (settings.mode.api) @@ -358,65 +331,4 @@ public class StressAction implements Runnable } - private interface WorkManager - { - // -1 indicates consumer should terminate - int takePermits(int count); - - // signal all consumers to terminate - void stop(); - } - - private static final class FixedWorkManager implements WorkManager - { - - final AtomicLong permits; - - public FixedWorkManager(long permits) - { - this.permits = new AtomicLong(permits); - } - - @Override - public int takePermits(int count) - { - while (true) - { - long cur = permits.get(); - if (cur == 0) - return -1; - count = (int) Math.min(count, cur); - long next = cur - count; - if (permits.compareAndSet(cur, next)) - return count; - } - } - - @Override - public void stop() - { - permits.getAndSet(0); - } - } - - private static final class ContinuousWorkManager implements WorkManager - { - - volatile boolean stop = false; - - @Override - public int takePermits(int count) - { - if (stop) - return -1; - return count; - } - - @Override - public void stop() - { - stop = true; - } - - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java index 3a4a4a3..d1cc0d4 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java @@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactory; import org.apache.commons.lang3.time.DurationFormatUtils; import org.apache.cassandra.concurrent.NamedThreadFactory; -import org.apache.cassandra.stress.settings.SettingsLog; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.JmxCollector; import org.apache.cassandra.stress.util.Timing; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressProfile.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java index 76642be..1517fcb 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java @@ -21,37 +21,26 @@ package org.apache.cassandra.stress; -import com.datastax.driver.core.*; -import com.datastax.driver.core.exceptions.AlreadyExistsException; +import java.io.IOError; +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.net.URI; +import java.util.*; +import java.util.concurrent.TimeUnit; import com.google.common.base.Function; import com.google.common.util.concurrent.Uninterruptibles; + +import com.datastax.driver.core.*; +import com.datastax.driver.core.exceptions.AlreadyExistsException; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement; import org.apache.cassandra.exceptions.RequestValidationException; - import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.stress.generate.Distribution; -import org.apache.cassandra.stress.generate.DistributionFactory; -import org.apache.cassandra.stress.generate.PartitionGenerator; -import org.apache.cassandra.stress.generate.RatioDistributionFactory; -import org.apache.cassandra.stress.generate.SeedManager; -import org.apache.cassandra.stress.generate.values.Booleans; -import org.apache.cassandra.stress.generate.values.Bytes; -import org.apache.cassandra.stress.generate.values.Generator; -import org.apache.cassandra.stress.generate.values.Dates; -import org.apache.cassandra.stress.generate.values.Doubles; -import org.apache.cassandra.stress.generate.values.Floats; -import org.apache.cassandra.stress.generate.values.GeneratorConfig; -import org.apache.cassandra.stress.generate.values.Inets; -import org.apache.cassandra.stress.generate.values.Integers; -import org.apache.cassandra.stress.generate.values.Lists; -import org.apache.cassandra.stress.generate.values.Longs; -import org.apache.cassandra.stress.generate.values.Sets; -import org.apache.cassandra.stress.generate.values.Strings; -import org.apache.cassandra.stress.generate.values.TimeUUIDs; -import org.apache.cassandra.stress.generate.values.UUIDs; +import org.apache.cassandra.stress.generate.*; +import org.apache.cassandra.stress.generate.values.*; import org.apache.cassandra.stress.operations.userdefined.SchemaInsert; import org.apache.cassandra.stress.operations.userdefined.SchemaQuery; import org.apache.cassandra.stress.settings.OptionDistribution; @@ -68,19 +57,6 @@ import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.Constructor; import org.yaml.snakeyaml.error.YAMLException; -import java.io.*; -import java.net.URI; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - public class StressProfile implements Serializable { private String keyspaceCql; @@ -247,7 +223,7 @@ public class StressProfile implements Serializable } } - public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, StressSettings settings) + public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator generator, SeedManager seeds, StressSettings settings) { if (queryStatements == null) { @@ -286,10 +262,11 @@ public class StressProfile implements Serializable name = name.toLowerCase(); if (!queryStatements.containsKey(name)) throw new IllegalArgumentException("No query defined with name " + name); - return new SchemaQuery(timer, generator, settings, thriftQueryIds.get(name), queryStatements.get(name), ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL, argSelects.get(name)); + return new SchemaQuery(timer, settings, generator, seeds, thriftQueryIds.get(name), queryStatements.get(name), + ThriftConversion.fromThrift(settings.command.consistencyLevel), ValidationType.NOT_FAIL, argSelects.get(name)); } - public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, StressSettings settings) + public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { if (insertStatement == null) { @@ -401,7 +378,7 @@ public class StressProfile implements Serializable } } - return new SchemaInsert(timer, generator, settings, partitions.get(), selectchance.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType); + return new SchemaInsert(timer, settings, generator, seedManager, partitions.get(), selectchance.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType); } private static <E> E select(E first, String key, String defValue, Map<String, String> map, Function<String, E> builder) @@ -415,7 +392,7 @@ public class StressProfile implements Serializable return builder.apply(defValue); } - public PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds) + public PartitionGenerator newGenerator(StressSettings settings) { if (generatorFactory == null) { @@ -427,7 +404,7 @@ public class StressProfile implements Serializable } } - return generatorFactory.newGenerator(settings, seeds); + return generatorFactory.newGenerator(settings); } private class GeneratorFactory @@ -449,9 +426,9 @@ public class StressProfile implements Serializable valueColumns.add(new ColumnInfo(metadata.getName(), metadata.getType(), columnConfigs.get(metadata.getName()))); } - PartitionGenerator newGenerator(StressSettings settings, SeedManager seeds) + PartitionGenerator newGenerator(StressSettings settings) { - return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns), settings.generate.order, seeds); + return new PartitionGenerator(get(partitionKeys), get(clusteringColumns), get(valueColumns), settings.generate.order); } List<Generator> get(List<ColumnInfo> columnInfos) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressServer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java b/tools/stress/src/org/apache/cassandra/stress/StressServer.java index 3c9e2a6..a6dfaf4 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java @@ -24,9 +24,10 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; -import org.apache.cassandra.stress.settings.StressSettings; import org.apache.commons.cli.*; +import org.apache.cassandra.stress.settings.StressSettings; + public class StressServer { private static final Options availableOptions = new Options(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java deleted file mode 100644 index 66f8c1d..0000000 --- a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java +++ /dev/null @@ -1,554 +0,0 @@ -package org.apache.cassandra.stress.generate; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Deque; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; - -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.stress.generate.values.Generator; - -// a partition is re-used to reduce garbage generation, as is its internal RowIterator -// TODO: we should batch the generation of clustering components so we can bound the time and size necessary to -// generate huge partitions with only a small number of clustering components; i.e. we should generate seeds for batches -// of a single component, and then generate the values within those batches as necessary. this will be difficult with -// generating sorted partitions, and may require generator support (e.g. we may need to support generating prefixes -// that are extended/suffixed to generate each batch, so that we can sort the prefixes) -public class Partition -{ - - private long idseed; - private Seed seed; - private final Object[] partitionKey; - private final PartitionGenerator generator; - private final RowIterator iterator; - - public Partition(PartitionGenerator generator) - { - this.generator = generator; - this.partitionKey = new Object[generator.partitionKey.size()]; - if (generator.clusteringComponents.size() > 0) - iterator = new MultiRowIterator(); - else - iterator = new SingleRowIterator(); - } - - void setSeed(Seed seed) - { - long idseed = 0; - for (int i = 0 ; i < partitionKey.length ; i++) - { - Generator generator = this.generator.partitionKey.get(i); - // set the partition key seed based on the current work item we're processing - generator.setSeed(seed.seed); - Object key = generator.generate(); - partitionKey[i] = key; - // then contribute this value to the data seed - idseed = seed(key, generator.type, idseed); - } - this.seed = seed; - this.idseed = idseed; - } - - public RowIterator iterator(double useChance, boolean isWrite) - { - iterator.reset(useChance, 0, 1, isWrite); - return iterator; - } - - public RowIterator iterator(int targetCount, boolean isWrite) - { - iterator.reset(Double.NaN, targetCount, 1, isWrite); - return iterator; - } - - class SingleRowIterator extends RowIterator - { - boolean done; - - void reset(double useChance, int targetCount, int batches, boolean isWrite) - { - done = false; - } - - public Iterable<Row> next() - { - if (done) - return Collections.emptyList(); - for (int i = 0 ; i < row.row.length ; i++) - { - Generator gen = generator.valueComponents.get(i); - gen.setSeed(idseed); - row.row[i] = gen.generate(); - } - done = true; - return Collections.singleton(row); - } - - public boolean done() - { - return done; - } - - public void markWriteFinished() - { - assert done; - generator.seeds.markFinished(seed); - } - } - - public abstract class RowIterator - { - // we reuse the row object to save garbage - final Row row = new Row(partitionKey, new Object[generator.clusteringComponents.size() + generator.valueComponents.size()]); - - public abstract Iterable<Row> next(); - public abstract boolean done(); - public abstract void markWriteFinished(); - abstract void reset(double useChance, int targetCount, int batches, boolean isWrite); - - public Partition partition() - { - return Partition.this; - } - } - - // permits iterating a random subset of the procedurally generated rows in this partition. this is the only mechanism for visiting rows. - // we maintain a stack of clustering components and their seeds; for each clustering component we visit, we generate all values it takes at that level, - // and then, using the average (total) number of children it takes we randomly choose whether or not we visit its children; - // if we do, we generate all possible values the immediate children can take, and repeat the process. So at any one time we are using space proportional - // to C.N, where N is the average number of values each clustering component takes, as opposed to N^C total values in the partition. - // TODO : guarantee at least one row is always returned - // TODO : support first/last row, and constraining reads to rows we know are populated - class MultiRowIterator extends RowIterator - { - - // probability any single row will be generated in this iteration - double useChance; - - // the seed used to generate the current values for the clustering components at each depth; - // used to save recalculating it for each row, so we only need to recalc from prior row. - final long[] clusteringSeeds = new long[generator.clusteringComponents.size()]; - // the components remaining to be visited for each level of the current stack - final Deque<Object>[] clusteringComponents = new ArrayDeque[generator.clusteringComponents.size()]; - - // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level - // so that we know with what chance we reached there, and we adjust our roll at that level by that amount - final double[] chancemodifier = new double[generator.clusteringComponents.size()]; - final double[] rollmodifier = new double[generator.clusteringComponents.size()]; - - // track where in the partition we are, and where we are limited to - final int[] position = new int[generator.clusteringComponents.size()]; - final int[] limit = new int[position.length]; - int batchSize; - boolean returnedOne; - boolean forceReturnOne; - - // reusable collections for generating unique and sorted clustering components - final Set<Object> unique = new HashSet<>(); - final List<Comparable> tosort = new ArrayList<>(); - final Random random = new Random(); - - MultiRowIterator() - { - for (int i = 0 ; i < clusteringComponents.length ; i++) - clusteringComponents[i] = new ArrayDeque<>(); - rollmodifier[0] = 1f; - chancemodifier[0] = generator.clusteringChildAverages[0]; - } - - // if we're a write, the expected behaviour is that the requested batch count is compounded with the seed's visit - // count to decide how much we should return in one iteration - void reset(double useChance, int targetCount, int batches, boolean isWrite) - { - if (this.useChance < 1d) - { - // we clear our prior roll-modifiers if the use chance was previously less-than zero - Arrays.fill(rollmodifier, 1d); - Arrays.fill(chancemodifier, 1d); - } - - // set the seed for the first clustering component - generator.clusteringComponents.get(0).setSeed(idseed); - int[] position = seed.position; - - // calculate how many first clustering components we'll generate, and how many total rows this predicts - int firstComponentCount = (int) generator.clusteringComponents.get(0).clusteringDistribution.next(); - int expectedRowCount; - - if (!isWrite && position != null) - { - expectedRowCount = 0; - for (int i = 0 ; i < position.length ; i++) - { - expectedRowCount += position[i] * generator.clusteringChildAverages[i]; - limit[i] = position[i]; - } - } - else - { - expectedRowCount = firstComponentCount * generator.clusteringChildAverages[0]; - if (isWrite) - batches *= seed.visits; - Arrays.fill(limit, Integer.MAX_VALUE); - } - - batchSize = Math.max(1, expectedRowCount / batches); - if (Double.isNaN(useChance)) - useChance = Math.max(0d, Math.min(1d, targetCount / (double) expectedRowCount)); - - // clear any remnants of the last iteration, wire up our constants, and fill in the first clustering components - this.useChance = useChance; - this.returnedOne = false; - for (Queue<?> q : clusteringComponents) - q.clear(); - clusteringSeeds[0] = idseed; - fill(clusteringComponents[0], firstComponentCount, generator.clusteringComponents.get(0)); - - // seek to our start position - seek(isWrite ? position : null); - } - - // generate the clustering components for the provided depth; requires preceding components - // to have been generated and their seeds populated into clusteringSeeds - void fill(int depth) - { - long seed = clusteringSeeds[depth - 1]; - Generator gen = generator.clusteringComponents.get(depth); - gen.setSeed(seed); - clusteringSeeds[depth] = seed(clusteringComponents[depth - 1].peek(), generator.clusteringComponents.get(depth - 1).type, seed); - fill(clusteringComponents[depth], (int) gen.clusteringDistribution.next(), gen); - } - - // generate the clustering components into the queue - void fill(Queue<Object> queue, int count, Generator generator) - { - if (count == 1) - { - queue.add(generator.generate()); - return; - } - - switch (Partition.this.generator.order) - { - case SORTED: - if (Comparable.class.isAssignableFrom(generator.clazz)) - { - tosort.clear(); - for (int i = 0 ; i < count ; i++) - tosort.add((Comparable) generator.generate()); - Collections.sort(tosort); - for (int i = 0 ; i < count ; i++) - queue.add(tosort.get(i)); - break; - } - else - { - throw new RuntimeException("Generator class is not comparable: "+generator.clazz); - } - case ARBITRARY: - unique.clear(); - for (int i = 0 ; i < count ; i++) - { - Object next = generator.generate(); - if (unique.add(next)) - queue.add(next); - } - break; - case SHUFFLED: - unique.clear(); - tosort.clear(); - for (int i = 0 ; i < count ; i++) - { - Object next = generator.generate(); - if (unique.add(next)) - tosort.add(new RandomOrder(next)); - } - Collections.sort(tosort); - for (Object o : tosort) - queue.add(((RandomOrder)o).value); - break; - default: - throw new IllegalStateException(); - } - } - - // seek to the provided position (or the first entry if null) - private void seek(int[] position) - { - if (position == null) - { - this.position[0] = -1; - clusteringComponents[0].addFirst(this); - advance(0); - return; - } - - assert position.length == clusteringComponents.length; - for (int i = 0 ; i < position.length ; i++) - { - if (i != 0) - fill(i); - for (int c = position[i] ; c > 0 ; c--) - clusteringComponents[i].poll(); - row.row[i] = clusteringComponents[i].peek(); - } - System.arraycopy(position, 0, this.position, 0, position.length); - } - - // normal method for moving the iterator forward; maintains the row object, and delegates to advance(int) - // to move the iterator to the next item - void advance() - { - // we are always at the leaf level when this method is invoked - // so we calculate the seed for generating the row by combining the seed that generated the clustering components - int depth = clusteringComponents.length - 1; - long parentSeed = clusteringSeeds[depth]; - long rowSeed = seed(clusteringComponents[depth].peek(), generator.clusteringComponents.get(depth).type, parentSeed); - - // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver - for (int i = clusteringSeeds.length ; i < row.row.length ; i++) - { - Generator gen = generator.valueComponents.get(i - clusteringSeeds.length); - gen.setSeed(rowSeed); - row.row[i] = gen.generate(); - } - returnedOne = true; - forceReturnOne = false; - - // then we advance the leaf level - advance(depth); - } - - private void advance(int depth) - { - // advance the leaf component - clusteringComponents[depth].poll(); - position[depth]++; - while (true) - { - if (clusteringComponents[depth].isEmpty()) - { - // if we've run out of clustering components at this level, ascend - if (depth == 0) - return; - depth--; - clusteringComponents[depth].poll(); - position[depth]++; - continue; - } - - if (depth == 0 && !returnedOne && clusteringComponents[0].size() == 1) - forceReturnOne = true; - - // the chance of descending is the uniform usechance, multiplied by the number of children - // we would on average generate (so if we have a 0.1 use chance, but should generate 10 children - // then we will always descend), multiplied by 1/(compound roll), where (compound roll) is the - // chance with which we reached this depth, i.e. if we already beat 50/50 odds, we double our - // chance of beating this next roll - double thischance = useChance * chancemodifier[depth]; - if (forceReturnOne || thischance > 0.999f || thischance >= random.nextDouble()) - { - // if we're descending, we fill in our clustering component and increase our depth - row.row[depth] = clusteringComponents[depth].peek(); - depth++; - if (depth == clusteringComponents.length) - break; - // if we haven't reached the leaf, we update our probability statistics, fill in all of - // this level's clustering components, and repeat - if (useChance < 1d) - { - rollmodifier[depth] = rollmodifier[depth - 1] / Math.min(1d, thischance); - chancemodifier[depth] = generator.clusteringChildAverages[depth] * rollmodifier[depth]; - } - position[depth] = 0; - fill(depth); - continue; - } - - // if we don't descend, we remove the clustering suffix we've skipped and continue - clusteringComponents[depth].poll(); - position[depth]++; - } - } - - public Iterable<Row> next() - { - final int[] limit = position.clone(); - int remainingSize = batchSize; - for (int i = 0 ; i < limit.length && remainingSize > 0 ; i++) - { - limit[i] += remainingSize / generator.clusteringChildAverages[i]; - remainingSize %= generator.clusteringChildAverages[i]; - } - assert remainingSize == 0; - for (int i = limit.length - 1 ; i > 0 ; i--) - { - if (limit[i] > generator.clusteringChildAverages[i]) - { - limit[i - 1] += limit[i] / generator.clusteringChildAverages[i]; - limit[i] %= generator.clusteringChildAverages[i]; - } - } - for (int i = 0 ; i < limit.length ; i++) - { - if (limit[i] < this.limit[i]) - break; - limit[i] = Math.min(limit[i], this.limit[i]); - } - return new Iterable<Row>() - { - public Iterator<Row> iterator() - { - return new Iterator<Row>() - { - - public boolean hasNext() - { - if (done()) - return false; - for (int i = 0 ; i < position.length ; i++) - if (position[i] < limit[i]) - return true; - return false; - } - - public Row next() - { - advance(); - return row; - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - }; - } - - public boolean done() - { - return clusteringComponents[0].isEmpty(); - } - - public void markWriteFinished() - { - if (done()) - generator.seeds.markFinished(seed); - else - generator.seeds.markVisited(seed, position.clone()); - } - - public Partition partition() - { - return Partition.this; - } - } - - private static class RandomOrder implements Comparable<RandomOrder> - { - final int order = ThreadLocalRandom.current().nextInt(); - final Object value; - private RandomOrder(Object value) - { - this.value = value; - } - - public int compareTo(RandomOrder that) - { - return Integer.compare(this.order, that.order); - } - } - - // calculate a new seed based on the combination of a parent seed and the generated child, to generate - // any children of this child - static long seed(Object object, AbstractType type, long seed) - { - if (object instanceof ByteBuffer) - { - ByteBuffer buf = (ByteBuffer) object; - for (int i = buf.position() ; i < buf.limit() ; i++) - seed = (31 * seed) + buf.get(i); - return seed; - } - else if (object instanceof String) - { - String str = (String) object; - for (int i = 0 ; i < str.length() ; i++) - seed = (31 * seed) + str.charAt(i); - return seed; - } - else if (object instanceof Number) - { - return (seed * 31) + ((Number) object).longValue(); - } - else if (object instanceof UUID) - { - return seed * 31 + (((UUID) object).getLeastSignificantBits() ^ ((UUID) object).getMostSignificantBits()); - } - else - { - return seed(type.decompose(object), BytesType.instance, seed); - } - } - - public Object getPartitionKey(int i) - { - return partitionKey[i]; - } - - public String getKeyAsString() - { - StringBuilder sb = new StringBuilder(); - int i = 0; - for (Object key : partitionKey) - { - if (i > 0) - sb.append("|"); - AbstractType type = generator.partitionKey.get(i++).type; - sb.append(type.getString(type.decompose(key))); - } - return sb.toString(); - } - - // used for thrift smart routing - if it's a multi-part key we don't try to route correctly right now - public ByteBuffer getToken() - { - return generator.partitionKey.get(0).type.decompose(partitionKey[0]); - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java index 128d2f5..9f88068 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java @@ -22,7 +22,6 @@ package org.apache.cassandra.stress.generate; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,7 +29,6 @@ import java.util.NoSuchElementException; import com.google.common.collect.Iterables; -import org.apache.cassandra.stress.Operation; import org.apache.cassandra.stress.generate.values.Generator; public class PartitionGenerator @@ -46,30 +44,24 @@ public class PartitionGenerator final List<Generator> partitionKey; final List<Generator> clusteringComponents; final List<Generator> valueComponents; - final int[] clusteringChildAverages; + final int[] clusteringDescendantAverages; + final int[] clusteringComponentAverages; private final Map<String, Integer> indexMap; final Order order; - final SeedManager seeds; - final List<Partition> recyclable = new ArrayList<>(); - int partitionsInUse = 0; - - public void reset() - { - partitionsInUse = 0; - } - - public PartitionGenerator(List<Generator> partitionKey, List<Generator> clusteringComponents, List<Generator> valueComponents, Order order, SeedManager seeds) + public PartitionGenerator(List<Generator> partitionKey, List<Generator> clusteringComponents, List<Generator> valueComponents, Order order) { this.partitionKey = partitionKey; this.clusteringComponents = clusteringComponents; this.valueComponents = valueComponents; this.order = order; - this.seeds = seeds; - this.clusteringChildAverages = new int[clusteringComponents.size()]; - for (int i = clusteringChildAverages.length - 1 ; i >= 0 ; i--) - clusteringChildAverages[i] = (int) (i < (clusteringChildAverages.length - 1) ? clusteringComponents.get(i + 1).clusteringDistribution.average() * clusteringChildAverages[i + 1] : 1); + this.clusteringDescendantAverages = new int[clusteringComponents.size()]; + this.clusteringComponentAverages = new int[clusteringComponents.size()]; + for (int i = 0 ; i < clusteringComponentAverages.length ; i++) + clusteringComponentAverages[i] = (int) clusteringComponents.get(i).clusteringDistribution.average(); + for (int i = clusteringDescendantAverages.length - 1 ; i >= 0 ; i--) + clusteringDescendantAverages[i] = (int) (i < (clusteringDescendantAverages.length - 1) ? clusteringComponentAverages[i + 1] * clusteringDescendantAverages[i + 1] : 1); double maxRowCount = 1d; double minRowCount = 1d; for (Generator component : clusteringComponents) @@ -101,19 +93,6 @@ public class PartitionGenerator return i; } - public Partition generate(Operation op) - { - if (recyclable.size() <= partitionsInUse || recyclable.get(partitionsInUse) == null) - recyclable.add(new Partition(this)); - - Seed seed = seeds.next(op); - if (seed == null) - return null; - Partition partition = recyclable.get(partitionsInUse++); - partition.setSeed(seed); - return partition; - } - public ByteBuffer convert(int c, Object v) { if (c < 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java index f427608..9e2e65b 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java @@ -18,50 +18,68 @@ */ package org.apache.cassandra.stress.generate; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.cassandra.stress.util.DynamicList; public class Seed implements Comparable<Seed> { + public final int visits; public final long seed; - final int visits; - DynamicList.Node poolNode; - volatile int[] position; - volatile State state = State.HELD; + private volatile DynamicList.Node poolNode; + private volatile int position; - private static final AtomicReferenceFieldUpdater<Seed, Seed.State> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(Seed.class, State.class, "state"); + private static final AtomicIntegerFieldUpdater<Seed> positionUpdater = AtomicIntegerFieldUpdater.newUpdater(Seed.class, "position"); public int compareTo(Seed that) { return Long.compare(this.seed, that.seed); } - static enum State - { - HELD, AVAILABLE - } - Seed(long seed, int visits) { this.seed = seed; this.visits = visits; } - boolean take() + public int position() { - return stateUpdater.compareAndSet(this, State.AVAILABLE, State.HELD); + return position; } - void yield() + public int moveForwards(int rowCount) { - state = State.AVAILABLE; + return positionUpdater.getAndAdd(this, rowCount); } - public int[] position() + public int hashCode() { - return position; + return (int) seed; + } + + public boolean equals(Object that) + { + return that instanceof Seed && this.seed == ((Seed) that).seed; + } + + public boolean save(DynamicList<Seed> sampleFrom, int maxSize) + { + DynamicList.Node poolNode = sampleFrom.append(this, maxSize); + if (poolNode == null) + return false; + this.poolNode = poolNode; + return true; + } + + public boolean isSaved() + { + return poolNode != null; + } + + public void remove(DynamicList<Seed> sampleFrom) + { + sampleFrom.remove(poolNode); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java index dba721d..071d888 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java @@ -33,9 +33,12 @@ public class SeedManager final Distribution visits; final Generator writes; final Generator reads; - final ConcurrentHashMap<Seed, Seed> managing = new ConcurrentHashMap<>(); + final ConcurrentHashMap<Long, Seed> managing = new ConcurrentHashMap<>(); final DynamicList<Seed> sampleFrom; final Distribution sample; + final long sampleOffset; + final int sampleSize; + final boolean updateSampleImmediately; public SeedManager(StressSettings settings) { @@ -61,10 +64,15 @@ public class SeedManager this.visits = settings.insert.visits.get(); this.writes = writes; this.reads = reads; - this.sample = DistributionInverted.invert(settings.insert.revisit.get()); - if (sample.maxValue() > Integer.MAX_VALUE || sample.minValue() < 0) - throw new IllegalArgumentException(); - this.sampleFrom = new DynamicList<>((int) sample.maxValue()); + Distribution sample = settings.insert.revisit.get(); + this.sampleOffset = Math.min(sample.minValue(), sample.maxValue()); + long sampleSize = 1 + Math.max(sample.minValue(), sample.maxValue()) - sampleOffset; + if (sampleOffset < 0 || sampleSize > Integer.MAX_VALUE) + throw new IllegalArgumentException("sample range is invalid"); + this.sampleFrom = new DynamicList<>((int) sampleSize); + this.sample = DistributionInverted.invert(sample); + this.sampleSize = (int) sampleSize; + this.updateSampleImmediately = visits.average() > 1; } public Seed next(Operation op) @@ -80,48 +88,38 @@ public class SeedManager while (true) { - int index = (int) sample.next(); + int index = (int) (sample.next() - sampleOffset); Seed seed = sampleFrom.get(index); - if (seed != null && seed.take()) + if (seed != null && seed.isSaved()) return seed; seed = writes.next((int) visits.next()); if (seed == null) return null; - // seeds are created HELD, so if we insert it successfully we have it exclusively for our write - if (managing.putIfAbsent(seed, seed) == null) - return seed; + if (managing.putIfAbsent(seed.seed, seed) == null) + { + if (!updateSampleImmediately || seed.save(sampleFrom, sampleSize)) + return seed; + managing.remove(seed.seed, seed); + } } } - public void markVisited(Seed seed, int[] position) - { - boolean first = seed.position == null; - seed.position = position; - finishedWriting(seed, first, false); - } - - public void markFinished(Seed seed) + public void markLastWrite(Seed seed, boolean first) { - finishedWriting(seed, seed.position == null, true); + // we could have multiple iterators mark the last write simultaneously, + // so we ensure we remove conditionally, and only remove the exact seed we were operating over + // this is important because, to ensure correctness, we do not support calling remove multiple + // times on the same DynamicList.Node + if (managing.remove(seed.seed, seed) && !first) + seed.remove(sampleFrom); } - void finishedWriting(Seed seed, boolean first, boolean completed) + public void markFirstWrite(Seed seed, boolean last) { - if (!completed) - { - if (first) - seed.poolNode = sampleFrom.append(seed); - seed.yield(); - } - else - { - if (!first) - sampleFrom.remove(seed.poolNode); - managing.remove(seed); - } - if (first) - writes.finishWrite(seed); + if (!last && !updateSampleImmediately) + seed.save(sampleFrom, Integer.MAX_VALUE); + writes.finishWrite(seed); } private abstract class Generator http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java index 358163c..3c15c87 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java @@ -20,12 +20,11 @@ */ package org.apache.cassandra.stress.generate.values; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.stress.generate.FasterRandom; - import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Random; + +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.stress.generate.FasterRandom; public class Bytes extends Generator<ByteBuffer> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java index 8f7b2ea..3522338 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java @@ -20,17 +20,14 @@ */ package org.apache.cassandra.stress.generate.values; +import java.io.Serializable; +import java.nio.ByteBuffer; + import org.apache.cassandra.stress.generate.Distribution; import org.apache.cassandra.stress.generate.DistributionFactory; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.MurmurHash; - -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Map; - public class GeneratorConfig implements Serializable { public final long salt; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java index 71aaae6..b58fee2 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java @@ -20,8 +20,6 @@ */ package org.apache.cassandra.stress.generate.values; -import java.util.Random; - import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.stress.generate.FasterRandom; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java index efe4b79..7bfabf5 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java @@ -21,11 +21,11 @@ package org.apache.cassandra.stress.generate.values; +import java.util.UUID; + import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.utils.UUIDGen; -import java.util.UUID; - public class TimeUUIDs extends Generator<UUID> { final Dates dateGen; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java index 3212795..533b630 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java @@ -25,7 +25,6 @@ import org.apache.cassandra.stress.Operation; public class FixedOpDistribution implements OpDistribution { - final Operation operation; public FixedOpDistribution(Operation operation) @@ -37,10 +36,4 @@ public class FixedOpDistribution implements OpDistribution { return operation; } - - public int maxBatchSize() - { - return (int) operation.partitionCount.maxValue(); - } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java index bcbd0bf..0fc15a6 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java @@ -27,6 +27,5 @@ public interface OpDistribution { Operation next(); - public int maxBatchSize(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java index 0bd64c5..432e991 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java @@ -22,7 +22,6 @@ package org.apache.cassandra.stress.operations; import org.apache.commons.math3.distribution.EnumeratedDistribution; -import org.apache.commons.math3.util.Pair; import org.apache.cassandra.stress.Operation; import org.apache.cassandra.stress.generate.Distribution; @@ -41,14 +40,6 @@ public class SampledOpDistribution implements OpDistribution this.clustering = clustering; } - public int maxBatchSize() - { - int max = 1; - for (Pair<Operation, Double> pair : operations.getPmf()) - max = Math.max(max, (int) pair.getFirst().partitionCount.maxValue()); - return max; - } - public Operation next() { while (remaining == 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java index b7d1ee7..456c821 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.cassandra.stress.generate.Distribution; import org.apache.cassandra.stress.generate.DistributionFactory; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.Timer; @@ -36,9 +37,9 @@ public class CqlCounterAdder extends CqlOperation<Integer> { final Distribution counteradd; - public CqlCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, StressSettings settings) + public CqlCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(Command.COUNTER_WRITE, timer, generator, settings); + super(Command.COUNTER_WRITE, timer, generator, seedManager, settings); this.counteradd = counteradd.get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java index 94c8faf..8c1c65c 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.Timer; @@ -33,9 +34,9 @@ import org.apache.cassandra.stress.util.Timer; public class CqlCounterGetter extends CqlOperation<Integer> { - public CqlCounterGetter(Timer timer, PartitionGenerator generator, StressSettings settings) + public CqlCounterGetter(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(Command.COUNTER_READ, timer, generator, settings); + super(Command.COUNTER_READ, timer, generator, seedManager, settings); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java index 622eb14..88ee752 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java @@ -26,17 +26,17 @@ import java.util.ArrayList; import java.util.List; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.Timer; -import org.apache.cassandra.utils.UUIDGen; public class CqlInserter extends CqlOperation<Integer> { - public CqlInserter(Timer timer, PartitionGenerator generator, StressSettings settings) + public CqlInserter(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(Command.WRITE, timer, generator, settings); + super(Command.WRITE, timer, generator, seedManager, settings); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java index 0264cd1..9cea854 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java @@ -24,13 +24,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import com.google.common.base.Function; + import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; -import com.google.common.base.Function; -import org.apache.cassandra.stress.Operation; -import org.apache.cassandra.stress.StressMetrics; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.ConnectionStyle; import org.apache.cassandra.stress.settings.CqlVersion; @@ -54,9 +54,9 @@ public abstract class CqlOperation<V> extends PredefinedOperation protected abstract String buildQuery(); protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key); - public CqlOperation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings) + public CqlOperation(Command type, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(type, timer, generator, settings); + super(type, timer, generator, seedManager, settings); if (settings.columns.variableColumnCount) throw new IllegalStateException("Variable column counts are not implemented for CQL"); } @@ -168,28 +168,6 @@ public abstract class CqlOperation<V> extends PredefinedOperation } } - // Requires a custom validate() method, but fetches and stores the keys from the result set for further processing - protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]> - { - - protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object queryId, List<Object> params, ByteBuffer key) - { - super(client, query, queryId, KeysHandler.INSTANCE, params, key); - } - - @Override - public int partitionCount() - { - return result.length; - } - - @Override - public int rowCount() - { - return result.length; - } - } - protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java index 3a7f75a..12cc241 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java @@ -22,23 +22,22 @@ package org.apache.cassandra.stress.operations.predefined; import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.Timer; -import org.apache.cassandra.utils.ByteBufferUtil; public class CqlReader extends CqlOperation<ByteBuffer[][]> { - public CqlReader(Timer timer, PartitionGenerator generator, StressSettings settings) + public CqlReader(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(Command.READ, timer, generator, settings); + super(Command.READ, timer, generator, seedManager, settings); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java index dba2e51..d5c3edc 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java @@ -20,26 +20,17 @@ package org.apache.cassandra.stress.operations.predefined; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.EnumMap; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import org.apache.cassandra.stress.Operation; -import org.apache.cassandra.stress.StressMetrics; -import org.apache.cassandra.stress.generate.Distribution; -import org.apache.cassandra.stress.generate.DistributionFactory; -import org.apache.cassandra.stress.generate.DistributionFixed; -import org.apache.cassandra.stress.generate.PartitionGenerator; -import org.apache.cassandra.stress.generate.Row; +import org.apache.cassandra.stress.generate.*; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.CqlVersion; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.Timer; -import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.SliceRange; -import org.apache.cassandra.utils.ByteBufferUtil; public abstract class PredefinedOperation extends Operation { @@ -47,13 +38,18 @@ public abstract class PredefinedOperation extends Operation private final Distribution columnCount; private Object cqlCache; - public PredefinedOperation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings) + public PredefinedOperation(Command type, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(timer, generator, settings, new DistributionFixed(1)); + super(timer, settings, spec(generator, seedManager)); this.type = type; this.columnCount = settings.columns.countDistribution.get(); } + private static DataSpec spec(PartitionGenerator generator, SeedManager seedManager) + { + return new DataSpec(generator, seedManager, new DistributionFixed(1), 1); + } + public boolean isCql3() { return settings.mode.cqlVersion == CqlVersion.CQL3; @@ -174,7 +170,7 @@ public abstract class PredefinedOperation extends Operation protected List<ByteBuffer> getColumnValues(ColumnSelection columns) { - Row row = partitions.get(0).iterator(1, false).next().iterator().next(); + Row row = partitions.get(0).next(); ByteBuffer[] r = new ByteBuffer[columns.count()]; int c = 0; if (columns.indices != null) @@ -186,7 +182,7 @@ public abstract class PredefinedOperation extends Operation return Arrays.asList(r); } - public static Operation operation(Command type, Timer timer, PartitionGenerator generator, StressSettings settings, DistributionFactory counteradd) + public static Operation operation(Command type, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings, DistributionFactory counteradd) { switch (type) { @@ -194,10 +190,10 @@ public abstract class PredefinedOperation extends Operation switch(settings.mode.style) { case THRIFT: - return new ThriftReader(timer, generator, settings); + return new ThriftReader(timer, generator, seedManager, settings); case CQL: case CQL_PREPARED: - return new CqlReader(timer, generator, settings); + return new CqlReader(timer, generator, seedManager, settings); default: throw new UnsupportedOperationException(); } @@ -207,10 +203,10 @@ public abstract class PredefinedOperation extends Operation switch(settings.mode.style) { case THRIFT: - return new ThriftCounterGetter(timer, generator, settings); + return new ThriftCounterGetter(timer, generator, seedManager, settings); case CQL: case CQL_PREPARED: - return new CqlCounterGetter(timer, generator, settings); + return new CqlCounterGetter(timer, generator, seedManager, settings); default: throw new UnsupportedOperationException(); } @@ -220,10 +216,10 @@ public abstract class PredefinedOperation extends Operation switch(settings.mode.style) { case THRIFT: - return new ThriftInserter(timer, generator, settings); + return new ThriftInserter(timer, generator, seedManager, settings); case CQL: case CQL_PREPARED: - return new CqlInserter(timer, generator, settings); + return new CqlInserter(timer, generator, seedManager, settings); default: throw new UnsupportedOperationException(); } @@ -232,10 +228,10 @@ public abstract class PredefinedOperation extends Operation switch(settings.mode.style) { case THRIFT: - return new ThriftCounterAdder(counteradd, timer, generator, settings); + return new ThriftCounterAdder(counteradd, timer, generator, seedManager, settings); case CQL: case CQL_PREPARED: - return new CqlCounterAdder(counteradd, timer, generator, settings); + return new CqlCounterAdder(counteradd, timer, generator, seedManager, settings); default: throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java index 4ee42e9..be34a07 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java @@ -27,19 +27,22 @@ import java.util.Map; import org.apache.cassandra.stress.generate.Distribution; import org.apache.cassandra.stress.generate.DistributionFactory; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.ThriftClient; import org.apache.cassandra.stress.util.Timer; -import org.apache.cassandra.thrift.*; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.CounterColumn; +import org.apache.cassandra.thrift.Mutation; public class ThriftCounterAdder extends PredefinedOperation { final Distribution counteradd; - public ThriftCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, StressSettings settings) + public ThriftCounterAdder(DistributionFactory counteradd, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(Command.COUNTER_WRITE, timer, generator, settings); + super(Command.COUNTER_WRITE, timer, generator, seedManager, settings); this.counteradd = counteradd.get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java index 10c6aab..ca81fe9 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.ThriftClient; @@ -31,9 +32,9 @@ import org.apache.cassandra.thrift.SlicePredicate; public class ThriftCounterGetter extends PredefinedOperation { - public ThriftCounterGetter(Timer timer, PartitionGenerator generator, StressSettings settings) + public ThriftCounterGetter(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(Command.COUNTER_READ, timer, generator, settings); + super(Command.COUNTER_READ, timer, generator, seedManager, settings); } public void run(final ThriftClient client) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java index d6adbf9..1827c06 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java @@ -24,22 +24,23 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.ThriftClient; import org.apache.cassandra.stress.util.Timer; -import org.apache.cassandra.thrift.*; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.ColumnOrSuperColumn; +import org.apache.cassandra.thrift.Mutation; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.UUIDGen; public final class ThriftInserter extends PredefinedOperation { - public ThriftInserter(Timer timer, PartitionGenerator generator, StressSettings settings) + public ThriftInserter(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(Command.WRITE, timer, generator, settings); + super(Command.WRITE, timer, generator, seedManager, settings); } public boolean isWrite() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java index 276d8c5..d77dc6a 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java @@ -22,21 +22,20 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.cassandra.stress.generate.PartitionGenerator; +import org.apache.cassandra.stress.generate.SeedManager; import org.apache.cassandra.stress.settings.Command; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.ThriftClient; import org.apache.cassandra.stress.util.Timer; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.ColumnParent; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SuperColumn; public final class ThriftReader extends PredefinedOperation { - public ThriftReader(Timer timer, PartitionGenerator generator, StressSettings settings) + public ThriftReader(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(Command.READ, timer, generator, settings); + super(Command.READ, timer, generator, seedManager, settings); } public void run(final ThriftClient client) throws IOException