Repository: cassandra Updated Branches: refs/heads/trunk cf54cc52f -> 6d29ed066
New option for cassandra-stress to leave a ratio of columns null Patch by tjake; reviewed by Jim Witschey for (CASSANDRA-9522) Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d29ed06 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d29ed06 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d29ed06 Branch: refs/heads/trunk Commit: 6d29ed0662cec6fef2e920ac83b8d075ea47a31e Parents: cf54cc5 Author: T Jake Luciani <[email protected]> Authored: Mon Jun 29 15:51:00 2015 -0400 Committer: T Jake Luciani <[email protected]> Committed: Wed Jul 1 17:01:44 2015 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/stress/Operation.java | 19 ++++--- .../apache/cassandra/stress/StressProfile.java | 7 ++- .../stress/generate/PartitionIterator.java | 55 +++++++++++++++----- .../predefined/PredefinedOperation.java | 6 +-- .../operations/userdefined/SchemaInsert.java | 4 +- .../operations/userdefined/SchemaQuery.java | 2 +- .../userdefined/ValidatingSchemaQuery.java | 2 +- .../stress/settings/SettingsInsert.java | 6 ++- 9 files changed, 71 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0c97912..6895395 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522) * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035) * Add algorithmic token allocation (CASSANDRA-7032) * Add nodetool command to replay batchlog (CASSANDRA-9547) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/Operation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java index 1179f71..4123911 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Operation.java +++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java @@ -24,6 +24,8 @@ import java.util.List; import com.google.common.util.concurrent.RateLimiter; import org.apache.cassandra.stress.generate.*; +import org.apache.cassandra.stress.settings.OptionDistribution; +import org.apache.cassandra.stress.settings.OptionRatioDistribution; import org.apache.cassandra.stress.settings.SettingsLog; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.JavaDriverClient; @@ -37,6 +39,7 @@ public abstract class Operation public final StressSettings settings; public final Timer timer; protected final DataSpec spec; + private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get(); private final List<PartitionIterator> partitionCache = new ArrayList<>(); protected List<PartitionIterator> partitions; @@ -47,22 +50,24 @@ public abstract class Operation final SeedManager seedManager; final Distribution partitionCount; final RatioDistribution useRatio; + final RatioDistribution rowPopulationRatio; final Integer targetCount; - public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, Integer targetCount) + public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, Integer targetCount) { - this(partitionGenerator, seedManager, partitionCount, null, targetCount); + this(partitionGenerator, seedManager, partitionCount, null, rowPopulationRatio, targetCount); } - public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio) + public DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio) { - this(partitionGenerator, seedManager, partitionCount, useRatio, null); + this(partitionGenerator, seedManager, partitionCount, useRatio, rowPopulationRatio, null); } - private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, Integer targetCount) + private DataSpec(PartitionGenerator partitionGenerator, SeedManager seedManager, Distribution partitionCount, RatioDistribution useRatio, RatioDistribution rowPopulationRatio, Integer targetCount) { this.partitionGenerator = partitionGenerator; this.seedManager = seedManager; this.partitionCount = partitionCount; this.useRatio = useRatio; + this.rowPopulationRatio = rowPopulationRatio == null ? defaultRowPopulationRatio : rowPopulationRatio; this.targetCount = targetCount; } } @@ -119,9 +124,9 @@ public abstract class Operation protected boolean reset(Seed seed, PartitionIterator iterator) { if (spec.useRatio == null) - return iterator.reset(seed, spec.targetCount, isWrite()); + return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite()); else - return iterator.reset(seed, spec.useRatio.next(), isWrite()); + return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite()); } public boolean isWrite() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/StressProfile.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java index 272f3c7..e2b9575 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java @@ -74,6 +74,7 @@ public class StressProfile implements Serializable transient volatile BatchStatement.Type batchType; transient volatile DistributionFactory partitions; transient volatile RatioDistributionFactory selectchance; + transient volatile RatioDistributionFactory rowPopulation; transient volatile PreparedStatement insertStatement; transient volatile Integer thriftInsertId; transient volatile List<ValidatingSchemaQuery.Factory> validationFactories; @@ -168,7 +169,7 @@ public class StressProfile implements Serializable } } - client.execute("use "+keyspaceName, org.apache.cassandra.db.ConsistencyLevel.ONE); + client.execute("use " + keyspaceName, org.apache.cassandra.db.ConsistencyLevel.ONE); if (tableCql != null) { @@ -348,6 +349,7 @@ public class StressProfile implements Serializable partitions = select(settings.insert.batchsize, "partitions", "fixed(1)", insert, OptionDistribution.BUILDER); selectchance = select(settings.insert.selectRatio, "select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER); + rowPopulation = select(settings.insert.rowPopulationRatio, "row-population", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER); batchType = settings.insert.batchType != null ? settings.insert.batchType : !insert.containsKey("batchtype") @@ -398,7 +400,7 @@ public class StressProfile implements Serializable } } - return new SchemaInsert(timer, settings, generator, seedManager, partitions.get(), selectchance.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType); + return new SchemaInsert(timer, settings, generator, seedManager, partitions.get(), selectchance.get(), rowPopulation.get(), thriftInsertId, insertStatement, ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType); } public List<ValidatingSchemaQuery> getValidate(Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) @@ -429,6 +431,7 @@ public class StressProfile implements Serializable return first; if (val != null && val.trim().length() > 0) return builder.apply(val); + return builder.apply(defValue); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java index 4906b95..2157214 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java @@ -52,7 +52,7 @@ import org.apache.cassandra.utils.Pair; public abstract class PartitionIterator implements Iterator<Row> { - abstract boolean reset(double useChance, int targetCount, boolean isWrite, PartitionGenerator.Order order); + abstract boolean reset(double useChance, double rowPopulationRatio, int targetCount, boolean isWrite, PartitionGenerator.Order order); // picks random (inclusive) bounds to iterate, and returns them public abstract Pair<Row, Row> resetToBounds(Seed seed, int clusteringComponentDepth); @@ -100,42 +100,47 @@ public abstract class PartitionIterator implements Iterator<Row> this.idseed = idseed; } - public boolean reset(Seed seed, double useChance, boolean isWrite) + public boolean reset(Seed seed, double useChance, double rowPopulationRatio, boolean isWrite) { setSeed(seed); this.order = generator.order; - return reset(useChance, 0, isWrite, PartitionIterator.this.order); + return reset(useChance, rowPopulationRatio, 0, isWrite, PartitionIterator.this.order); } - public boolean reset(Seed seed, int targetCount, boolean isWrite) + public boolean reset(Seed seed, int targetCount, double rowPopulationRatio, boolean isWrite) { setSeed(seed); this.order = generator.order; - return reset(Double.NaN, targetCount, isWrite, PartitionIterator.this.order); + return reset(Double.NaN, rowPopulationRatio,targetCount, isWrite,PartitionIterator.this.order); } static class SingleRowIterator extends PartitionIterator { boolean done; boolean isWrite; + double rowPopulationRatio; + final double totalValueColumns; private SingleRowIterator(PartitionGenerator generator, SeedManager seedManager) { super(generator, seedManager); + + this.totalValueColumns = generator.valueComponents.size(); } public Pair<Row, Row> resetToBounds(Seed seed, int clusteringComponentDepth) { assert clusteringComponentDepth == 0; setSeed(seed); - reset(1d, 1, false, PartitionGenerator.Order.SORTED); + reset(1d, 1d, 1, false, PartitionGenerator.Order.SORTED); return Pair.create(new Row(partitionKey), new Row(partitionKey)); } - boolean reset(double useChance, int targetCount, boolean isWrite, PartitionGenerator.Order order) + boolean reset(double useChance, double rowPopulationRatio, int targetCount, boolean isWrite, PartitionGenerator.Order order) { done = false; this.isWrite = isWrite; + this.rowPopulationRatio = rowPopulationRatio; return true; } @@ -148,11 +153,20 @@ public abstract class PartitionIterator implements Iterator<Row> { if (done) throw new NoSuchElementException(); + + double valueColumn = 0.0; for (int i = 0 ; i < row.row.length ; i++) { - Generator gen = generator.valueComponents.get(i); - gen.setSeed(idseed); - row.row[i] = gen.generate(); + if (generator.permitNulls(i) && (++valueColumn/totalValueColumns) > rowPopulationRatio) + { + row.row[i] = null; + } + else + { + Generator gen = generator.valueComponents.get(i); + gen.setSeed(idseed); + row.row[i] = gen.generate(); + } } done = true; if (isWrite) @@ -180,6 +194,8 @@ public abstract class PartitionIterator implements Iterator<Row> // probability any single row will be generated in this iteration double useChance; + double rowPopulationRatio; + final double totalValueColumns; // we want our chance of selection to be applied uniformly, so we compound the roll we make at each level // so that we know with what chance we reached there, and we adjust our roll at that level by that amount final double[] chancemodifier = new double[generator.clusteringComponents.size()]; @@ -201,6 +217,7 @@ public abstract class PartitionIterator implements Iterator<Row> clusteringComponents[i] = new ArrayDeque<>(); rollmodifier[0] = 1f; chancemodifier[0] = generator.clusteringDescendantAverages[0]; + this.totalValueColumns = generator.valueComponents.size(); } /** @@ -216,9 +233,10 @@ public abstract class PartitionIterator implements Iterator<Row> * * @return true if there is data to return, false otherwise */ - boolean reset(double useChance, int targetCount, boolean isWrite, PartitionGenerator.Order order) + boolean reset(double useChance, double rowPopulationRatio, int targetCount, boolean isWrite, PartitionGenerator.Order order) { this.isWrite = isWrite; + this.rowPopulationRatio = rowPopulationRatio; this.order = order; // set the seed for the first clustering component @@ -291,7 +309,7 @@ public abstract class PartitionIterator implements Iterator<Row> setUseChance(1d); if (clusteringComponentDepth == 0) { - reset(1d, -1, false, PartitionGenerator.Order.SORTED); + reset(1d, 1d, -1, false, PartitionGenerator.Order.SORTED); return Pair.create(new Row(partitionKey), new Row(partitionKey)); } @@ -493,11 +511,20 @@ public abstract class PartitionIterator implements Iterator<Row> Row result = row.copy(); // and then fill the row with the _non-clustering_ values for the position we _were_ at, as this is what we'll deliver + double valueColumn = 0.0; + for (int i = clusteringSeeds.length ; i < row.row.length ; i++) { Generator gen = generator.valueComponents.get(i - clusteringSeeds.length); - gen.setSeed(rowSeed); - result.row[i] = gen.generate(); + if (++valueColumn / totalValueColumns > rowPopulationRatio) + { + result.row[i] = null; + } + else + { + gen.setSeed(rowSeed); + result.row[i] = gen.generate(); + } } // then we advance the leaf level http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java index 89298ab..66f232a 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java @@ -40,14 +40,14 @@ public abstract class PredefinedOperation extends Operation public PredefinedOperation(Command type, Timer timer, PartitionGenerator generator, SeedManager seedManager, StressSettings settings) { - super(timer, settings, spec(generator, seedManager)); + super(timer, settings, spec(generator, seedManager, settings.insert.rowPopulationRatio.get())); this.type = type; this.columnCount = settings.columns.countDistribution.get(); } - private static DataSpec spec(PartitionGenerator generator, SeedManager seedManager) + private static DataSpec spec(PartitionGenerator generator, SeedManager seedManager, RatioDistribution rowPopulationCount) { - return new DataSpec(generator, seedManager, new DistributionFixed(1), 1); + return new DataSpec(generator, seedManager, new DistributionFixed(1), rowPopulationCount, 1); } public boolean isCql3() http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java index ef4d53f..d9fcac8 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java @@ -41,9 +41,9 @@ public class SchemaInsert extends SchemaStatement private final BatchStatement.Type batchType; - public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, RatioDistribution useRatio, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType) + public SchemaInsert(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, RatioDistribution useRatio, RatioDistribution rowPopulation, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType) { - super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio), statement, thriftId, cl); + super(timer, settings, new DataSpec(generator, seedManager, batchSize, useRatio, rowPopulation), statement, thriftId, cl); this.batchType = batchType; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java index 58f5307..9b5c4ae 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java @@ -53,7 +53,7 @@ public class SchemaQuery extends SchemaStatement public SchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, Integer thriftId, PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect) { - super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, thriftId, cl); + super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, thriftId, cl); this.argSelect = argSelect; randomBuffer = new Object[argumentIndex.length][argumentIndex.length]; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java index 1b10fcf..8bdde51 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java @@ -65,7 +65,7 @@ public class ValidatingSchemaQuery extends Operation private ValidatingSchemaQuery(Timer timer, StressSettings settings, PartitionGenerator generator, SeedManager seedManager, ValidatingStatement[] statements, ConsistencyLevel cl, int clusteringComponents) { - super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), 1)); + super(timer, settings, new DataSpec(generator, seedManager, new DistributionFixed(1), settings.insert.rowPopulationRatio.get(), 1)); this.statements = statements; this.cl = cl; argumentIndex = new int[statements[0].statement.getVariables().size()]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java index a6c298b..ac3fc7c 100644 --- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java +++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java @@ -37,6 +37,7 @@ public class SettingsInsert implements Serializable public final DistributionFactory visits; public final DistributionFactory batchsize; public final RatioDistributionFactory selectRatio; + public final RatioDistributionFactory rowPopulationRatio; public final BatchStatement.Type batchType; private SettingsInsert(InsertOptions options) @@ -45,6 +46,8 @@ public class SettingsInsert implements Serializable this.revisit = options.revisit.get(); this.batchsize = options.partitions.get(); this.selectRatio = options.selectRatio.get(); + this.rowPopulationRatio = options.rowPopulationRatio.get(); + this.batchType = !options.batchType.setByUser() ? null : BatchStatement.Type.valueOf(options.batchType.value()); } @@ -57,11 +60,12 @@ public class SettingsInsert implements Serializable final OptionDistribution partitions = new OptionDistribution("partitions=", null, "The number of partitions to update in a single batch", false); final OptionSimple batchType = new OptionSimple("batchtype=", "unlogged|logged|counter", null, "Specify the type of batch statement (LOGGED, UNLOGGED or COUNTER)", false); final OptionRatioDistribution selectRatio = new OptionRatioDistribution("select-ratio=", null, "The uniform probability of visiting any CQL row in the generated partition", false); + final OptionRatioDistribution rowPopulationRatio = new OptionRatioDistribution("row-population-ratio=", null, "The percent of a given rows columns to populate", false); @Override public List<? extends Option> options() { - return Arrays.asList(revisit, visits, partitions, batchType, selectRatio); + return Arrays.asList(revisit, visits, partitions, batchType, selectRatio, rowPopulationRatio); } }
