Author: brandonwilliams Date: Wed Feb 23 19:25:40 2011 New Revision: 1073894
URL: http://svn.apache.org/viewvc?rev=1073894&view=rev Log: Switch stress.java to a producer/consumer model for better performance. Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2020 Removed: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/OperationThread.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/util/Range.java Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1073894&r1=1073893&r2=1073894&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Wed Feb 23 19:25:40 2011 @@ -20,8 +20,8 @@ package org.apache.cassandra.contrib.str import java.io.*; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.atomic.AtomicIntegerArray; -import java.util.concurrent.atomic.AtomicLongArray; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.*; @@ -38,9 +38,9 @@ public class Session // command line options public static final Options availableOptions = new Options(); - public final AtomicIntegerArray operationCount; - public final AtomicIntegerArray keyCount; - public final AtomicLongArray latencies; + public final AtomicInteger operations; + public final AtomicInteger keys; + public final AtomicLong latency; static { @@ -93,7 +93,7 @@ public class Session private PrintStream out = System.out; private IndexType indexType = null; - private Stress.Operation operation = Stress.Operation.INSERT; + private Stress.Operations operation = Stress.Operations.INSERT; private ColumnFamilyType columnFamilyType = ColumnFamilyType.Standard; private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy"; @@ -183,7 +183,7 @@ public class Session unframed = Boolean.parseBoolean(cmd.getOptionValue("m")); if (cmd.hasOption("o")) - operation = Stress.Operation.valueOf(cmd.getOptionValue("o").toUpperCase()); + operation = Stress.Operations.valueOf(cmd.getOptionValue("o").toUpperCase()); if (cmd.hasOption("u")) superColumns = Integer.parseInt(cmd.getOptionValue("u")); @@ -248,9 +248,9 @@ public class Session mean = numKeys / 2; sigma = numKeys * STDev; - operationCount = new AtomicIntegerArray(threads); - keyCount = new AtomicIntegerArray(threads); - latencies = new AtomicLongArray(threads); + operations = new AtomicInteger(); + keys = new AtomicInteger(); + latency = new AtomicLong(); } public int getCardinality() @@ -323,7 +323,7 @@ public class Session return ignoreErrors; } - public Stress.Operation getOperation() + public Stress.Operations getOperation() { return operation; } Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java?rev=1073894&r1=1073893&r2=1073894&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Stress.java Wed Feb 23 19:25:40 2011 @@ -18,15 +18,18 @@ package org.apache.cassandra.contrib.stress; import org.apache.cassandra.contrib.stress.operations.*; -import org.apache.cassandra.contrib.stress.util.OperationThread; +import org.apache.cassandra.contrib.stress.util.Operation; +import org.apache.cassandra.thrift.Cassandra; import org.apache.commons.cli.Option; import java.io.PrintStream; import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.SynchronousQueue; public final class Stress { - public static enum Operation + public static enum Operations { INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET } @@ -34,9 +37,15 @@ public final class Stress public static Session session; public static Random randomizer = new Random(); + /** + * Producer-Consumer model: 1 producer, N consumers + */ + private static final BlockingQueue<Operation> operations = new SynchronousQueue<Operation>(true); + public static void main(String[] arguments) throws Exception { - int epoch, total, oldTotal, latency, keyCount, oldKeyCount, oldLatency; + long latency, oldLatency; + int epoch, total, oldTotal, keyCount, oldKeyCount; try { @@ -49,51 +58,52 @@ public final class Stress } // creating keyspace and column families - if (session.getOperation() == Stress.Operation.INSERT) + if (session.getOperation() == Stress.Operations.INSERT) { session.createKeySpaces(); } int threadCount = session.getThreads(); - Thread[] threads = new Thread[threadCount]; - PrintStream out = session.getOutputStream(); + Thread[] consumers = new Thread[threadCount]; + PrintStream out = session.getOutputStream(); + + out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time"); + + int itemsPerThread = session.getKeysPerThread(); + int modulo = session.getNumKeys() % threadCount; // creating required type of the threads for the test - try - { - for (int i = 0; i < threadCount; i++) - { - threads[i] = createOperation(i); - } - } - catch (Exception e) + for (int i = 0; i < threadCount; i++) { - System.err.println(e.getMessage()); - return; + if (i == threadCount - 1) + itemsPerThread += modulo; // last one is going to handle N + modulo items + + consumers[i] = new Consumer(itemsPerThread); } + new Producer().start(); + // starting worker threads for (int i = 0; i < threadCount; i++) { - threads[i].start(); + consumers[i].start(); } // initialization of the values boolean terminate = false; - epoch = total = latency = keyCount = 0; + latency = 0; + epoch = total = keyCount = 0; int interval = session.getProgressInterval(); int epochIntervals = session.getProgressInterval() * 10; long testStartTime = System.currentTimeMillis(); - out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time"); - while (!terminate) { Thread.sleep(100); int alive = 0; - for (Thread thread : threads) + for (Thread thread : consumers) if (thread.isAlive()) alive++; if (alive == 0) @@ -109,20 +119,9 @@ public final class Stress oldLatency = latency; oldKeyCount = keyCount; - int currentTotal = 0, currentKeyCount = 0, currentLatency = 0; - - for (Thread t : threads) - { - OperationThread thread = (OperationThread) t; - - currentTotal += session.operationCount.get(thread.index); - currentKeyCount += session.keyCount.get(thread.index); - currentLatency += session.latencies.get(thread.index); - } - - total = currentTotal; - keyCount = currentKeyCount; - latency = currentLatency; + total = session.operations.get(); + keyCount = session.keys.get(); + latency = session.latency.get(); int opDelta = total - oldTotal; int keyDelta = keyCount - oldKeyCount; @@ -136,7 +135,7 @@ public final class Stress } } - private static Thread createOperation(int index) + private static Operation createOperation(int index) { switch (session.getOperation()) { @@ -174,4 +173,58 @@ public final class Stress option.getLongOpt(), (option.hasArg()) ? "="+upperCaseName : "", option.getDescription())); } } + + /** + * Produces exactly N items (awaits each to be consumed) + */ + private static class Producer extends Thread + { + public void run() + { + for (int i = 0; i < session.getNumKeys(); i++) + { + try + { + operations.put(createOperation(i)); + } + catch (InterruptedException e) + { + System.err.println("Producer error - " + e.getMessage()); + return; + } + } + } + } + + /** + * Each consumes exactly N items from queue + */ + private static class Consumer extends Thread + { + private final int items; + + public Consumer(int toConsume) + { + items = toConsume; + } + + public void run() + { + Cassandra.Client client = session.getClient(); + + for (int i = 0; i < items; i++) + { + try + { + operations.take().run(client); // running job + } + catch (Exception e) + { + System.err.println(e.getMessage()); + System.exit(-1); + } + } + } + } + } Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1073894&r1=1073893&r2=1073894&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Wed Feb 23 19:25:40 2011 @@ -17,23 +17,23 @@ */ package org.apache.cassandra.contrib.stress.operations; -import org.apache.cassandra.contrib.stress.util.OperationThread; +import org.apache.cassandra.contrib.stress.util.Operation; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; -public class IndexedRangeSlicer extends OperationThread +public class IndexedRangeSlicer extends Operation { public IndexedRangeSlicer(int index) { super(index); } - public void run() + public void run(Cassandra.Client client) throws IOException { String format = "%0" + session.getTotalKeysLength() + "d"; SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}), @@ -46,64 +46,59 @@ public class IndexedRangeSlicer extends ByteBuffer columnName = ByteBuffer.wrap("C1".getBytes()); - for (int i = range.begins(); i < range.size(); i++) - { - int received = 0; + int received = 0; - String startOffset = "0"; - ByteBuffer value = ByteBuffer.wrap(values.get(i % values.size()).getBytes()); + String startOffset = "0"; + ByteBuffer value = ByteBuffer.wrap(values.get(index % values.size()).getBytes()); - IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value); + IndexExpression expression = new IndexExpression(columnName, IndexOperator.EQ, value); - while (received < expectedPerValue) - { - IndexClause clause = new IndexClause(Arrays.asList(expression), ByteBuffer.wrap(startOffset.getBytes()), - session.getKeysPerCall()); + while (received < expectedPerValue) + { + IndexClause clause = new IndexClause(Arrays.asList(expression), + ByteBuffer.wrap(startOffset.getBytes()), + session.getKeysPerCall()); - List<KeySlice> results = null; - long start = System.currentTimeMillis(); + List<KeySlice> results = null; + long start = System.currentTimeMillis(); - boolean success = false; - String exceptionMessage = null; + boolean success = false; + String exceptionMessage = null; - for (int t = 0; t < session.getRetryTimes(); t++) - { - if (success) - break; + for (int t = 0; t < session.getRetryTimes(); t++) + { + if (success) + break; - try - { - results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel()); - success = (results.size() != 0); - } - catch (Exception e) - { - exceptionMessage = getExceptionMessage(e); - success = false; - } + try + { + results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel()); + success = (results.size() != 0); } - - if (!success) + catch (Exception e) { - System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n", - index, - session.getRetryTimes(), - startOffset, - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); - - if (!session.ignoreErrors()) - return; + exceptionMessage = getExceptionMessage(e); + success = false; } + } + + if (!success) + { + error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n", + index, + session.getRetryTimes(), + startOffset, + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); + } - received += results.size(); + received += results.size(); - // convert max key found back to an integer, and increment it - startOffset = String.format(format, (1 + getMaxKey(results))); + // convert max key found back to an integer, and increment it + startOffset = String.format(format, (1 + getMaxKey(results))); - session.operationCount.getAndIncrement(index); - session.keyCount.getAndAdd(index, results.size()); - session.latencies.getAndAdd(index, System.currentTimeMillis() - start); - } + session.operations.getAndIncrement(); + session.keys.getAndAdd(results.size()); + session.latency.getAndAdd(System.currentTimeMillis() - start); } } Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1073894&r1=1073893&r2=1073894&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Wed Feb 23 19:25:40 2011 @@ -17,26 +17,27 @@ */ package org.apache.cassandra.contrib.stress.operations; -import org.apache.cassandra.contrib.stress.util.OperationThread; +import org.apache.cassandra.contrib.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; +import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -public class Inserter extends OperationThread +public class Inserter extends Operation { + public Inserter(int index) { super(index); } - public void run() + public void run(Cassandra.Client client) throws IOException { List<String> values = generateValues(); List<Column> columns = new ArrayList<Column>(); @@ -48,8 +49,10 @@ public class Inserter extends OperationT // columns = [Column('C' + str(j), 'unset', time.time() * 1000000) for j in xrange(columns_per_key)] for (int i = 0; i < session.getColumnsPerKey(); i++) { - byte[] columnName = ("C" + Integer.toString(i)).getBytes(); - columns.add(new Column(ByteBuffer.wrap(columnName), ByteBuffer.wrap(new byte[] {}), System.currentTimeMillis())); + String columnName = ("C" + Integer.toString(i)); + String columnValue = values.get(index % values.size()); + + columns.add(new Column(ByteBufferUtil.bytes(columnName), ByteBufferUtil.bytes(columnValue), System.currentTimeMillis())); } if (session.getColumnFamilyType() == ColumnFamilyType.Super) @@ -62,58 +65,47 @@ public class Inserter extends OperationT } } - for (int i : range) - { - String rawKey = String.format(format, i); - ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes()); - Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - - record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super - ? getSuperColumnsMutationMap(superColumns) - : getColumnsMutationMap(columns)); + String rawKey = String.format(format, index); + Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); - String value = values.get(i % values.size()); + record.put(ByteBufferUtil.bytes(rawKey), session.getColumnFamilyType() == ColumnFamilyType.Super + ? getSuperColumnsMutationMap(superColumns) + : getColumnsMutationMap(columns)); - for (Column c : columns) - c.value = ByteBuffer.wrap(value.getBytes()); + long start = System.currentTimeMillis(); - long start = System.currentTimeMillis(); + boolean success = false; + String exceptionMessage = null; - boolean success = false; - String exceptionMessage = null; + for (int t = 0; t < session.getRetryTimes(); t++) + { + if (success) + break; - for (int t = 0; t < session.getRetryTimes(); t++) + try { - if (success) - break; - - try - { - client.batch_mutate(record, session.getConsistencyLevel()); - success = true; - } - catch (Exception e) - { - exceptionMessage = getExceptionMessage(e); - success = false; - } + client.batch_mutate(record, session.getConsistencyLevel()); + success = true; } - - if (!success) + catch (Exception e) { - System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index, - session.getRetryTimes(), - rawKey, - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); - - if (!session.ignoreErrors()) - break; + exceptionMessage = getExceptionMessage(e); + success = false; } + } - session.operationCount.getAndIncrement(index); - session.keyCount.getAndIncrement(index); - session.latencies.getAndAdd(index, System.currentTimeMillis() - start); + if (!success) + { + error(String.format("Operation [%d] retried %d times - error inserting key %s %s%n", + index, + session.getRetryTimes(), + rawKey, + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); } + + session.operations.getAndIncrement(); + session.keys.getAndIncrement(); + session.latency.getAndAdd(System.currentTimeMillis() - start); } private Map<String, List<Mutation>> getSuperColumnsMutationMap(List<SuperColumn> superColumns) Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1073894&r1=1073893&r2=1073894&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Wed Feb 23 19:25:40 2011 @@ -17,90 +17,39 @@ */ package org.apache.cassandra.contrib.stress.operations; -import org.apache.cassandra.contrib.stress.util.OperationThread; +import org.apache.cassandra.contrib.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; -public class MultiGetter extends OperationThread +public class MultiGetter extends Operation { public MultiGetter(int index) { super(index); } - public void run() + public void run(Cassandra.Client client) throws IOException { SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[]{}), ByteBuffer.wrap(new byte[] {}), false, session.getColumnsPerKey())); int offset = index * session.getKeysPerThread(); - Map<ByteBuffer,List<ColumnOrSuperColumn>> results = null; - int count = (((index + 1) * session.getKeysPerThread()) - offset) / session.getKeysPerCall(); + Map<ByteBuffer,List<ColumnOrSuperColumn>> results; if (session.getColumnFamilyType() == ColumnFamilyType.Super) { - for (int i = 0; i < count; i++) - { - List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall()); - - for (int j = 0; j < session.getSuperColumns(); j++) - { - ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes()); - - long start = System.currentTimeMillis(); - - boolean success = false; - String exceptionMessage = null; - - for (int t = 0; t < session.getRetryTimes(); t++) - { - if (success) - break; - - try - { - results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel()); - success = (results.size() != 0); - } - catch (Exception e) - { - exceptionMessage = getExceptionMessage(e); - } - } - - if (!success) - { - System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n", - index, - session.getRetryTimes(), - keys, - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); - - if (!session.ignoreErrors()) - return; - } - - session.operationCount.getAndIncrement(index); - session.keyCount.getAndAdd(index, keys.size()); - session.latencies.getAndAdd(index, System.currentTimeMillis() - start); - - offset += session.getKeysPerCall(); - } - } - } - else - { - ColumnParent parent = new ColumnParent("Standard1"); + List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall()); - for (int i = 0; i < count; i++) + for (int j = 0; j < session.getSuperColumns(); j++) { - List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall()); + ColumnParent parent = new ColumnParent("Super1").setSuper_column(("S" + j).getBytes()); long start = System.currentTimeMillis(); @@ -120,29 +69,68 @@ public class MultiGetter extends Operati catch (Exception e) { exceptionMessage = getExceptionMessage(e); - success = false; } } if (!success) { - System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n", - index, - session.getRetryTimes(), - keys, - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); - - if (!session.ignoreErrors()) - return; + error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n", + index, + session.getRetryTimes(), + keys, + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); } - session.operationCount.getAndIncrement(index); - session.keyCount.getAndAdd(index, keys.size()); - session.latencies.getAndAdd(index, System.currentTimeMillis() - start); + session.operations.getAndIncrement(); + session.keys.getAndAdd(keys.size()); + session.latency.getAndAdd(System.currentTimeMillis() - start); offset += session.getKeysPerCall(); } } + else + { + ColumnParent parent = new ColumnParent("Standard1"); + + List<ByteBuffer> keys = generateKeys(offset, offset + session.getKeysPerCall()); + + long start = System.currentTimeMillis(); + + boolean success = false; + String exceptionMessage = null; + + for (int t = 0; t < session.getRetryTimes(); t++) + { + if (success) + break; + + try + { + results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel()); + success = (results.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); + success = false; + } + } + + if (!success) + { + error(String.format("Operation [%d] retried %d times - error on calling multiget_slice for keys %s %s%n", + index, + session.getRetryTimes(), + keys, + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); + } + + session.operations.getAndIncrement(); + session.keys.getAndAdd(keys.size()); + session.latency.getAndAdd(System.currentTimeMillis() - start); + + offset += session.getKeysPerCall(); + } } private List<ByteBuffer> generateKeys(int start, int limit) Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1073894&r1=1073893&r2=1073894&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Wed Feb 23 19:25:40 2011 @@ -17,15 +17,17 @@ */ package org.apache.cassandra.contrib.stress.operations; -import org.apache.cassandra.contrib.stress.util.OperationThread; +import org.apache.cassandra.contrib.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; +import org.apache.cassandra.utils.ByteBufferUtil; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -public class RangeSlicer extends OperationThread +public class RangeSlicer extends Operation { public RangeSlicer(int index) @@ -33,87 +35,29 @@ public class RangeSlicer extends Operati super(index); } - public void run() + public void run(Cassandra.Client client) throws IOException { String format = "%0" + session.getTotalKeysLength() + "d"; // initial values - int current = range.begins(); - int limit = range.limit(); - int count = session.getColumnsPerKey(); - int last = current + session.getKeysPerCall(); + int count = session.getColumnsPerKey(); SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[] {}), ByteBuffer.wrap(new byte[] {}), - false, count)); + false, + count)); if (session.getColumnFamilyType() == ColumnFamilyType.Super) { - while (current < limit) - { - byte[] start = String.format(format, current).getBytes(); - byte[] end = String.format(format, last).getBytes(); - - List<KeySlice> slices = new ArrayList<KeySlice>(); - KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end); - - for (int i = 0; i < session.getSuperColumns(); i++) - { - String superColumnName = "S" + Integer.toString(i); - ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes())); - - long startTime = System.currentTimeMillis(); - - boolean success = false; - String exceptionMessage = null; - - for (int t = 0; t < session.getRetryTimes(); t++) - { - try - { - slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel()); - success = (slices.size() != 0); - } - catch (Exception e) - { - exceptionMessage = getExceptionMessage(e); - success = false; - } - } - - if (!success) - { - System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n", - index, - session.getRetryTimes(), - new String(start), - new String(end), - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); + byte[] start = String.format(format, index).getBytes(); - if (!session.ignoreErrors()) - return; - } - - session.operationCount.getAndIncrement(index); - session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime); - } + List<KeySlice> slices = new ArrayList<KeySlice>(); + KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER); - current += slices.size() + 1; - last = current + slices.size() + 1; - session.keyCount.getAndAdd(index, slices.size()); - } - } - else - { - ColumnParent parent = new ColumnParent("Standard1"); - - while (current < limit) + for (int i = 0; i < session.getSuperColumns(); i++) { - byte[] start = String.format(format, current).getBytes(); - byte[] end = String.format(format, last).getBytes(); - - List<KeySlice> slices = new ArrayList<KeySlice>(); - KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(end); + String superColumnName = "S" + Integer.toString(i); + ColumnParent parent = new ColumnParent("Super1").setSuper_column(ByteBuffer.wrap(superColumnName.getBytes())); long startTime = System.currentTimeMillis(); @@ -122,9 +66,6 @@ public class RangeSlicer extends Operati for (int t = 0; t < session.getRetryTimes(); t++) { - if (success) - break; - try { slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel()); @@ -139,24 +80,62 @@ public class RangeSlicer extends Operati if (!success) { - System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n", - index, - session.getRetryTimes(), - new String(start), - new String(end), - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); - - if (!session.ignoreErrors()) - return; + error(String.format("Operation [%d] retried %d times - error on calling get_range_slices for range offset %s %s%n", + index, + session.getRetryTimes(), + new String(start), + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); } - current += slices.size() + 1; - last = current + slices.size() + 1; + session.operations.getAndIncrement(); + session.latency.getAndAdd(System.currentTimeMillis() - startTime); + } + + session.keys.getAndAdd(slices.size()); + } + else + { + ColumnParent parent = new ColumnParent("Standard1"); + + byte[] start = String.format(format, index).getBytes(); + + List<KeySlice> slices = new ArrayList<KeySlice>(); + KeyRange range = new KeyRange(count).setStart_key(start).setEnd_key(ByteBufferUtil.EMPTY_BYTE_BUFFER); + + long startTime = System.currentTimeMillis(); + + boolean success = false; + String exceptionMessage = null; - session.operationCount.getAndIncrement(index); - session.keyCount.getAndAdd(index, slices.size()); - session.latencies.getAndAdd(index, System.currentTimeMillis() - startTime); + for (int t = 0; t < session.getRetryTimes(); t++) + { + if (success) + break; + + try + { + slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel()); + success = (slices.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); + success = false; + } } + + if (!success) + { + error(String.format("Operation [%d] retried %d times - error on calling get_indexed_slices for range offset %s %s%n", + index, + session.getRetryTimes(), + new String(start), + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); + } + + session.operations.getAndIncrement(); + session.keys.getAndAdd(slices.size()); + session.latency.getAndAdd(System.currentTimeMillis() - startTime); } } } Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1073894&r1=1073893&r2=1073894&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Wed Feb 23 19:25:40 2011 @@ -17,25 +17,22 @@ */ package org.apache.cassandra.contrib.stress.operations; -import org.apache.cassandra.contrib.stress.util.OperationThread; +import org.apache.cassandra.contrib.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; -import org.apache.cassandra.utils.ByteBufferUtil; import java.io.IOException; -import java.lang.AssertionError; import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; import java.util.List; -public class Reader extends OperationThread +public class Reader extends Operation { public Reader(int index) { super(index); } - public void run() + public void run(Cassandra.Client client) throws IOException { SliceRange sliceRange = new SliceRange(); @@ -50,75 +47,23 @@ public class Reader extends OperationThr if (session.getColumnFamilyType() == ColumnFamilyType.Super) { - runSuperColumnReader(predicate); + runSuperColumnReader(predicate, client); } else { - runColumnReader(predicate); + runColumnReader(predicate, client); } } - private void runSuperColumnReader(SlicePredicate predicate) + private void runSuperColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException { - for (int i = 0; i < session.getKeysPerThread(); i++) - { - byte[] rawKey = generateKey(); - ByteBuffer key = ByteBuffer.wrap(rawKey); - - for (int j = 0; j < session.getSuperColumns(); j++) - { - String superColumn = 'S' + Integer.toString(j); - ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes()); - - long start = System.currentTimeMillis(); - - boolean success = false; - String exceptionMessage = null; - - for (int t = 0; t < session.getRetryTimes(); t++) - { - if (success) - break; - - try - { - List<ColumnOrSuperColumn> columns; - columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel()); - success = (columns.size() != 0); - } - catch (Exception e) - { - exceptionMessage = getExceptionMessage(e); - success = false; - } - } - - if (!success) - { - System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index, - session.getRetryTimes(), - new String(rawKey), - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); - - if (!session.ignoreErrors()) - return; - } - - session.operationCount.getAndIncrement(index); - session.keyCount.getAndIncrement(index); - session.latencies.getAndAdd(index, System.currentTimeMillis() - start); - } - } - } - - private void runColumnReader(SlicePredicate predicate) - { - ColumnParent parent = new ColumnParent("Standard1"); + byte[] rawKey = generateKey(); + ByteBuffer key = ByteBuffer.wrap(rawKey); - for (int i = 0; i < session.getKeysPerThread(); i++) + for (int j = 0; j < session.getSuperColumns(); j++) { - byte[] key = generateKey(); - ByteBuffer keyBuffer = ByteBuffer.wrap(key); + String superColumn = 'S' + Integer.toString(j); + ColumnParent parent = new ColumnParent("Super1").setSuper_column(superColumn.getBytes()); long start = System.currentTimeMillis(); @@ -133,7 +78,7 @@ public class Reader extends OperationThr try { List<ColumnOrSuperColumn> columns; - columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel()); + columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel()); success = (columns.size() != 0); } catch (Exception e) @@ -145,18 +90,61 @@ public class Reader extends OperationThr if (!success) { - System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index, - session.getRetryTimes(), - new String(key), - (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); + error(String.format("Operation [%d] retried %d times - error reading key %s %s%n", + index, + session.getRetryTimes(), + new String(rawKey), + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); + } + + session.operations.getAndIncrement(); + session.keys.getAndIncrement(); + session.latency.getAndAdd(System.currentTimeMillis() - start); + } + } + + private void runColumnReader(SlicePredicate predicate, Cassandra.Client client) throws IOException + { + ColumnParent parent = new ColumnParent("Standard1"); + + byte[] key = generateKey(); + ByteBuffer keyBuffer = ByteBuffer.wrap(key); + + long start = System.currentTimeMillis(); + + boolean success = false; + String exceptionMessage = null; - if (!session.ignoreErrors()) - return; + for (int t = 0; t < session.getRetryTimes(); t++) + { + if (success) + break; + + try + { + List<ColumnOrSuperColumn> columns; + columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel()); + success = (columns.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); + success = false; } + } - session.operationCount.getAndIncrement(index); - session.keyCount.getAndIncrement(index); - session.latencies.getAndAdd(index, System.currentTimeMillis() - start); + if (!success) + { + error(String.format("Operation [%d] retried %d times - error reading key %s %s%n", + index, + session.getRetryTimes(), + new String(key), + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); } + + session.operations.getAndIncrement(); + session.keys.getAndIncrement(); + session.latency.getAndAdd(System.currentTimeMillis() - start); } + }
