cassandra-stress simultaneous inserts over same seed

patch by benedict; reviewed by rstupp CASSANDRA-7964


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c579a01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c579a01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c579a01

Branch: refs/heads/trunk
Commit: 6c579a0102fa3e67215fef5d9f8aa97191e3a216
Parents: cdba5aa
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Fri Dec 12 14:09:37 2014 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Fri Dec 12 14:09:37 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/stress/Operation.java  |  86 ++-
 .../apache/cassandra/stress/StressAction.java   |  96 +---
 .../apache/cassandra/stress/StressMetrics.java  |   1 -
 .../apache/cassandra/stress/StressProfile.java  |  65 +--
 .../apache/cassandra/stress/StressServer.java   |   3 +-
 .../cassandra/stress/generate/Partition.java    | 554 -------------------
 .../stress/generate/PartitionGenerator.java     |  39 +-
 .../apache/cassandra/stress/generate/Seed.java  |  52 +-
 .../cassandra/stress/generate/SeedManager.java  |  66 ++-
 .../cassandra/stress/generate/values/Bytes.java |   7 +-
 .../stress/generate/values/GeneratorConfig.java |   9 +-
 .../stress/generate/values/Strings.java         |   2 -
 .../stress/generate/values/TimeUUIDs.java       |   4 +-
 .../stress/operations/FixedOpDistribution.java  |   7 -
 .../stress/operations/OpDistribution.java       |   1 -
 .../operations/SampledOpDistribution.java       |   9 -
 .../operations/predefined/CqlCounterAdder.java  |   5 +-
 .../operations/predefined/CqlCounterGetter.java |   5 +-
 .../operations/predefined/CqlInserter.java      |   6 +-
 .../operations/predefined/CqlOperation.java     |  32 +-
 .../stress/operations/predefined/CqlReader.java |   7 +-
 .../predefined/PredefinedOperation.java         |  40 +-
 .../predefined/ThriftCounterAdder.java          |   9 +-
 .../predefined/ThriftCounterGetter.java         |   5 +-
 .../operations/predefined/ThriftInserter.java   |  11 +-
 .../operations/predefined/ThriftReader.java     |   7 +-
 .../operations/userdefined/SchemaInsert.java    |  52 +-
 .../operations/userdefined/SchemaQuery.java     |  55 +-
 .../operations/userdefined/SchemaStatement.java |  16 +-
 .../cassandra/stress/settings/Command.java      |   5 +-
 .../stress/settings/OptionAnyProbabilities.java |   8 +-
 .../stress/settings/OptionDistribution.java     |   4 +-
 .../settings/OptionEnumProbabilities.java       |   2 +-
 .../cassandra/stress/settings/OptionMulti.java  |   7 +-
 .../settings/OptionRatioDistribution.java       |  15 +-
 .../stress/settings/SettingsColumn.java         |  12 +-
 .../stress/settings/SettingsCommand.java        |   1 -
 .../settings/SettingsCommandPreDefined.java     |   7 +-
 .../SettingsCommandPreDefinedMixed.java         |   6 +-
 .../stress/settings/SettingsCommandUser.java    |  14 +-
 .../stress/settings/SettingsErrors.java         |   3 -
 .../cassandra/stress/settings/SettingsNode.java |   7 +-
 .../stress/settings/SettingsSchema.java         |   1 -
 .../stress/settings/StressSettings.java         |   3 -
 .../cassandra/stress/util/DynamicList.java      |  15 +-
 .../cassandra/stress/util/JavaDriverClient.java |   4 +-
 .../stress/util/SmartThriftClient.java          |   3 +-
 .../org/apache/cassandra/stress/util/Timer.java |   1 -
 .../apache/cassandra/stress/util/Timing.java    |   1 -
 .../cassandra/stress/util/TimingInterval.java   |   1 -
 51 files changed, 323 insertions(+), 1049 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fa42e85..25140a8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964)
  * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926)
  * Ensure memtable flush cannot expire commit log entries from its future 
(CASSANDRA-8383)
  * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java 
b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 5560240..edf3a54 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -19,13 +19,13 @@ package org.apache.cassandra.stress;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
-import org.apache.cassandra.stress.generate.Distribution;
-import org.apache.cassandra.stress.generate.Partition;
-import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.settings.*;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.settings.SettingsLog;
+import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
@@ -36,17 +36,42 @@ public abstract class Operation
 {
     public final StressSettings settings;
     public final Timer timer;
-    public final PartitionGenerator generator;
-    public final Distribution partitionCount;
+    protected final DataSpec spec;
+
+    private final List<PartitionIterator> partitionCache = new ArrayList<>();
+    protected List<PartitionIterator> partitions;
 
-    protected List<Partition> partitions;
+    public static final class DataSpec
+    {
+        public final PartitionGenerator partitionGenerator;
+        final SeedManager seedManager;
+        final Distribution partitionCount;
+        final RatioDistribution useRatio;
+        final Integer targetCount;
+
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, Integer targetCount)
+        {
+            this(partitionGenerator, seedManager, partitionCount, null, 
targetCount);
+        }
+        public DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, RatioDistribution useRatio)
+        {
+            this(partitionGenerator, seedManager, partitionCount, useRatio, 
null);
+        }
+        private DataSpec(PartitionGenerator partitionGenerator, SeedManager 
seedManager, Distribution partitionCount, RatioDistribution useRatio, Integer 
targetCount)
+        {
+            this.partitionGenerator = partitionGenerator;
+            this.seedManager = seedManager;
+            this.partitionCount = partitionCount;
+            this.useRatio = useRatio;
+            this.targetCount = targetCount;
+        }
+    }
 
-    public Operation(Timer timer, PartitionGenerator generator, StressSettings 
settings, Distribution partitionCount)
+    public Operation(Timer timer, StressSettings settings, DataSpec spec)
     {
-        this.generator = generator;
         this.timer = timer;
         this.settings = settings;
-        this.partitionCount = partitionCount;
+        this.spec = spec;
     }
 
     public static interface RunOp
@@ -56,9 +81,42 @@ public abstract class Operation
         public int rowCount();
     }
 
-    protected void setPartitions(List<Partition> partitions)
+    boolean ready(WorkManager permits, RateLimiter rateLimiter)
     {
-        this.partitions = partitions;
+        int partitionCount = (int) spec.partitionCount.next();
+        if (partitionCount <= 0)
+            return false;
+        partitionCount = permits.takePermits(partitionCount);
+        if (partitionCount <= 0)
+            return false;
+
+        int i = 0;
+        boolean success = true;
+        for (; i < partitionCount && success ; i++)
+        {
+            if (i >= partitionCache.size())
+                
partitionCache.add(PartitionIterator.get(spec.partitionGenerator, 
spec.seedManager));
+
+            success = false;
+            while (!success)
+            {
+                Seed seed = spec.seedManager.next(this);
+                if (seed == null)
+                    break;
+
+                if (spec.useRatio == null)
+                    success = partitionCache.get(i).reset(seed, 
spec.targetCount, this);
+                else
+                    success = partitionCache.get(i).reset(seed, 
spec.useRatio.next(), this);
+            }
+        }
+        partitionCount = i;
+
+        if (rateLimiter != null)
+            rateLimiter.acquire(partitionCount);
+
+        partitions = partitionCache.subList(0, partitionCount);
+        return !partitions.isEmpty();
     }
 
     public boolean isWrite()
@@ -135,7 +193,7 @@ public abstract class Operation
     private String key()
     {
         List<String> keys = new ArrayList<>();
-        for (Partition partition : partitions)
+        for (PartitionIterator partition : partitions)
             keys.add(partition.getKeyAsString());
         return keys.toString();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java 
b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 68e0004..1433742 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -21,19 +21,16 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import org.apache.cassandra.stress.generate.Partition;
 import org.apache.cassandra.stress.operations.OpDistribution;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
-import org.apache.cassandra.stress.settings.*;
+import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
@@ -180,9 +177,9 @@ public class StressAction implements Runnable
                                                            : "until stderr of 
mean < " + settings.command.targetUncertainty));
         final WorkManager workManager;
         if (opCount < 0)
-            workManager = new ContinuousWorkManager();
+            workManager = new WorkManager.ContinuousWorkManager();
         else
-            workManager = new FixedWorkManager(opCount);
+            workManager = new WorkManager.FixedWorkManager(opCount);
 
         final StressMetrics metrics = new StressMetrics(output, 
settings.log.intervalMillis, settings);
 
@@ -285,36 +282,12 @@ public class StressAction implements Runnable
                         throw new IllegalStateException();
                 }
 
-                int maxBatchSize = operations.maxBatchSize();
-                Partition[] partitions = new Partition[maxBatchSize];
                 while (true)
                 {
-
-                    // TODO: Operation should be able to ecapsulate much of 
this behaviour
                     Operation op = operations.next();
-                    op.generator.reset();
-
-                    int batchSize = workManager.takePermits(Math.max(1, (int) 
op.partitionCount.next()));
-                    if (batchSize < 0)
-                        break;
-
-                    if (rateLimiter != null)
-                        rateLimiter.acquire(batchSize);
-
-                    int partitionCount = 0;
-                    while (partitionCount < batchSize)
-                    {
-                        Partition p = op.generator.generate(op);
-                        if (p == null)
-                            break;
-                        partitions[partitionCount++] = p;
-                    }
-
-                    if (partitionCount == 0)
+                    if (!op.ready(workManager, rateLimiter))
                         break;
 
-                    op.setPartitions(Arrays.asList(partitions).subList(0, 
partitionCount));
-
                     try
                     {
                         switch (settings.mode.api)
@@ -358,65 +331,4 @@ public class StressAction implements Runnable
 
     }
 
-    private interface WorkManager
-    {
-        // -1 indicates consumer should terminate
-        int takePermits(int count);
-
-        // signal all consumers to terminate
-        void stop();
-    }
-
-    private static final class FixedWorkManager implements WorkManager
-    {
-
-        final AtomicLong permits;
-
-        public FixedWorkManager(long permits)
-        {
-            this.permits = new AtomicLong(permits);
-        }
-
-        @Override
-        public int takePermits(int count)
-        {
-            while (true)
-            {
-                long cur = permits.get();
-                if (cur == 0)
-                    return -1;
-                count = (int) Math.min(count, cur);
-                long next = cur - count;
-                if (permits.compareAndSet(cur, next))
-                    return count;
-            }
-        }
-
-        @Override
-        public void stop()
-        {
-            permits.getAndSet(0);
-        }
-    }
-
-    private static final class ContinuousWorkManager implements WorkManager
-    {
-
-        volatile boolean stop = false;
-
-        @Override
-        public int takePermits(int count)
-        {
-            if (stop)
-                return -1;
-            return count;
-        }
-
-        @Override
-        public void stop()
-        {
-            stop = true;
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java 
b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 3a4a4a3..d1cc0d4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ThreadFactory;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.stress.settings.SettingsLog;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JmxCollector;
 import org.apache.cassandra.stress.util.Timing;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java 
b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 76642be..1517fcb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -21,37 +21,26 @@
 package org.apache.cassandra.stress;
 
 
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.Uninterruptibles;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.CreateKeyspaceStatement;
 import org.apache.cassandra.exceptions.RequestValidationException;
-
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.stress.generate.Distribution;
-import org.apache.cassandra.stress.generate.DistributionFactory;
-import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.generate.RatioDistributionFactory;
-import org.apache.cassandra.stress.generate.SeedManager;
-import org.apache.cassandra.stress.generate.values.Booleans;
-import org.apache.cassandra.stress.generate.values.Bytes;
-import org.apache.cassandra.stress.generate.values.Generator;
-import org.apache.cassandra.stress.generate.values.Dates;
-import org.apache.cassandra.stress.generate.values.Doubles;
-import org.apache.cassandra.stress.generate.values.Floats;
-import org.apache.cassandra.stress.generate.values.GeneratorConfig;
-import org.apache.cassandra.stress.generate.values.Inets;
-import org.apache.cassandra.stress.generate.values.Integers;
-import org.apache.cassandra.stress.generate.values.Lists;
-import org.apache.cassandra.stress.generate.values.Longs;
-import org.apache.cassandra.stress.generate.values.Sets;
-import org.apache.cassandra.stress.generate.values.Strings;
-import org.apache.cassandra.stress.generate.values.TimeUUIDs;
-import org.apache.cassandra.stress.generate.values.UUIDs;
+import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.generate.values.*;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
 import org.apache.cassandra.stress.settings.OptionDistribution;
@@ -68,19 +57,6 @@ import org.yaml.snakeyaml.Yaml;
 import org.yaml.snakeyaml.constructor.Constructor;
 import org.yaml.snakeyaml.error.YAMLException;
 
-import java.io.*;
-import java.net.URI;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 public class StressProfile implements Serializable
 {
     private String keyspaceCql;
@@ -247,7 +223,7 @@ public class StressProfile implements Serializable
         }
     }
 
-    public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator 
generator, StressSettings settings)
+    public SchemaQuery getQuery(String name, Timer timer, PartitionGenerator 
generator, SeedManager seeds, StressSettings settings)
     {
         if (queryStatements == null)
         {
@@ -286,10 +262,11 @@ public class StressProfile implements Serializable
         name = name.toLowerCase();
         if (!queryStatements.containsKey(name))
             throw new IllegalArgumentException("No query defined with name " + 
name);
-        return new SchemaQuery(timer, generator, settings, 
thriftQueryIds.get(name), queryStatements.get(name), 
ThriftConversion.fromThrift(settings.command.consistencyLevel), 
ValidationType.NOT_FAIL, argSelects.get(name));
+        return new SchemaQuery(timer, settings, generator, seeds, 
thriftQueryIds.get(name), queryStatements.get(name),
+                               
ThriftConversion.fromThrift(settings.command.consistencyLevel), 
ValidationType.NOT_FAIL, argSelects.get(name));
     }
 
-    public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, 
StressSettings settings)
+    public SchemaInsert getInsert(Timer timer, PartitionGenerator generator, 
SeedManager seedManager, StressSettings settings)
     {
         if (insertStatement == null)
         {
@@ -401,7 +378,7 @@ public class StressProfile implements Serializable
             }
         }
 
-        return new SchemaInsert(timer, generator, settings, partitions.get(), 
selectchance.get(), thriftInsertId, insertStatement, 
ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
+        return new SchemaInsert(timer, settings, generator, seedManager, 
partitions.get(), selectchance.get(), thriftInsertId, insertStatement, 
ThriftConversion.fromThrift(settings.command.consistencyLevel), batchType);
     }
 
     private static <E> E select(E first, String key, String defValue, 
Map<String, String> map, Function<String, E> builder)
@@ -415,7 +392,7 @@ public class StressProfile implements Serializable
         return builder.apply(defValue);
     }
 
-    public PartitionGenerator newGenerator(StressSettings settings, 
SeedManager seeds)
+    public PartitionGenerator newGenerator(StressSettings settings)
     {
         if (generatorFactory == null)
         {
@@ -427,7 +404,7 @@ public class StressProfile implements Serializable
             }
         }
 
-        return generatorFactory.newGenerator(settings, seeds);
+        return generatorFactory.newGenerator(settings);
     }
 
     private class GeneratorFactory
@@ -449,9 +426,9 @@ public class StressProfile implements Serializable
                     valueColumns.add(new ColumnInfo(metadata.getName(), 
metadata.getType(), columnConfigs.get(metadata.getName())));
         }
 
-        PartitionGenerator newGenerator(StressSettings settings, SeedManager 
seeds)
+        PartitionGenerator newGenerator(StressSettings settings)
         {
-            return new PartitionGenerator(get(partitionKeys), 
get(clusteringColumns), get(valueColumns), settings.generate.order, seeds);
+            return new PartitionGenerator(get(partitionKeys), 
get(clusteringColumns), get(valueColumns), settings.generate.order);
         }
 
         List<Generator> get(List<ColumnInfo> columnInfos)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/StressServer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java 
b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
index 3c9e2a6..a6dfaf4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
@@ -24,9 +24,10 @@ import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 
-import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.commons.cli.*;
 
+import org.apache.cassandra.stress.settings.StressSettings;
+
 public class StressServer
 {
     private static final Options availableOptions = new Options();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
deleted file mode 100644
index 66f8c1d..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Partition.java
+++ /dev/null
@@ -1,554 +0,0 @@
-package org.apache.cassandra.stress.generate;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.stress.generate.values.Generator;
-
-// a partition is re-used to reduce garbage generation, as is its internal 
RowIterator
-// TODO: we should batch the generation of clustering components so we can 
bound the time and size necessary to
-// generate huge partitions with only a small number of clustering components; 
i.e. we should generate seeds for batches
-// of a single component, and then generate the values within those batches as 
necessary. this will be difficult with
-// generating sorted partitions, and may require generator support (e.g. we 
may need to support generating prefixes
-// that are extended/suffixed to generate each batch, so that we can sort the 
prefixes)
-public class Partition
-{
-
-    private long idseed;
-    private Seed seed;
-    private final Object[] partitionKey;
-    private final PartitionGenerator generator;
-    private final RowIterator iterator;
-
-    public Partition(PartitionGenerator generator)
-    {
-        this.generator = generator;
-        this.partitionKey = new Object[generator.partitionKey.size()];
-        if (generator.clusteringComponents.size() > 0)
-            iterator = new MultiRowIterator();
-        else
-            iterator = new SingleRowIterator();
-    }
-
-    void setSeed(Seed seed)
-    {
-        long idseed = 0;
-        for (int i = 0 ; i < partitionKey.length ; i++)
-        {
-            Generator generator = this.generator.partitionKey.get(i);
-            // set the partition key seed based on the current work item we're 
processing
-            generator.setSeed(seed.seed);
-            Object key = generator.generate();
-            partitionKey[i] = key;
-            // then contribute this value to the data seed
-            idseed = seed(key, generator.type, idseed);
-        }
-        this.seed = seed;
-        this.idseed = idseed;
-    }
-
-    public RowIterator iterator(double useChance, boolean isWrite)
-    {
-        iterator.reset(useChance, 0, 1, isWrite);
-        return iterator;
-    }
-
-    public RowIterator iterator(int targetCount, boolean isWrite)
-    {
-        iterator.reset(Double.NaN, targetCount, 1, isWrite);
-        return iterator;
-    }
-
-    class SingleRowIterator extends RowIterator
-    {
-        boolean done;
-
-        void reset(double useChance, int targetCount, int batches, boolean 
isWrite)
-        {
-            done = false;
-        }
-
-        public Iterable<Row> next()
-        {
-            if (done)
-                return Collections.emptyList();
-            for (int i = 0 ; i < row.row.length ; i++)
-            {
-                Generator gen = generator.valueComponents.get(i);
-                gen.setSeed(idseed);
-                row.row[i] = gen.generate();
-            }
-            done = true;
-            return Collections.singleton(row);
-        }
-
-        public boolean done()
-        {
-            return done;
-        }
-
-        public void markWriteFinished()
-        {
-            assert done;
-            generator.seeds.markFinished(seed);
-        }
-    }
-
-    public abstract class RowIterator
-    {
-        // we reuse the row object to save garbage
-        final Row row = new Row(partitionKey, new 
Object[generator.clusteringComponents.size() + 
generator.valueComponents.size()]);
-
-        public abstract Iterable<Row> next();
-        public abstract boolean done();
-        public abstract void markWriteFinished();
-        abstract void reset(double useChance, int targetCount, int batches, 
boolean isWrite);
-
-        public Partition partition()
-        {
-            return Partition.this;
-        }
-    }
-
-    // permits iterating a random subset of the procedurally generated rows in 
this partition. this is the only mechanism for visiting rows.
-    // we maintain a stack of clustering components and their seeds; for each 
clustering component we visit, we generate all values it takes at that level,
-    // and then, using the average (total) number of children it takes we 
randomly choose whether or not we visit its children;
-    // if we do, we generate all possible values the immediate children can 
take, and repeat the process. So at any one time we are using space proportional
-    // to C.N, where N is the average number of values each clustering 
component takes, as opposed to N^C total values in the partition.
-    // TODO : guarantee at least one row is always returned
-    // TODO : support first/last row, and constraining reads to rows we know 
are populated
-    class MultiRowIterator extends RowIterator
-    {
-
-        // probability any single row will be generated in this iteration
-        double useChance;
-
-        // the seed used to generate the current values for the clustering 
components at each depth;
-        // used to save recalculating it for each row, so we only need to 
recalc from prior row.
-        final long[] clusteringSeeds = new 
long[generator.clusteringComponents.size()];
-        // the components remaining to be visited for each level of the 
current stack
-        final Deque<Object>[] clusteringComponents = new 
ArrayDeque[generator.clusteringComponents.size()];
-
-        // we want our chance of selection to be applied uniformly, so we 
compound the roll we make at each level
-        // so that we know with what chance we reached there, and we adjust 
our roll at that level by that amount
-        final double[] chancemodifier = new 
double[generator.clusteringComponents.size()];
-        final double[] rollmodifier = new 
double[generator.clusteringComponents.size()];
-
-        // track where in the partition we are, and where we are limited to
-        final int[] position = new int[generator.clusteringComponents.size()];
-        final int[] limit = new int[position.length];
-        int batchSize;
-        boolean returnedOne;
-        boolean forceReturnOne;
-
-        // reusable collections for generating unique and sorted clustering 
components
-        final Set<Object> unique = new HashSet<>();
-        final List<Comparable> tosort = new ArrayList<>();
-        final Random random = new Random();
-
-        MultiRowIterator()
-        {
-            for (int i = 0 ; i < clusteringComponents.length ; i++)
-                clusteringComponents[i] = new ArrayDeque<>();
-            rollmodifier[0] = 1f;
-            chancemodifier[0] = generator.clusteringChildAverages[0];
-        }
-
-        // if we're a write, the expected behaviour is that the requested 
batch count is compounded with the seed's visit
-        // count to decide how much we should return in one iteration
-        void reset(double useChance, int targetCount, int batches, boolean 
isWrite)
-        {
-            if (this.useChance < 1d)
-            {
-                // we clear our prior roll-modifiers if the use chance was 
previously less-than zero
-                Arrays.fill(rollmodifier, 1d);
-                Arrays.fill(chancemodifier, 1d);
-            }
-
-            // set the seed for the first clustering component
-            generator.clusteringComponents.get(0).setSeed(idseed);
-            int[] position = seed.position;
-
-            // calculate how many first clustering components we'll generate, 
and how many total rows this predicts
-            int firstComponentCount = (int) 
generator.clusteringComponents.get(0).clusteringDistribution.next();
-            int expectedRowCount;
-
-            if (!isWrite && position != null)
-            {
-                expectedRowCount = 0;
-                for (int i = 0 ; i < position.length ; i++)
-                {
-                    expectedRowCount += position[i] * 
generator.clusteringChildAverages[i];
-                    limit[i] = position[i];
-                }
-            }
-            else
-            {
-                expectedRowCount = firstComponentCount * 
generator.clusteringChildAverages[0];
-                if (isWrite)
-                    batches *= seed.visits;
-                Arrays.fill(limit, Integer.MAX_VALUE);
-            }
-
-            batchSize = Math.max(1, expectedRowCount / batches);
-            if (Double.isNaN(useChance))
-                useChance = Math.max(0d, Math.min(1d, targetCount / (double) 
expectedRowCount));
-
-            // clear any remnants of the last iteration, wire up our 
constants, and fill in the first clustering components
-            this.useChance = useChance;
-            this.returnedOne = false;
-            for (Queue<?> q : clusteringComponents)
-                q.clear();
-            clusteringSeeds[0] = idseed;
-            fill(clusteringComponents[0], firstComponentCount, 
generator.clusteringComponents.get(0));
-
-            // seek to our start position
-            seek(isWrite ? position : null);
-        }
-
-        // generate the clustering components for the provided depth; requires 
preceding components
-        // to have been generated and their seeds populated into 
clusteringSeeds
-        void fill(int depth)
-        {
-            long seed = clusteringSeeds[depth - 1];
-            Generator gen = generator.clusteringComponents.get(depth);
-            gen.setSeed(seed);
-            clusteringSeeds[depth] = seed(clusteringComponents[depth - 
1].peek(), generator.clusteringComponents.get(depth - 1).type, seed);
-            fill(clusteringComponents[depth], (int) 
gen.clusteringDistribution.next(), gen);
-        }
-
-        // generate the clustering components into the queue
-        void fill(Queue<Object> queue, int count, Generator generator)
-        {
-            if (count == 1)
-            {
-                queue.add(generator.generate());
-                return;
-            }
-
-            switch (Partition.this.generator.order)
-            {
-                case SORTED:
-                    if (Comparable.class.isAssignableFrom(generator.clazz))
-                    {
-                        tosort.clear();
-                        for (int i = 0 ; i < count ; i++)
-                            tosort.add((Comparable) generator.generate());
-                        Collections.sort(tosort);
-                        for (int i = 0 ; i < count ; i++)
-                            queue.add(tosort.get(i));
-                        break;
-                    }
-                    else
-                    {
-                        throw new RuntimeException("Generator class is not 
comparable: "+generator.clazz);
-                    }
-                case ARBITRARY:
-                    unique.clear();
-                    for (int i = 0 ; i < count ; i++)
-                    {
-                        Object next = generator.generate();
-                        if (unique.add(next))
-                            queue.add(next);
-                    }
-                    break;
-                case SHUFFLED:
-                    unique.clear();
-                    tosort.clear();
-                    for (int i = 0 ; i < count ; i++)
-                    {
-                        Object next = generator.generate();
-                        if (unique.add(next))
-                            tosort.add(new RandomOrder(next));
-                    }
-                    Collections.sort(tosort);
-                    for (Object o : tosort)
-                        queue.add(((RandomOrder)o).value);
-                    break;
-                default:
-                    throw new IllegalStateException();
-            }
-        }
-
-        // seek to the provided position (or the first entry if null)
-        private void seek(int[] position)
-        {
-            if (position == null)
-            {
-                this.position[0] = -1;
-                clusteringComponents[0].addFirst(this);
-                advance(0);
-                return;
-            }
-
-            assert position.length == clusteringComponents.length;
-            for (int i = 0 ; i < position.length ; i++)
-            {
-                if (i != 0)
-                    fill(i);
-                for (int c = position[i] ; c > 0 ; c--)
-                    clusteringComponents[i].poll();
-                row.row[i] = clusteringComponents[i].peek();
-            }
-            System.arraycopy(position, 0, this.position, 0, position.length);
-        }
-
-        // normal method for moving the iterator forward; maintains the row 
object, and delegates to advance(int)
-        // to move the iterator to the next item
-        void advance()
-        {
-            // we are always at the leaf level when this method is invoked
-            // so we calculate the seed for generating the row by combining 
the seed that generated the clustering components
-            int depth = clusteringComponents.length - 1;
-            long parentSeed = clusteringSeeds[depth];
-            long rowSeed = seed(clusteringComponents[depth].peek(), 
generator.clusteringComponents.get(depth).type, parentSeed);
-
-            // and then fill the row with the _non-clustering_ values for the 
position we _were_ at, as this is what we'll deliver
-            for (int i = clusteringSeeds.length ; i < row.row.length ; i++)
-            {
-                Generator gen = generator.valueComponents.get(i - 
clusteringSeeds.length);
-                gen.setSeed(rowSeed);
-                row.row[i] = gen.generate();
-            }
-            returnedOne = true;
-            forceReturnOne = false;
-
-            // then we advance the leaf level
-            advance(depth);
-        }
-
-        private void advance(int depth)
-        {
-            // advance the leaf component
-            clusteringComponents[depth].poll();
-            position[depth]++;
-            while (true)
-            {
-                if (clusteringComponents[depth].isEmpty())
-                {
-                    // if we've run out of clustering components at this 
level, ascend
-                    if (depth == 0)
-                        return;
-                    depth--;
-                    clusteringComponents[depth].poll();
-                    position[depth]++;
-                    continue;
-                }
-
-                if (depth == 0 && !returnedOne && 
clusteringComponents[0].size() == 1)
-                    forceReturnOne = true;
-
-                // the chance of descending is the uniform usechance, 
multiplied by the number of children
-                // we would on average generate (so if we have a 0.1 use 
chance, but should generate 10 children
-                // then we will always descend), multiplied by 1/(compound 
roll), where (compound roll) is the
-                // chance with which we reached this depth, i.e. if we already 
beat 50/50 odds, we double our
-                // chance of beating this next roll
-                double thischance = useChance * chancemodifier[depth];
-                if (forceReturnOne || thischance > 0.999f || thischance >= 
random.nextDouble())
-                {
-                    // if we're descending, we fill in our clustering 
component and increase our depth
-                    row.row[depth] = clusteringComponents[depth].peek();
-                    depth++;
-                    if (depth == clusteringComponents.length)
-                        break;
-                    // if we haven't reached the leaf, we update our 
probability statistics, fill in all of
-                    // this level's clustering components, and repeat
-                    if (useChance < 1d)
-                    {
-                        rollmodifier[depth] = rollmodifier[depth - 1] / 
Math.min(1d, thischance);
-                        chancemodifier[depth] = 
generator.clusteringChildAverages[depth] * rollmodifier[depth];
-                    }
-                    position[depth] = 0;
-                    fill(depth);
-                    continue;
-                }
-
-                // if we don't descend, we remove the clustering suffix we've 
skipped and continue
-                clusteringComponents[depth].poll();
-                position[depth]++;
-            }
-        }
-
-        public Iterable<Row> next()
-        {
-            final int[] limit = position.clone();
-            int remainingSize = batchSize;
-            for (int i = 0 ; i < limit.length && remainingSize > 0 ; i++)
-            {
-                limit[i] += remainingSize / 
generator.clusteringChildAverages[i];
-                remainingSize %= generator.clusteringChildAverages[i];
-            }
-            assert remainingSize == 0;
-            for (int i = limit.length - 1 ; i > 0 ; i--)
-            {
-                if (limit[i] > generator.clusteringChildAverages[i])
-                {
-                    limit[i - 1] += limit[i] / 
generator.clusteringChildAverages[i];
-                    limit[i] %= generator.clusteringChildAverages[i];
-                }
-            }
-            for (int i = 0 ; i < limit.length ; i++)
-            {
-                if (limit[i] < this.limit[i])
-                    break;
-                limit[i] = Math.min(limit[i], this.limit[i]);
-            }
-            return new Iterable<Row>()
-            {
-                public Iterator<Row> iterator()
-                {
-                    return new Iterator<Row>()
-                    {
-
-                        public boolean hasNext()
-                        {
-                            if (done())
-                                return false;
-                            for (int i = 0 ; i < position.length ; i++)
-                                if (position[i] < limit[i])
-                                    return true;
-                            return false;
-                        }
-
-                        public Row next()
-                        {
-                            advance();
-                            return row;
-                        }
-
-                        public void remove()
-                        {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-            };
-        }
-
-        public boolean done()
-        {
-            return clusteringComponents[0].isEmpty();
-        }
-
-        public void markWriteFinished()
-        {
-            if (done())
-                generator.seeds.markFinished(seed);
-            else
-                generator.seeds.markVisited(seed, position.clone());
-        }
-
-        public Partition partition()
-        {
-            return Partition.this;
-        }
-    }
-
-    private static class RandomOrder implements Comparable<RandomOrder>
-    {
-        final int order = ThreadLocalRandom.current().nextInt();
-        final Object value;
-        private RandomOrder(Object value)
-        {
-            this.value = value;
-        }
-
-        public int compareTo(RandomOrder that)
-        {
-            return Integer.compare(this.order, that.order);
-        }
-    }
-
-    // calculate a new seed based on the combination of a parent seed and the 
generated child, to generate
-    // any children of this child
-    static long seed(Object object, AbstractType type, long seed)
-    {
-        if (object instanceof ByteBuffer)
-        {
-            ByteBuffer buf = (ByteBuffer) object;
-            for (int i = buf.position() ; i < buf.limit() ; i++)
-                seed = (31 * seed) + buf.get(i);
-            return seed;
-        }
-        else if (object instanceof String)
-        {
-            String str = (String) object;
-            for (int i = 0 ; i < str.length() ; i++)
-                seed = (31 * seed) + str.charAt(i);
-            return seed;
-        }
-        else if (object instanceof Number)
-        {
-            return (seed * 31) + ((Number) object).longValue();
-        }
-        else if (object instanceof UUID)
-        {
-            return seed * 31 + (((UUID) object).getLeastSignificantBits() ^ 
((UUID) object).getMostSignificantBits());
-        }
-        else
-        {
-            return seed(type.decompose(object), BytesType.instance, seed);
-        }
-    }
-
-    public Object getPartitionKey(int i)
-    {
-        return partitionKey[i];
-    }
-
-    public String getKeyAsString()
-    {
-        StringBuilder sb = new StringBuilder();
-        int i = 0;
-        for (Object key : partitionKey)
-        {
-            if (i > 0)
-                sb.append("|");
-            AbstractType type = generator.partitionKey.get(i++).type;
-            sb.append(type.getString(type.decompose(key)));
-        }
-        return sb.toString();
-    }
-
-    // used for thrift smart routing - if it's a multi-part key we don't try 
to route correctly right now
-    public ByteBuffer getToken()
-    {
-        return generator.partitionKey.get(0).type.decompose(partitionKey[0]);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index 128d2f5..9f88068 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.stress.generate;
 
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -30,7 +29,6 @@ import java.util.NoSuchElementException;
 
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.values.Generator;
 
 public class PartitionGenerator
@@ -46,30 +44,24 @@ public class PartitionGenerator
     final List<Generator> partitionKey;
     final List<Generator> clusteringComponents;
     final List<Generator> valueComponents;
-    final int[] clusteringChildAverages;
+    final int[] clusteringDescendantAverages;
+    final int[] clusteringComponentAverages;
 
     private final Map<String, Integer> indexMap;
     final Order order;
-    final SeedManager seeds;
 
-    final List<Partition> recyclable = new ArrayList<>();
-    int partitionsInUse = 0;
-
-    public void reset()
-    {
-        partitionsInUse = 0;
-    }
-
-    public PartitionGenerator(List<Generator> partitionKey, List<Generator> 
clusteringComponents, List<Generator> valueComponents, Order order, SeedManager 
seeds)
+    public PartitionGenerator(List<Generator> partitionKey, List<Generator> 
clusteringComponents, List<Generator> valueComponents, Order order)
     {
         this.partitionKey = partitionKey;
         this.clusteringComponents = clusteringComponents;
         this.valueComponents = valueComponents;
         this.order = order;
-        this.seeds = seeds;
-        this.clusteringChildAverages = new int[clusteringComponents.size()];
-        for (int i = clusteringChildAverages.length - 1 ; i >= 0 ; i--)
-            clusteringChildAverages[i] = (int) (i < 
(clusteringChildAverages.length - 1) ? clusteringComponents.get(i + 
1).clusteringDistribution.average() * clusteringChildAverages[i + 1] : 1);
+        this.clusteringDescendantAverages = new 
int[clusteringComponents.size()];
+        this.clusteringComponentAverages = new 
int[clusteringComponents.size()];
+        for (int i = 0 ; i < clusteringComponentAverages.length ; i++)
+            clusteringComponentAverages[i] = (int) 
clusteringComponents.get(i).clusteringDistribution.average();
+        for (int i = clusteringDescendantAverages.length - 1 ; i >= 0 ; i--)
+            clusteringDescendantAverages[i] = (int) (i < 
(clusteringDescendantAverages.length - 1) ? clusteringComponentAverages[i + 1] 
* clusteringDescendantAverages[i + 1] : 1);
         double maxRowCount = 1d;
         double minRowCount = 1d;
         for (Generator component : clusteringComponents)
@@ -101,19 +93,6 @@ public class PartitionGenerator
         return i;
     }
 
-    public Partition generate(Operation op)
-    {
-        if (recyclable.size() <= partitionsInUse || 
recyclable.get(partitionsInUse) == null)
-            recyclable.add(new Partition(this));
-
-        Seed seed = seeds.next(op);
-        if (seed == null)
-            return null;
-        Partition partition = recyclable.get(partitionsInUse++);
-        partition.setSeed(seed);
-        return partition;
-    }
-
     public ByteBuffer convert(int c, Object v)
     {
         if (c < 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
index f427608..9e2e65b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
@@ -18,50 +18,68 @@
 */
 package org.apache.cassandra.stress.generate;
 
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.cassandra.stress.util.DynamicList;
 
 public class Seed implements Comparable<Seed>
 {
 
+    public final int visits;
     public final long seed;
-    final int visits;
 
-    DynamicList.Node poolNode;
-    volatile int[] position;
-    volatile State state = State.HELD;
+    private volatile DynamicList.Node poolNode;
+    private volatile int position;
 
-    private static final AtomicReferenceFieldUpdater<Seed, Seed.State> 
stateUpdater = AtomicReferenceFieldUpdater.newUpdater(Seed.class, State.class, 
"state");
+    private static final AtomicIntegerFieldUpdater<Seed> positionUpdater = 
AtomicIntegerFieldUpdater.newUpdater(Seed.class, "position");
 
     public int compareTo(Seed that)
     {
         return Long.compare(this.seed, that.seed);
     }
 
-    static enum State
-    {
-        HELD, AVAILABLE
-    }
-
     Seed(long seed, int visits)
     {
         this.seed = seed;
         this.visits = visits;
     }
 
-    boolean take()
+    public int position()
     {
-        return stateUpdater.compareAndSet(this, State.AVAILABLE, State.HELD);
+        return position;
     }
 
-    void yield()
+    public int moveForwards(int rowCount)
     {
-        state = State.AVAILABLE;
+        return positionUpdater.getAndAdd(this, rowCount);
     }
 
-    public int[] position()
+    public int hashCode()
     {
-        return position;
+        return (int) seed;
+    }
+
+    public boolean equals(Object that)
+    {
+        return that instanceof Seed && this.seed == ((Seed) that).seed;
+    }
+
+    public boolean save(DynamicList<Seed> sampleFrom, int maxSize)
+    {
+        DynamicList.Node poolNode = sampleFrom.append(this, maxSize);
+        if (poolNode == null)
+            return false;
+        this.poolNode = poolNode;
+        return true;
+    }
+
+    public boolean isSaved()
+    {
+        return poolNode != null;
+    }
+
+    public void remove(DynamicList<Seed> sampleFrom)
+    {
+        sampleFrom.remove(poolNode);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
index dba721d..071d888 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
@@ -33,9 +33,12 @@ public class SeedManager
     final Distribution visits;
     final Generator writes;
     final Generator reads;
-    final ConcurrentHashMap<Seed, Seed> managing = new ConcurrentHashMap<>();
+    final ConcurrentHashMap<Long, Seed> managing = new ConcurrentHashMap<>();
     final DynamicList<Seed> sampleFrom;
     final Distribution sample;
+    final long sampleOffset;
+    final int sampleSize;
+    final boolean updateSampleImmediately;
 
     public SeedManager(StressSettings settings)
     {
@@ -61,10 +64,15 @@ public class SeedManager
         this.visits = settings.insert.visits.get();
         this.writes = writes;
         this.reads = reads;
-        this.sample = 
DistributionInverted.invert(settings.insert.revisit.get());
-        if (sample.maxValue() > Integer.MAX_VALUE || sample.minValue() < 0)
-            throw new IllegalArgumentException();
-        this.sampleFrom = new DynamicList<>((int) sample.maxValue());
+        Distribution sample = settings.insert.revisit.get();
+        this.sampleOffset = Math.min(sample.minValue(), sample.maxValue());
+        long sampleSize = 1 + Math.max(sample.minValue(), sample.maxValue()) - 
sampleOffset;
+        if (sampleOffset < 0 || sampleSize > Integer.MAX_VALUE)
+            throw new IllegalArgumentException("sample range is invalid");
+        this.sampleFrom = new DynamicList<>((int) sampleSize);
+        this.sample = DistributionInverted.invert(sample);
+        this.sampleSize = (int) sampleSize;
+        this.updateSampleImmediately = visits.average() > 1;
     }
 
     public Seed next(Operation op)
@@ -80,48 +88,38 @@ public class SeedManager
 
         while (true)
         {
-            int index = (int) sample.next();
+            int index = (int) (sample.next() - sampleOffset);
             Seed seed = sampleFrom.get(index);
-            if (seed != null && seed.take())
+            if (seed != null && seed.isSaved())
                 return seed;
 
             seed = writes.next((int) visits.next());
             if (seed == null)
                 return null;
-            // seeds are created HELD, so if we insert it successfully we have 
it exclusively for our write
-            if (managing.putIfAbsent(seed, seed) == null)
-                return seed;
+            if (managing.putIfAbsent(seed.seed, seed) == null)
+            {
+                if (!updateSampleImmediately || seed.save(sampleFrom, 
sampleSize))
+                    return seed;
+                managing.remove(seed.seed, seed);
+            }
         }
     }
 
-    public void markVisited(Seed seed, int[] position)
-    {
-        boolean first = seed.position == null;
-        seed.position = position;
-        finishedWriting(seed, first, false);
-    }
-
-    public void markFinished(Seed seed)
+    public void markLastWrite(Seed seed, boolean first)
     {
-        finishedWriting(seed, seed.position == null, true);
+        // we could have multiple iterators mark the last write simultaneously,
+        // so we ensure we remove conditionally, and only remove the exact 
seed we were operating over
+        // this is important because, to ensure correctness, we do not support 
calling remove multiple
+        // times on the same DynamicList.Node
+        if (managing.remove(seed.seed, seed) && !first)
+            seed.remove(sampleFrom);
     }
 
-    void finishedWriting(Seed seed, boolean first, boolean completed)
+    public void markFirstWrite(Seed seed, boolean last)
     {
-        if (!completed)
-        {
-            if (first)
-                seed.poolNode = sampleFrom.append(seed);
-            seed.yield();
-        }
-        else
-        {
-            if (!first)
-                sampleFrom.remove(seed.poolNode);
-            managing.remove(seed);
-        }
-        if (first)
-            writes.finishWrite(seed);
+        if (!last && !updateSampleImmediately)
+            seed.save(sampleFrom, Integer.MAX_VALUE);
+        writes.finishWrite(seed);
     }
 
     private abstract class Generator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
index 358163c..3c15c87 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
@@ -20,12 +20,11 @@
  */
 package org.apache.cassandra.stress.generate.values;
 
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.stress.generate.FasterRandom;
-
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Random;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.stress.generate.FasterRandom;
 
 public class Bytes extends Generator<ByteBuffer>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
 
b/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
index 8f7b2ea..3522338 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/values/GeneratorConfig.java
@@ -20,17 +20,14 @@
  */
 package org.apache.cassandra.stress.generate.values;
 
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
 import org.apache.cassandra.stress.generate.Distribution;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MurmurHash;
 
-
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-
 public class GeneratorConfig implements Serializable
 {
     public final long salt;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
index 71aaae6..b58fee2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
@@ -20,8 +20,6 @@
  */
 package org.apache.cassandra.stress.generate.values;
 
-import java.util.Random;
-
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.stress.generate.FasterRandom;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
index efe4b79..7bfabf5 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/values/TimeUUIDs.java
@@ -21,11 +21,11 @@
 package org.apache.cassandra.stress.generate.values;
 
 
+import java.util.UUID;
+
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.utils.UUIDGen;
 
-import java.util.UUID;
-
 public class TimeUUIDs extends Generator<UUID>
 {
     final Dates dateGen;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
index 3212795..533b630 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
@@ -25,7 +25,6 @@ import org.apache.cassandra.stress.Operation;
 
 public class FixedOpDistribution implements OpDistribution
 {
-
     final Operation operation;
 
     public FixedOpDistribution(Operation operation)
@@ -37,10 +36,4 @@ public class FixedOpDistribution implements OpDistribution
     {
         return operation;
     }
-
-    public int maxBatchSize()
-    {
-        return (int) operation.partitionCount.maxValue();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java 
b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
index bcbd0bf..0fc15a6 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
@@ -27,6 +27,5 @@ public interface OpDistribution
 {
 
     Operation next();
-    public int maxBatchSize();
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
index 0bd64c5..432e991 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.stress.operations;
 
 
 import org.apache.commons.math3.distribution.EnumeratedDistribution;
-import org.apache.commons.math3.util.Pair;
 
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.Distribution;
@@ -41,14 +40,6 @@ public class SampledOpDistribution implements OpDistribution
         this.clustering = clustering;
     }
 
-    public int maxBatchSize()
-    {
-        int max = 1;
-        for (Pair<Operation, Double> pair : operations.getPmf())
-            max = Math.max(max, (int) 
pair.getFirst().partitionCount.maxValue());
-        return max;
-    }
-
     public Operation next()
     {
         while (remaining == 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
index b7d1ee7..456c821 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.cassandra.stress.generate.Distribution;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.Timer;
@@ -36,9 +37,9 @@ public class CqlCounterAdder extends CqlOperation<Integer>
 {
 
     final Distribution counteradd;
-    public CqlCounterAdder(DistributionFactory counteradd, Timer timer, 
PartitionGenerator generator, StressSettings settings)
+    public CqlCounterAdder(DistributionFactory counteradd, Timer timer, 
PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
     {
-        super(Command.COUNTER_WRITE, timer, generator, settings);
+        super(Command.COUNTER_WRITE, timer, generator, seedManager, settings);
         this.counteradd = counteradd.get();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
index 94c8faf..8c1c65c 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.Timer;
@@ -33,9 +34,9 @@ import org.apache.cassandra.stress.util.Timer;
 public class CqlCounterGetter extends CqlOperation<Integer>
 {
 
-    public CqlCounterGetter(Timer timer, PartitionGenerator generator, 
StressSettings settings)
+    public CqlCounterGetter(Timer timer, PartitionGenerator generator, 
SeedManager seedManager, StressSettings settings)
     {
-        super(Command.COUNTER_READ, timer, generator, settings);
+        super(Command.COUNTER_READ, timer, generator, seedManager, settings);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
index 622eb14..88ee752 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
@@ -26,17 +26,17 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.utils.UUIDGen;
 
 public class CqlInserter extends CqlOperation<Integer>
 {
 
-    public CqlInserter(Timer timer, PartitionGenerator generator, 
StressSettings settings)
+    public CqlInserter(Timer timer, PartitionGenerator generator, SeedManager 
seedManager, StressSettings settings)
     {
-        super(Command.WRITE, timer, generator, settings);
+        super(Command.WRITE, timer, generator, seedManager, settings);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
index 0264cd1..9cea854 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
@@ -24,13 +24,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.base.Function;
+
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
-import com.google.common.base.Function;
-import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.StressMetrics;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.ConnectionStyle;
 import org.apache.cassandra.stress.settings.CqlVersion;
@@ -54,9 +54,9 @@ public abstract class CqlOperation<V> extends 
PredefinedOperation
     protected abstract String buildQuery();
     protected abstract CqlRunOp<V> buildRunOp(ClientWrapper client, String 
query, Object queryId, List<Object> params, ByteBuffer key);
 
-    public CqlOperation(Command type, Timer timer, PartitionGenerator 
generator, StressSettings settings)
+    public CqlOperation(Command type, Timer timer, PartitionGenerator 
generator, SeedManager seedManager, StressSettings settings)
     {
-        super(type, timer, generator, settings);
+        super(type, timer, generator, seedManager, settings);
         if (settings.columns.variableColumnCount)
             throw new IllegalStateException("Variable column counts are not 
implemented for CQL");
     }
@@ -168,28 +168,6 @@ public abstract class CqlOperation<V> extends 
PredefinedOperation
         }
     }
 
-    // Requires a custom validate() method, but fetches and stores the keys 
from the result set for further processing
-    protected abstract class CqlRunOpFetchKeys extends CqlRunOp<byte[][]>
-    {
-
-        protected CqlRunOpFetchKeys(ClientWrapper client, String query, Object 
queryId, List<Object> params, ByteBuffer key)
-        {
-            super(client, query, queryId, KeysHandler.INSTANCE, params, key);
-        }
-
-        @Override
-        public int partitionCount()
-        {
-            return result.length;
-        }
-
-        @Override
-        public int rowCount()
-        {
-            return result.length;
-        }
-    }
-
     protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]>
     {
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
index 3a7f75a..12cc241 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
@@ -22,23 +22,22 @@ package org.apache.cassandra.stress.operations.predefined;
 
 
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CqlReader extends CqlOperation<ByteBuffer[][]>
 {
 
-    public CqlReader(Timer timer, PartitionGenerator generator, StressSettings 
settings)
+    public CqlReader(Timer timer, PartitionGenerator generator, SeedManager 
seedManager, StressSettings settings)
     {
-        super(Command.READ, timer, generator, settings);
+        super(Command.READ, timer, generator, seedManager, settings);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index dba2e51..d5c3edc 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -20,26 +20,17 @@ package org.apache.cassandra.stress.operations.predefined;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.stress.Operation;
-import org.apache.cassandra.stress.StressMetrics;
-import org.apache.cassandra.stress.generate.Distribution;
-import org.apache.cassandra.stress.generate.DistributionFactory;
-import org.apache.cassandra.stress.generate.DistributionFixed;
-import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.CqlVersion;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class PredefinedOperation extends Operation
 {
@@ -47,13 +38,18 @@ public abstract class PredefinedOperation extends Operation
     private final Distribution columnCount;
     private Object cqlCache;
 
-    public PredefinedOperation(Command type, Timer timer, PartitionGenerator 
generator, StressSettings settings)
+    public PredefinedOperation(Command type, Timer timer, PartitionGenerator 
generator, SeedManager seedManager, StressSettings settings)
     {
-        super(timer, generator, settings, new DistributionFixed(1));
+        super(timer, settings, spec(generator, seedManager));
         this.type = type;
         this.columnCount = settings.columns.countDistribution.get();
     }
 
+    private static DataSpec spec(PartitionGenerator generator, SeedManager 
seedManager)
+    {
+        return new DataSpec(generator, seedManager, new DistributionFixed(1), 
1);
+    }
+
     public boolean isCql3()
     {
         return settings.mode.cqlVersion == CqlVersion.CQL3;
@@ -174,7 +170,7 @@ public abstract class PredefinedOperation extends Operation
 
     protected List<ByteBuffer> getColumnValues(ColumnSelection columns)
     {
-        Row row = partitions.get(0).iterator(1, 
false).next().iterator().next();
+        Row row = partitions.get(0).next();
         ByteBuffer[] r = new ByteBuffer[columns.count()];
         int c = 0;
         if (columns.indices != null)
@@ -186,7 +182,7 @@ public abstract class PredefinedOperation extends Operation
         return Arrays.asList(r);
     }
 
-    public static Operation operation(Command type, Timer timer, 
PartitionGenerator generator, StressSettings settings, DistributionFactory 
counteradd)
+    public static Operation operation(Command type, Timer timer, 
PartitionGenerator generator, SeedManager seedManager, StressSettings settings, 
DistributionFactory counteradd)
     {
         switch (type)
         {
@@ -194,10 +190,10 @@ public abstract class PredefinedOperation extends 
Operation
                 switch(settings.mode.style)
                 {
                     case THRIFT:
-                        return new ThriftReader(timer, generator, settings);
+                        return new ThriftReader(timer, generator, seedManager, 
settings);
                     case CQL:
                     case CQL_PREPARED:
-                        return new CqlReader(timer, generator, settings);
+                        return new CqlReader(timer, generator, seedManager, 
settings);
                     default:
                         throw new UnsupportedOperationException();
                 }
@@ -207,10 +203,10 @@ public abstract class PredefinedOperation extends 
Operation
                 switch(settings.mode.style)
                 {
                     case THRIFT:
-                        return new ThriftCounterGetter(timer, generator, 
settings);
+                        return new ThriftCounterGetter(timer, generator, 
seedManager, settings);
                     case CQL:
                     case CQL_PREPARED:
-                        return new CqlCounterGetter(timer, generator, 
settings);
+                        return new CqlCounterGetter(timer, generator, 
seedManager, settings);
                     default:
                         throw new UnsupportedOperationException();
                 }
@@ -220,10 +216,10 @@ public abstract class PredefinedOperation extends 
Operation
                 switch(settings.mode.style)
                 {
                     case THRIFT:
-                        return new ThriftInserter(timer, generator, settings);
+                        return new ThriftInserter(timer, generator, 
seedManager, settings);
                     case CQL:
                     case CQL_PREPARED:
-                        return new CqlInserter(timer, generator, settings);
+                        return new CqlInserter(timer, generator, seedManager, 
settings);
                     default:
                         throw new UnsupportedOperationException();
                 }
@@ -232,10 +228,10 @@ public abstract class PredefinedOperation extends 
Operation
                 switch(settings.mode.style)
                 {
                     case THRIFT:
-                        return new ThriftCounterAdder(counteradd, timer, 
generator, settings);
+                        return new ThriftCounterAdder(counteradd, timer, 
generator, seedManager, settings);
                     case CQL:
                     case CQL_PREPARED:
-                        return new CqlCounterAdder(counteradd, timer, 
generator, settings);
+                        return new CqlCounterAdder(counteradd, timer, 
generator, seedManager, settings);
                     default:
                         throw new UnsupportedOperationException();
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
index 4ee42e9..be34a07 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
@@ -27,19 +27,22 @@ import java.util.Map;
 import org.apache.cassandra.stress.generate.Distribution;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.Mutation;
 
 public class ThriftCounterAdder extends PredefinedOperation
 {
 
     final Distribution counteradd;
-    public ThriftCounterAdder(DistributionFactory counteradd, Timer timer, 
PartitionGenerator generator, StressSettings settings)
+    public ThriftCounterAdder(DistributionFactory counteradd, Timer timer, 
PartitionGenerator generator, SeedManager seedManager, StressSettings settings)
     {
-        super(Command.COUNTER_WRITE, timer, generator, settings);
+        super(Command.COUNTER_WRITE, timer, generator, seedManager, settings);
         this.counteradd = counteradd.get();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
index 10c6aab..ca81fe9 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.ThriftClient;
@@ -31,9 +32,9 @@ import org.apache.cassandra.thrift.SlicePredicate;
 
 public class ThriftCounterGetter extends PredefinedOperation
 {
-    public ThriftCounterGetter(Timer timer, PartitionGenerator generator, 
StressSettings settings)
+    public ThriftCounterGetter(Timer timer, PartitionGenerator generator, 
SeedManager seedManager, StressSettings settings)
     {
-        super(Command.COUNTER_READ, timer, generator, settings);
+        super(Command.COUNTER_READ, timer, generator, seedManager, settings);
     }
 
     public void run(final ThriftClient client) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
index d6adbf9..1827c06 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
@@ -24,22 +24,23 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.Mutation;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
 
 public final class ThriftInserter extends PredefinedOperation
 {
 
-    public ThriftInserter(Timer timer, PartitionGenerator generator, 
StressSettings settings)
+    public ThriftInserter(Timer timer, PartitionGenerator generator, 
SeedManager seedManager, StressSettings settings)
     {
-        super(Command.WRITE, timer, generator, settings);
+        super(Command.WRITE, timer, generator, seedManager, settings);
     }
 
     public boolean isWrite()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c579a01/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
index 276d8c5..d77dc6a 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
@@ -22,21 +22,20 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.SeedManager;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SuperColumn;
 
 public final class ThriftReader extends PredefinedOperation
 {
 
-    public ThriftReader(Timer timer, PartitionGenerator generator, 
StressSettings settings)
+    public ThriftReader(Timer timer, PartitionGenerator generator, SeedManager 
seedManager, StressSettings settings)
     {
-        super(Command.READ, timer, generator, settings);
+        super(Command.READ, timer, generator, seedManager, settings);
     }
 
     public void run(final ThriftClient client) throws IOException

Reply via email to