Repository: cassandra
Updated Branches:
  refs/heads/trunk cf54cc52f -> 6d29ed066


New option for cassandra-stress to leave a ratio of columns null

Patch by tjake; reviewed by Jim Witschey for CASSANDRA-9522


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d29ed06
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d29ed06
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d29ed06

Branch: refs/heads/trunk
Commit: 6d29ed0662cec6fef2e920ac83b8d075ea47a31e
Parents: cf54cc5
Author: T Jake Luciani <[email protected]>
Authored: Mon Jun 29 15:51:00 2015 -0400
Committer: T Jake Luciani <[email protected]>
Committed: Wed Jul 1 17:01:44 2015 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/stress/Operation.java  | 19 ++++---
 .../apache/cassandra/stress/StressProfile.java  |  7 ++-
 .../stress/generate/PartitionIterator.java      | 55 +++++++++++++++-----
 .../predefined/PredefinedOperation.java         |  6 +--
 .../operations/userdefined/SchemaInsert.java    |  4 +-
 .../operations/userdefined/SchemaQuery.java     |  2 +-
 .../userdefined/ValidatingSchemaQuery.java      |  2 +-
 .../stress/settings/SettingsInsert.java         |  6 ++-
 9 files changed, 71 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c97912..6895395 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522)
  * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035)
  * Add algorithmic token allocation (CASSANDRA-7032)
  * Add nodetool command to replay batchlog (CASSANDRA-9547)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 1179f71..4123911 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -24,6 +24,8 @@ import java.util.List;
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.settings.OptionDistribution;
+import org.apache.cassandra.stress.settings.OptionRatioDistribution;
 import org.apache.cassandra.stress.settings.SettingsLog;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
@@ -37,6 +39,7 @@ public abstract class Operation
     public final StressSettings settings;
     public final Timer timer;
     protected final DataSpec spec;
+    private final static RatioDistribution defaultRowPopulationRatio = OptionRatioDistribution.BUILDER.apply("fixed(1)/1").get();
 
     private final List<PartitionIterator> partitionCache = new ArrayList<>();
     protected List<PartitionIterator> partitions;
@@ -47,22 +50,24 @@ public abstract class Operation
         final SeedManager seedManager;
         final Distribution partitionCount;
         final RatioDistribution useRatio;
+        final RatioDistribution rowPopulationRatio;
         final Integer targetCount;
 
-        public DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, Integer targetCount)
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, RatioDistribution rowPopulationRatio, 
Integer targetCount)
         {
-            this(partitionGenerator, seedManager, partitionCount, null, 
targetCount);
+            this(partitionGenerator, seedManager, partitionCount, null, 
rowPopulationRatio, targetCount);
         }
-        public DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, RatioDistribution useRatio)
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, RatioDistribution useRatio, 
RatioDistribution rowPopulationRatio)
         {
-            this(partitionGenerator, seedManager, partitionCount, useRatio, 
null);
+            this(partitionGenerator, seedManager, partitionCount, useRatio, 
rowPopulationRatio, null);
         }
-        private DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, RatioDistribution useRatio, Integer 
targetCount)
+        private DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, RatioDistribution useRatio, 
RatioDistribution rowPopulationRatio, Integer targetCount)
         {
             this.partitionGenerator = partitionGenerator;
             this.seedManager = seedManager;
             this.partitionCount = partitionCount;
             this.useRatio = useRatio;
+            this.rowPopulationRatio = rowPopulationRatio == null ? 
defaultRowPopulationRatio : rowPopulationRatio;
             this.targetCount = targetCount;
         }
     }
@@ -119,9 +124,9 @@ public abstract class Operation
     protected boolean reset(Seed seed, PartitionIterator iterator)
     {
         if (spec.useRatio == null)
-            return iterator.reset(seed, spec.targetCount, isWrite());
+            return iterator.reset(seed, spec.targetCount, spec.rowPopulationRatio.next(), isWrite());
         else
-            return iterator.reset(seed, spec.useRatio.next(), isWrite());
+            return iterator.reset(seed, spec.useRatio.next(), spec.rowPopulationRatio.next(), isWrite());
     }
 
     public boolean isWrite()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 272f3c7..e2b9575 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -74,6 +74,7 @@ public class StressProfile implements Serializable
     transient volatile BatchStatement.Type batchType;
     transient volatile DistributionFactory partitions;
     transient volatile RatioDistributionFactory selectchance;
+    transient volatile RatioDistributionFactory rowPopulation;
     transient volatile PreparedStatement insertStatement;
     transient volatile Integer thriftInsertId;
     transient volatile List<ValidatingSchemaQuery.Factory> validationFactories;
@@ -168,7 +169,7 @@ public class StressProfile implements Serializable
             }
         }
 
-        client.execute("use "+keyspaceName, 
org.apache.cassandra.db.ConsistencyLevel.ONE);
+        client.execute("use " + keyspaceName, 
org.apache.cassandra.db.ConsistencyLevel.ONE);
 
         if (tableCql != null)
         {
@@ -348,6 +349,7 @@ public class StressProfile implements Serializable
 
                     partitions = select(settings.insert.batchsize, 
"partitions", "fixed(1)", insert, OptionDistribution.BUILDER);
                     selectchance = select(settings.insert.selectRatio, 
"select", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
+                    rowPopulation = select(settings.insert.rowPopulationRatio, 
"row-population", "fixed(1)/1", insert, OptionRatioDistribution.BUILDER);
                     batchType = settings.insert.batchType != null
                                 ? settings.insert.batchType
                                 : !insert.containsKey("batchtype")
@@ -398,7 +400,7 @@ public class StressProfile implements Serializable
             }
         }
 
-        return new SchemaInsert(timer, settings, generator, seedManager, 
partitions.get(), selectchance.get(), thriftInsertId, insertStatement, 
ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
+        return new SchemaInsert(timer, settings, generator, seedManager, 
partitions.get(), selectchance.get(), rowPopulation.get(), thriftInsertId, 
insertStatement, 
ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
     }
 
     public List<ValidatingSchemaQuery> getValidate(Timer timer, 
PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
@@ -429,6 +431,7 @@ public class StressProfile implements Serializable
             return first;
         if (val != null && val.trim().length() > 0)
             return builder.apply(val);
+        
         return builder.apply(defValue);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
index 4906b95..2157214 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionIterator.java
@@ -52,7 +52,7 @@ import org.apache.cassandra.utils.Pair;
 public abstract class PartitionIterator implements Iterator<Row>
 {
 
-    abstract boolean reset(double useChance, int targetCount, boolean isWrite, 
PartitionGenerator.Order order);
+    abstract boolean reset(double useChance, double rowPopulationRatio, int 
targetCount, boolean isWrite, PartitionGenerator.Order order);
     // picks random (inclusive) bounds to iterate, and returns them
     public abstract Pair<Row, Row> resetToBounds(Seed seed, int 
clusteringComponentDepth);
 
@@ -100,42 +100,47 @@ public abstract class PartitionIterator implements 
Iterator<Row>
         this.idseed = idseed;
     }
 
-    public boolean reset(Seed seed, double useChance, boolean isWrite)
+    public boolean reset(Seed seed, double useChance, double 
rowPopulationRatio, boolean isWrite)
     {
         setSeed(seed);
         this.order = generator.order;
-        return reset(useChance, 0, isWrite, PartitionIterator.this.order);
+        return reset(useChance, rowPopulationRatio, 0, isWrite, 
PartitionIterator.this.order);
     }
 
-    public boolean reset(Seed seed, int targetCount, boolean isWrite)
+    public boolean reset(Seed seed, int targetCount, double 
rowPopulationRatio,  boolean isWrite)
     {
         setSeed(seed);
         this.order = generator.order;
-        return reset(Double.NaN, targetCount, isWrite, 
PartitionIterator.this.order);
+        return reset(Double.NaN, rowPopulationRatio,targetCount, 
isWrite,PartitionIterator.this.order);
     }
 
     static class SingleRowIterator extends PartitionIterator
     {
         boolean done;
         boolean isWrite;
+        double rowPopulationRatio;
+        final double totalValueColumns;
 
         private SingleRowIterator(PartitionGenerator generator, SeedManager 
seedManager)
         {
             super(generator, seedManager);
+
+            this.totalValueColumns = generator.valueComponents.size();
         }
 
         public Pair<Row, Row> resetToBounds(Seed seed, int 
clusteringComponentDepth)
         {
             assert clusteringComponentDepth == 0;
             setSeed(seed);
-            reset(1d, 1, false, PartitionGenerator.Order.SORTED);
+            reset(1d, 1d, 1, false, PartitionGenerator.Order.SORTED);
             return Pair.create(new Row(partitionKey), new Row(partitionKey));
         }
 
-        boolean reset(double useChance, int targetCount, boolean isWrite, 
PartitionGenerator.Order order)
+        boolean reset(double useChance, double rowPopulationRatio, int 
targetCount, boolean isWrite, PartitionGenerator.Order order)
         {
             done = false;
             this.isWrite = isWrite;
+            this.rowPopulationRatio = rowPopulationRatio;
             return true;
         }
 
@@ -148,11 +153,20 @@ public abstract class PartitionIterator implements 
Iterator<Row>
         {
             if (done)
                 throw new NoSuchElementException();
+
+            double valueColumn = 0.0;
             for (int i = 0 ; i < row.row.length ; i++)
             {
-                Generator gen = generator.valueComponents.get(i);
-                gen.setSeed(idseed);
-                row.row[i] = gen.generate();
+                if (generator.permitNulls(i) && (++valueColumn/totalValueColumns) > rowPopulationRatio)
+                {
+                    row.row[i] = null;
+                }
+                else
+                {
+                    Generator gen = generator.valueComponents.get(i);
+                    gen.setSeed(idseed);
+                    row.row[i] = gen.generate();
+                }
             }
             done = true;
             if (isWrite)
@@ -180,6 +194,8 @@ public abstract class PartitionIterator implements 
Iterator<Row>
 
         // probability any single row will be generated in this iteration
         double useChance;
+        double rowPopulationRatio;
+        final double totalValueColumns;
         // we want our chance of selection to be applied uniformly, so we 
compound the roll we make at each level
         // so that we know with what chance we reached there, and we adjust 
our roll at that level by that amount
         final double[] chancemodifier = new 
double[generator.clusteringComponents.size()];
@@ -201,6 +217,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
                 clusteringComponents[i] = new ArrayDeque<>();
             rollmodifier[0] = 1f;
             chancemodifier[0] = generator.clusteringDescendantAverages[0];
+            this.totalValueColumns = generator.valueComponents.size();
         }
 
         /**
@@ -216,9 +233,10 @@ public abstract class PartitionIterator implements 
Iterator<Row>
          *
          * @return true if there is data to return, false otherwise
          */
-        boolean reset(double useChance, int targetCount, boolean isWrite, 
PartitionGenerator.Order order)
+        boolean reset(double useChance, double rowPopulationRatio, int 
targetCount, boolean isWrite, PartitionGenerator.Order order)
         {
             this.isWrite = isWrite;
+            this.rowPopulationRatio = rowPopulationRatio;
 
             this.order = order;
             // set the seed for the first clustering component
@@ -291,7 +309,7 @@ public abstract class PartitionIterator implements 
Iterator<Row>
             setUseChance(1d);
             if (clusteringComponentDepth == 0)
             {
-                reset(1d, -1, false, PartitionGenerator.Order.SORTED);
+                reset(1d, 1d, -1, false, PartitionGenerator.Order.SORTED);
                 return Pair.create(new Row(partitionKey), new 
Row(partitionKey));
             }
 
@@ -493,11 +511,20 @@ public abstract class PartitionIterator implements 
Iterator<Row>
 
             Row result = row.copy();
             // and then fill the row with the _non-clustering_ values for the 
position we _were_ at, as this is what we'll deliver
+            double valueColumn = 0.0;
+
             for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
             {
                 Generator gen = generator.valueComponents.get(i - 
clusteringSeeds.length);
-                gen.setSeed(rowSeed);
-                result.row[i] = gen.generate();
+                if (++valueColumn / totalValueColumns > rowPopulationRatio)
+                {
+                    result.row[i] = null;
+                }
+                else
+                {
+                    gen.setSeed(rowSeed);
+                    result.row[i] = gen.generate();
+                }
             }
 
             // then we advance the leaf level

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index 89298ab..66f232a 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -40,14 +40,14 @@ public abstract class PredefinedOperation extends Operation
 
     public PredefinedOperation(Command type, Timer timer, PartitionGenerator 
generator, SeedManager seedManager, StressSettings settings)
     {
-        super(timer, settings, spec(generator, seedManager));
+        super(timer, settings, spec(generator, seedManager, 
settings.insert.rowPopulationRatio.get()));
         this.type = type;
         this.columnCount = settings.columns.countDistribution.get();
     }
 
-    private static DataSpec spec(PartitionGenerator generator, SeedManager 
seedManager)
+    private static DataSpec spec(PartitionGenerator generator, SeedManager 
seedManager, RatioDistribution rowPopulationCount)
     {
-        return new DataSpec(generator, seedManager, new DistributionFixed(1), 
1);
+        return new DataSpec(generator, seedManager, new DistributionFixed(1), 
rowPopulationCount, 1);
     }
 
     public boolean isCql3()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index ef4d53f..d9fcac8 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -41,9 +41,9 @@ public class SchemaInsert extends SchemaStatement
 
     private final BatchStatement.Type batchType;
 
-    public SchemaInsert(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, 
RatioDistribution useRatio, Integer thriftId, PreparedStatement statement, 
ConsistencyLevel cl, BatchStatement.Type batchType)
+    public SchemaInsert(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, Distribution batchSize, 
RatioDistribution useRatio, RatioDistribution rowPopulation, Integer thriftId, 
PreparedStatement statement, ConsistencyLevel cl, BatchStatement.Type batchType)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, batchSize, 
useRatio), statement, thriftId, cl);
+        super(timer, settings, new DataSpec(generator, seedManager, batchSize, 
useRatio, rowPopulation), statement, thriftId, cl);
         this.batchType = batchType;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index 58f5307..9b5c4ae 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -53,7 +53,7 @@ public class SchemaQuery extends SchemaStatement
 
     public SchemaQuery(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, Integer thriftId, 
PreparedStatement statement, ConsistencyLevel cl, ArgSelect argSelect)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), argSelect == ArgSelect.MULTIROW ? 
statement.getVariables().size() : 1), statement, thriftId, cl);
+        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == 
ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement, thriftId, 
cl);
         this.argSelect = argSelect;
         randomBuffer = new Object[argumentIndex.length][argumentIndex.length];
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
index 1b10fcf..8bdde51 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -65,7 +65,7 @@ public class ValidatingSchemaQuery extends Operation
 
     private ValidatingSchemaQuery(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, ValidatingStatement[] 
statements, ConsistencyLevel cl, int clusteringComponents)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), 1));
+        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), settings.insert.rowPopulationRatio.get(), 1));
         this.statements = statements;
         this.cl = cl;
         argumentIndex = new int[statements[0].statement.getVariables().size()];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d29ed06/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java 
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java
index a6c298b..ac3fc7c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsInsert.java
@@ -37,6 +37,7 @@ public class SettingsInsert implements Serializable
     public final DistributionFactory visits;
     public final DistributionFactory batchsize;
     public final RatioDistributionFactory selectRatio;
+    public final RatioDistributionFactory rowPopulationRatio;
     public final BatchStatement.Type batchType;
 
     private SettingsInsert(InsertOptions options)
@@ -45,6 +46,8 @@ public class SettingsInsert implements Serializable
         this.revisit = options.revisit.get();
         this.batchsize = options.partitions.get();
         this.selectRatio = options.selectRatio.get();
+        this.rowPopulationRatio = options.rowPopulationRatio.get();
+
         this.batchType = !options.batchType.setByUser() ? null : 
BatchStatement.Type.valueOf(options.batchType.value());
     }
 
@@ -57,11 +60,12 @@ public class SettingsInsert implements Serializable
         final OptionDistribution partitions = new 
OptionDistribution("partitions=", null, "The number of partitions to update in 
a single batch", false);
         final OptionSimple batchType = new OptionSimple("batchtype=", 
"unlogged|logged|counter", null, "Specify the type of batch statement (LOGGED, 
UNLOGGED or COUNTER)", false);
         final OptionRatioDistribution selectRatio = new 
OptionRatioDistribution("select-ratio=", null, "The uniform probability of 
visiting any CQL row in the generated partition", false);
+        final OptionRatioDistribution rowPopulationRatio = new 
OptionRatioDistribution("row-population-ratio=", null, "The percent of a given 
rows columns to populate", false);
 
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(revisit, visits, partitions, batchType, 
selectRatio);
+            return Arrays.asList(revisit, visits, partitions, batchType, 
selectRatio, rowPopulationRatio);
         }
     }
 

Reply via email to