Updated Branches:
  refs/heads/trunk f66b9eb27 -> 10b617364

Improvements and fixes to cassandra/stress
Patch by Benedict; reviewed by Pavel Yaskevich for CASSANDRA-6691


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/10b61736
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/10b61736
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/10b61736

Branch: refs/heads/trunk
Commit: 10b617364e9f639358f82c70056e31533a1ec11c
Parents: f66b9eb
Author: belliottsmith <git...@sub.laerad.com>
Authored: Wed Feb 12 11:36:35 2014 +0000
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Wed Feb 12 17:25:07 2014 -0800

----------------------------------------------------------------------
 .../org/apache/cassandra/stress/Operation.java  |  74 ++++++++++---
 .../apache/cassandra/stress/StressAction.java   |  13 ++-
 .../cassandra/stress/generatedata/DataGen.java  |   6 +-
 .../stress/generatedata/DataGenBytesRandom.java |   2 +-
 .../stress/generatedata/DataGenHex.java         |   2 +-
 .../generatedata/DataGenStringDictionary.java   |   6 +-
 .../generatedata/DataGenStringRepeats.java      |  16 +--
 .../cassandra/stress/generatedata/KeyGen.java   |   2 +-
 .../cassandra/stress/generatedata/RowGen.java   |   4 +-
 .../operations/CqlIndexedRangeSlicer.java       |   2 +-
 .../stress/operations/CqlInserter.java          |   2 +-
 .../stress/operations/CqlOperation.java         | 111 +++++++++++++++++++
 .../cassandra/stress/operations/CqlReader.java  |   8 +-
 .../stress/operations/ThriftCounterAdder.java   |   2 +-
 .../operations/ThriftIndexedRangeSlicer.java    |   2 +-
 .../stress/operations/ThriftInserter.java       |   6 +-
 .../stress/operations/ThriftReader.java         |  30 ++++-
 .../cassandra/stress/settings/SettingsKey.java  |   7 +-
 18 files changed, 244 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java 
b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index fa7a453..4519b19 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -21,10 +21,25 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.List;
 
 import org.apache.cassandra.stress.generatedata.KeyGen;
 import org.apache.cassandra.stress.generatedata.RowGen;
+import org.apache.cassandra.stress.operations.CqlCounterAdder;
+import org.apache.cassandra.stress.operations.CqlCounterGetter;
+import org.apache.cassandra.stress.operations.CqlIndexedRangeSlicer;
+import org.apache.cassandra.stress.operations.CqlInserter;
+import org.apache.cassandra.stress.operations.CqlMultiGetter;
+import org.apache.cassandra.stress.operations.CqlRangeSlicer;
+import org.apache.cassandra.stress.operations.CqlReader;
+import org.apache.cassandra.stress.operations.ThriftCounterAdder;
+import org.apache.cassandra.stress.operations.ThriftCounterGetter;
+import org.apache.cassandra.stress.operations.ThriftIndexedRangeSlicer;
+import org.apache.cassandra.stress.operations.ThriftInserter;
+import org.apache.cassandra.stress.operations.ThriftMultiGetter;
+import org.apache.cassandra.stress.operations.ThriftRangeSlicer;
+import org.apache.cassandra.stress.operations.ThriftReader;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.CqlVersion;
 import org.apache.cassandra.stress.settings.SettingsCommandMixed;
@@ -66,7 +81,8 @@ public abstract class Operation
         public final RowGen rowGen;
         public final List<ColumnParent> columnParents;
         public final StressMetrics metrics;
-        public final SettingsCommandMixed.CommandSelector readWriteSelector;
+        public final SettingsCommandMixed.CommandSelector commandSelector;
+        private final EnumMap<Command, State> substates;
         private Object cqlCache;
 
         public State(Command type, StressSettings settings, StressMetrics 
metrics)
@@ -74,9 +90,15 @@ public abstract class Operation
             this.type = type;
             this.timer = metrics.getTiming().newTimer();
             if (type == Command.MIXED)
-                readWriteSelector = ((SettingsCommandMixed) 
settings.command).selector();
+            {
+                commandSelector = ((SettingsCommandMixed) 
settings.command).selector();
+                substates = new EnumMap<>(Command.class);
+            }
             else
-                readWriteSelector = null;
+            {
+                commandSelector = null;
+                substates = null;
+            }
             this.settings = settings;
             this.keyGen = settings.keys.newKeyGen();
             this.rowGen = settings.columns.newRowGen();
@@ -91,6 +113,20 @@ public abstract class Operation
                 columnParents = Arrays.asList(cp);
             }
         }
+
+        private State(Command type, State copy)
+        {
+            this.type = type;
+            this.timer = copy.timer;
+            this.rowGen = copy.rowGen;
+            this.keyGen = copy.keyGen;
+            this.columnParents = copy.columnParents;
+            this.metrics = copy.metrics;
+            this.settings = copy.settings;
+            this.substates = null;
+            this.commandSelector = null;
+        }
+
         public boolean isCql3()
         {
             return settings.mode.cqlVersion == CqlVersion.CQL3;
@@ -107,6 +143,18 @@ public abstract class Operation
         {
             cqlCache = val;
         }
+
+        public State substate(Command command)
+        {
+            assert type == Command.MIXED;
+            State substate = substates.get(command);
+            if (substate == null)
+            {
+                substates.put(command, substate = new State(command, this));
+            }
+            return substate;
+        }
+
     }
 
     protected ByteBuffer getKey()
@@ -119,9 +167,9 @@ public abstract class Operation
         return state.keyGen.getKeys(count, index);
     }
 
-    protected List<ByteBuffer> generateColumnValues()
+    protected List<ByteBuffer> generateColumnValues(ByteBuffer key)
     {
-        return state.rowGen.generate(index);
+        return state.rowGen.generate(index, key);
     }
 
     /**
@@ -146,20 +194,18 @@ public abstract class Operation
         boolean success = false;
         String exceptionMessage = null;
 
-        for (int t = 0; t < state.settings.command.tries; t++)
+        int tries = 0;
+        for (; tries < state.settings.command.tries; tries++)
         {
-            if (success)
-                break;
-
             try
             {
                 success = run.run();
+                break;
             }
             catch (Exception e)
             {
                 System.err.println(e);
                 exceptionMessage = getExceptionMessage(e);
-                success = false;
             }
         }
 
@@ -167,11 +213,13 @@ public abstract class Operation
 
         if (!success)
         {
-            error(String.format("Operation [%d] retried %d times - error 
executing for key %s %s%n",
+            error(String.format("Operation [%d] x%d key %s %s%n",
                     index,
-                    state.settings.command.tries,
+                    tries,
                     run.key(),
-                    (exceptionMessage == null) ? "" : "(" + exceptionMessage + 
")"));
+                    (exceptionMessage == null)
+                        ? "Data returned was not validated"
+                        : "Error executing: " + exceptionMessage));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java 
b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 0312093..94824ec 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -93,6 +93,10 @@ public class StressAction implements Runnable
             default:
                 throw new IllegalStateException();
         }
+
+        // we need to warm up all the nodes in the cluster ideally, but we may 
not be the only stress instance;
+        // so warm up all the nodes we're speaking to only.
+        iterations *= settings.node.nodes.size();
         output.println(String.format("Warming up %s with %d iterations...", 
type, iterations));
         run(type, 20, iterations, warmupOutput);
     }
@@ -151,9 +155,7 @@ public class StressAction implements Runnable
 
     private boolean hasAverageImprovement(List<StressMetrics> results, int 
count, double minImprovement)
     {
-        if (results.size() < count + 1)
-            return true;
-        return averageImprovement(results, count) >= minImprovement;
+        return results.size() < count + 1 || averageImprovement(results, 
count) >= minImprovement;
     }
 
     private double averageImprovement(List<StressMetrics> results, int count)
@@ -385,7 +387,7 @@ public class StressAction implements Runnable
             int batchSize = (int) (operations / (1 << 19));
             if (batchSize < 20)
                 batchSize = 20;
-            ArrayBlockingQueue<Work> work = new ArrayBlockingQueue<Work>(
+            ArrayBlockingQueue<Work> work = new ArrayBlockingQueue<>(
                     (int) ((operations / batchSize)
                   + (operations % batchSize == 0 ? 0 : 1))
             );
@@ -533,7 +535,8 @@ public class StressAction implements Runnable
                 }
 
             case MIXED:
-                return createOperation(state.readWriteSelector.next(), state, 
index);
+                Command subcommand = state.commandSelector.next();
+                return createOperation(subcommand, state.substate(subcommand), 
index);
 
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
index 4c22005..c441b7e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGen.java
@@ -6,13 +6,13 @@ import java.util.List;
 public abstract class DataGen
 {
 
-    public abstract void generate(ByteBuffer fill, long offset);
+    public abstract void generate(ByteBuffer fill, long index, ByteBuffer 
seed);
     public abstract boolean isDeterministic();
 
-    public void generate(List<ByteBuffer> fills, long offset)
+    public void generate(List<ByteBuffer> fills, long index, ByteBuffer seed)
     {
         for (ByteBuffer fill : fills)
-            generate(fill, offset++);
+            generate(fill, index++, seed);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
index 3906f93..cce438d 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenBytesRandom.java
@@ -9,7 +9,7 @@ public class DataGenBytesRandom extends DataGen
     private final Random rnd = new Random();
 
     @Override
-    public void generate(ByteBuffer fill, long offset)
+    public void generate(ByteBuffer fill, long index, ByteBuffer seed)
     {
         fill.clear();
         rnd.nextBytes(fill.array());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
index 50d49dd..b71d3e9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenHex.java
@@ -8,7 +8,7 @@ public abstract class DataGenHex extends DataGen
     abstract long next(long operationIndex);
 
     @Override
-    public final void generate(ByteBuffer fill, long operationIndex)
+    public final void generate(ByteBuffer fill, long operationIndex, 
ByteBuffer seed)
     {
         fill.clear();
         fillKeyStringBytes(next(operationIndex), fill.array());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
index e581232..7733ed6 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringDictionary.java
@@ -25,13 +25,13 @@ public class DataGenStringDictionary extends DataGen
     }
 
     @Override
-    public void generate(ByteBuffer fill, long index)
+    public void generate(ByteBuffer fill, long index, ByteBuffer seed)
     {
         fill(fill, 0);
     }
 
     @Override
-    public void generate(List<ByteBuffer> fills, long index)
+    public void generate(List<ByteBuffer> fills, long index, ByteBuffer seed)
     {
         for (int i = 0 ; i < fills.size() ; i++)
             fill(fills.get(0), i);
@@ -55,7 +55,7 @@ public class DataGenStringDictionary extends DataGen
     @Override
     public boolean isDeterministic()
     {
-        return true;
+        return false;
     }
 
     public static DataGenFactory getFactory(File file) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
index 47091f7..4c5bb89 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/DataGenStringRepeats.java
@@ -25,32 +25,32 @@ public class DataGenStringRepeats extends DataGen
     }
 
     @Override
-    public void generate(ByteBuffer fill, long index)
+    public void generate(ByteBuffer fill, long index, ByteBuffer seed)
     {
-        fill(fill, index, 0);
+        fill(fill, index, 0, seed);
     }
 
     @Override
-    public void generate(List<ByteBuffer> fills, long index)
+    public void generate(List<ByteBuffer> fills, long index, ByteBuffer seed)
     {
         for (int i = 0 ; i < fills.size() ; i++)
         {
-            fill(fills.get(i), index, i);
+            fill(fills.get(i), index, i, seed);
         }
     }
 
-    private void fill(ByteBuffer fill, long index, int column)
+    private void fill(ByteBuffer fill, long index, int column, ByteBuffer seed)
     {
         fill.clear();
         byte[] trg = fill.array();
-        byte[] src = getData(index, column);
+        byte[] src = getData(index, column, seed);
         for (int j = 0 ; j < trg.length ; j += src.length)
             System.arraycopy(src, 0, trg, j, Math.min(src.length, trg.length - 
j));
     }
 
-    private byte[] getData(long index, int column)
+    private byte[] getData(long index, int column, ByteBuffer seed)
     {
-        final long key = (column * repeatFrequency) + (index % 
repeatFrequency);
+        final long key = (column * repeatFrequency) + ((seed == null ? index : 
Math.abs(seed.hashCode())) % repeatFrequency);
         byte[] r = cache.get(key);
         if (r != null)
             return r;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
index cdd6d39..36dc31d 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/KeyGen.java
@@ -21,7 +21,7 @@ public class KeyGen
     {
         while (keyBuffers.size() < n)
             keyBuffers.add(ByteBuffer.wrap(new byte[keySize]));
-        dataGen.generate(keyBuffers, index);
+        dataGen.generate(keyBuffers, index, null);
         return keyBuffers;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java 
b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
index 869fbc7..3174177 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generatedata/RowGen.java
@@ -16,10 +16,10 @@ public abstract class RowGen
         this.dataGen = dataGenerator;
     }
 
-    public List<ByteBuffer> generate(long operationIndex)
+    public List<ByteBuffer> generate(long operationIndex, ByteBuffer key)
     {
         List<ByteBuffer> fill = getColumns(operationIndex);
-        dataGen.generate(fill, operationIndex);
+        dataGen.generate(fill, operationIndex, key);
         return fill;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
index 748bf30..ff43322 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java
@@ -70,7 +70,7 @@ public class CqlIndexedRangeSlicer extends 
CqlOperation<byte[][]>
     protected void run(CqlOperation.ClientWrapper client) throws IOException
     {
         acceptNoResults = false;
-        final List<ByteBuffer> columns = generateColumnValues();
+        final List<ByteBuffer> columns = generateColumnValues(getKey());
         final ByteBuffer value = columns.get(1); // only C1 column is indexed
         byte[] minKey = new byte[0];
         int rowCount;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java 
b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
index 82f00aa..8d964f5 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java
@@ -72,7 +72,7 @@ public class CqlInserter extends CqlOperation<Integer>
     protected List<ByteBuffer> getQueryParameters(byte[] key)
     {
         final ArrayList<ByteBuffer> queryParams = new ArrayList<>();
-        final List<ByteBuffer> values = generateColumnValues();
+        final List<ByteBuffer> values = 
generateColumnValues(ByteBuffer.wrap(key));
         queryParams.addAll(values);
         queryParams.add(ByteBuffer.wrap(key));
         return queryParams;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java 
b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
index 744e7f6..78dd461 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlOperation.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.List;
 
 import com.datastax.driver.core.PreparedStatement;
@@ -33,6 +34,7 @@ import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlRow;
 import org.apache.cassandra.thrift.ThriftConversion;
 import org.apache.cassandra.transport.SimpleClient;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -167,6 +169,35 @@ public abstract class CqlOperation<V> extends Operation
 
     }
 
+    protected final class CqlRunOpMatchResults extends CqlRunOp<ByteBuffer[][]>
+    {
+
+        final List<List<ByteBuffer>> expect;
+
+        // a null value for an item in expect means we just check the row is 
present
+        protected CqlRunOpMatchResults(ClientWrapper client, String query, 
Object queryId, List<ByteBuffer> params, String id, ByteBuffer key, 
List<List<ByteBuffer>> expect)
+        {
+            super(client, query, queryId, RowsHandler.INSTANCE, params, id, 
key);
+            this.expect = expect;
+        }
+
+        @Override
+        public int keyCount()
+        {
+            return result == null ? 0 : result.length;
+        }
+
+        public boolean validate(ByteBuffer[][] result)
+        {
+            if (result.length != expect.size())
+                return false;
+            for (int i = 0 ; i < result.length ; i++)
+                if (!expect.get(i).equals(Arrays.asList(result[i])))
+                    return false;
+            return true;
+        }
+    }
+
     // Cql
     protected abstract class CqlRunOp<V> implements RunOp
     {
@@ -451,6 +482,86 @@ public abstract class CqlOperation<V> extends Operation
     }
 
     // Processes results from each client into an array of all key bytes 
returned
+    protected static final class RowsHandler implements 
ResultHandler<ByteBuffer[][]>
+    {
+        static final RowsHandler INSTANCE = new RowsHandler();
+
+        @Override
+        public Function<ResultSet, ByteBuffer[][]> javaDriverHandler()
+        {
+            return new Function<ResultSet, ByteBuffer[][]>()
+            {
+
+                @Override
+                public ByteBuffer[][] apply(ResultSet result)
+                {
+                    if (result == null)
+                        return new ByteBuffer[0][];
+                    List<Row> rows = result.all();
+
+                    ByteBuffer[][] r = new ByteBuffer[rows.size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                    {
+                        Row row = rows.get(i);
+                        r[i] = new 
ByteBuffer[row.getColumnDefinitions().size() - 1];
+                        for (int j = 1 ; j < row.getColumnDefinitions().size() 
; j++)
+                            r[i][j - 1] = row.getBytes(j);
+                    }
+                    return r;
+                }
+            };
+        }
+
+        @Override
+        public Function<ResultMessage, ByteBuffer[][]> thriftHandler()
+        {
+            return new Function<ResultMessage, ByteBuffer[][]>()
+            {
+
+                @Override
+                public ByteBuffer[][] apply(ResultMessage result)
+                {
+                    if (!(result instanceof ResultMessage.Rows))
+                        return new ByteBuffer[0][];
+
+                    ResultMessage.Rows rows = ((ResultMessage.Rows) result);
+                    ByteBuffer[][] r = new ByteBuffer[rows.result.size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                    {
+                        List<ByteBuffer> row = rows.result.rows.get(i);
+                        r[i] = new ByteBuffer[row.size()];
+                        for (int j = 0 ; j < row.size() ; j++)
+                            r[i][j] = row.get(j);
+                    }
+                    return r;
+                }
+            };
+        }
+
+        @Override
+        public Function<CqlResult, ByteBuffer[][]> simpleNativeHandler()
+        {
+            return new Function<CqlResult, ByteBuffer[][]>()
+            {
+
+                @Override
+                public ByteBuffer[][] apply(CqlResult result)
+                {
+                    ByteBuffer[][] r = new 
ByteBuffer[result.getRows().size()][];
+                    for (int i = 0 ; i < r.length ; i++)
+                    {
+                        CqlRow row = result.getRows().get(i);
+                        r[i] = new ByteBuffer[row.getColumns().size()];
+                        for (int j = 0 ; j < r[i].length ; j++)
+                            r[i][j] = 
ByteBuffer.wrap(row.getColumns().get(j).getValue());
+                    }
+                    return r;
+                }
+            };
+        }
+
+    }
+    // Processes results from each client into an array of all key bytes 
returned
     protected static final class KeysHandler implements ResultHandler<byte[][]>
     {
         static final KeysHandler INSTANCE = new KeysHandler();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java 
b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
index 749a482..44da43f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java
@@ -23,10 +23,11 @@ package org.apache.cassandra.stress.operations;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-public class CqlReader extends CqlOperation<Integer>
+public class CqlReader extends CqlOperation<ByteBuffer[][]>
 {
 
     public CqlReader(State state, long idx)
@@ -79,9 +80,10 @@ public class CqlReader extends CqlOperation<Integer>
     }
 
     @Override
-    protected CqlRunOp<Integer> buildRunOp(ClientWrapper client, String query, 
Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
+    protected CqlRunOp<ByteBuffer[][]> buildRunOp(ClientWrapper client, String 
query, Object queryId, List<ByteBuffer> params, String keyid, ByteBuffer key)
     {
-        return new CqlRunOpTestNonEmpty(client, query, queryId, params, keyid, 
key);
+        List<ByteBuffer> expectRow = state.rowGen.isDeterministic() ? 
generateColumnValues(key) : null;
+        return new CqlRunOpMatchResults(client, query, queryId, params, keyid, 
key, Arrays.asList(expectRow));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
index b1657b2..26695a6 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftCounterAdder.java
@@ -39,7 +39,7 @@ public class ThriftCounterAdder extends Operation
 
     public void run(final ThriftClient client) throws IOException
     {
-        List<CounterColumn> columns = new ArrayList<CounterColumn>();
+        List<CounterColumn> columns = new ArrayList<>();
         for (int i = 0; i < state.settings.columns.maxColumnsPerKey; i++)
             columns.add(new CounterColumn(getColumnNameBytes(i), 1L));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
index c6b1b03..6eab209 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftIndexedRangeSlicer.java
@@ -49,7 +49,7 @@ public class ThriftIndexedRangeSlicer extends Operation
                 .setSlice_range(new 
SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
                         ByteBufferUtil.EMPTY_BYTE_BUFFER,
                         false, state.settings.columns.maxColumnsPerKey));
-        final List<ByteBuffer> columns = generateColumnValues();
+        final List<ByteBuffer> columns = generateColumnValues(getKey());
         final ColumnParent parent = state.columnParents.get(0);
 
         final ByteBuffer columnName = getColumnNameBytes(1);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java 
b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
index c5f8051..b107f26 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftInserter.java
@@ -42,7 +42,7 @@ public final class ThriftInserter extends Operation
     public void run(final ThriftClient client) throws IOException
     {
         final ByteBuffer key = getKey();
-        final List<Column> columns = generateColumns();
+        final List<Column> columns = generateColumns(key);
 
         Map<String, List<Mutation>> row;
         if (!state.settings.columns.useSuperColumns)
@@ -92,9 +92,9 @@ public final class ThriftInserter extends Operation
         });
     }
 
-    protected List<Column> generateColumns()
+    protected List<Column> generateColumns(ByteBuffer key)
     {
-        final List<ByteBuffer> values = generateColumnValues();
+        final List<ByteBuffer> values = generateColumnValues(key);
         final List<Column> columns = new ArrayList<>(values.size());
 
         if (state.settings.columns.useTimeUUIDComparator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java 
b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
index a8605e8..c50843f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/ThriftReader.java
@@ -19,12 +19,16 @@ package org.apache.cassandra.stress.operations;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.util.ThriftClient;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.SuperColumn;
 
 public final class ThriftReader extends Operation
 {
@@ -48,6 +52,7 @@ public final class ThriftReader extends Operation
             predicate.setColumn_names(state.settings.columns.names);
 
         final ByteBuffer key = getKey();
+        final List<ByteBuffer> expect = state.rowGen.isDeterministic() ? 
generateColumnValues(key) : null;
         for (final ColumnParent parent : state.columnParents)
         {
             timeWithRetry(new RunOp()
@@ -55,7 +60,30 @@ public final class ThriftReader extends Operation
                 @Override
                 public boolean run() throws Exception
                 {
-                    return client.get_slice(key, parent, predicate, 
state.settings.command.consistencyLevel).size() != 0;
+                    List<ColumnOrSuperColumn> row = client.get_slice(key, 
parent, predicate, state.settings.command.consistencyLevel);
+                    if (expect == null)
+                        return !row.isEmpty();
+                    if (!state.settings.columns.useSuperColumns)
+                    {
+                        if (row.size() != expect.size())
+                            return false;
+                        for (int i = 0 ; i < row.size() ; i++)
+                            if 
(!row.get(i).getColumn().bufferForValue().equals(expect.get(i)))
+                                return false;
+                    }
+                    else
+                    {
+                        for (ColumnOrSuperColumn col : row)
+                        {
+                            SuperColumn superColumn = col.getSuper_column();
+                            if (superColumn.getColumns().size() != 
expect.size())
+                                return false;
+                            for (int i = 0 ; i < expect.size() ; i++)
+                                if 
(!superColumn.getColumns().get(i).bufferForValue().equals(expect.get(i)))
+                                    return false;
+                        }
+                    }
+                    return true;
                 }
 
                 @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/10b61736/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java 
b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
index 6cef0bf..e2647bb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsKey.java
@@ -73,9 +73,10 @@ public class SettingsKey implements Serializable
 
     public KeyGen newKeyGen()
     {
-        if (range != null)
-            return new KeyGen(new DataGenHexFromOpIndex(range[0], range[1]), 
keySize);
-        return new KeyGen(new DataGenHexFromDistribution(distribution.get()), 
keySize);
+        return new KeyGen(range == null
+                            ? new 
DataGenHexFromDistribution(distribution.get())
+                            : new DataGenHexFromOpIndex(range[0], range[1]),
+                          keySize);
     }
 
     // CLI Utility Methods

Reply via email to