Author: brandonwilliams Date: Thu Feb 17 22:13:40 2011 New Revision: 1071811
URL: http://svn.apache.org/viewvc?rev=1071811&view=rev Log: Add 'keep trying' behavior to stress.java. Patch by Pavel Yaskevich, reviewed by brandonwilliams for CASSANDRA-2047 Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java?rev=1071811&r1=1071810&r2=1071811&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/Session.java Thu Feb 17 22:13:40 2011 @@ -61,7 +61,8 @@ public class Session availableOptions.addOption("o", "operation", true, "Operation to perform (INSERT, READ, RANGE_SLICE, INDEXED_RANGE_SLICE, MULTI_GET), default:INSERT"); availableOptions.addOption("u", "supercolumns", true, "Number of super columns per key, default:1"); availableOptions.addOption("y", "family-type", true, "Column Family Type (Super, Standard), default:Standard"); - availableOptions.addOption("k", "keep-going", false, "Ignore errors inserting or reading, default:false"); + availableOptions.addOption("K", "keep-trying", true, "Retry on-going operation N times (in case of failure). positive integer, default:10"); + availableOptions.addOption("k", "keep-going", false, "Ignore errors inserting or reading (when set, --keep-trying has no effect), default:false"); availableOptions.addOption("i", "progress-interval", true, "Progress Report Interval (seconds), default:10"); availableOptions.addOption("g", "keys-per-call", true, "Number of keys to get_range_slices or multiget per call, default:1000"); availableOptions.addOption("l", "replication-factor", true, "Replication Factor to use when creating needed column families, default:1"); @@ -80,13 +81,14 @@ public class Session private String[] nodes = new String[] { "127.0.0.1" }; private boolean random = false; private boolean unframed = false; - private boolean ignoreErrors = false; + private int retryTimes = 10; private int port = 9160; private int superColumns = 1; private int progressInterval = 10; private int keysPerCall = 1000; private int replicationFactor = 1; + private boolean ignoreErrors = false; private PrintStream out = System.out; @@ -97,6 +99,7 @@ public class Session private String replicationStrategy = "org.apache.cassandra.locator.SimpleStrategy"; private Map<String, String> replicationStrategyOptions = new HashMap<String, String>(); + // required by Gaussian distribution. protected int mean; protected float sigma; @@ -188,8 +191,21 @@ public class Session if (cmd.hasOption("y")) columnFamilyType = ColumnFamilyType.valueOf(cmd.getOptionValue("y")); + if (cmd.hasOption("K")) + { + retryTimes = Integer.valueOf(cmd.getOptionValue("K")); + + if (retryTimes <= 0) + { + throw new RuntimeException("--keep-trying option value should be > 0"); + } + } + if (cmd.hasOption("k")) + { + retryTimes = 1; ignoreErrors = true; + } if (cmd.hasOption("i")) progressInterval = Integer.parseInt(cmd.getOptionValue("i")); @@ -297,6 +313,11 @@ public class Session return consistencyLevel; } + public int getRetryTimes() + { + return retryTimes; + } + public boolean ignoreErrors() { return ignoreErrors; Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java?rev=1071811&r1=1071810&r2=1071811&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/IndexedRangeSlicer.java Thu Feb 17 22:13:40 2011 @@ -63,21 +63,33 @@ public class IndexedRangeSlicer extends List<KeySlice> results = null; long start = System.currentTimeMillis(); - try + boolean success = false; + String exceptionMessage = null; + + for (int t = 0; t < session.getRetryTimes(); t++) { - results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel()); + if (success) + break; - if (results.size() == 0) + try { - System.err.printf("No indexed values from offset received: %s%n", startOffset); - - if (!session.ignoreErrors()) - break; + results = client.get_indexed_slices(parent, clause, predicate, session.getConsistencyLevel()); + success = (results.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); + success = false; } } - catch (Exception e) + + if (!success) { - System.err.printf("Error on get_indexed_slices call for offset %s - %s%n", startOffset, getExceptionMessage(e)); + System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for offset %s %s%n", + index, + session.getRetryTimes(), + startOffset, + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); if (!session.ignoreErrors()) return; Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java?rev=1071811&r1=1071810&r2=1071811&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Inserter.java Thu Feb 17 22:13:40 2011 @@ -64,7 +64,8 @@ public class Inserter extends OperationT for (int i : range) { - ByteBuffer key = ByteBuffer.wrap(String.format(format, i).getBytes()); + String rawKey = String.format(format, i); + ByteBuffer key = ByteBuffer.wrap(rawKey.getBytes()); Map<ByteBuffer, Map<String, List<Mutation>>> record = new HashMap<ByteBuffer, Map<String, List<Mutation>>>(); record.put(key, session.getColumnFamilyType() == ColumnFamilyType.Super @@ -78,23 +79,35 @@ public class Inserter extends OperationT long start = System.currentTimeMillis(); - try - { - client.batch_mutate(record, session.getConsistencyLevel()); - } - catch (Exception e) + boolean success = false; + String exceptionMessage = null; + + for (int t = 0; t < session.getRetryTimes(); t++) { + if (success) + break; + try { - System.err.printf("Error while inserting key %s - %s%n", ByteBufferUtil.string(key), getExceptionMessage(e)); + client.batch_mutate(record, session.getConsistencyLevel()); + success = true; } - catch (CharacterCodingException e1) + catch (Exception e) { - throw new AssertionError(e1); // keys are valid strings + exceptionMessage = getExceptionMessage(e); + success = false; } + } + + if (!success) + { + System.err.printf("Thread [%d] retried %d times - error inserting key %s %s%n", index, + session.getRetryTimes(), + rawKey, + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); if (!session.ignoreErrors()) - return; + break; } session.operationCount.getAndIncrement(index); Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java?rev=1071811&r1=1071810&r2=1071811&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/MultiGetter.java Thu Feb 17 22:13:40 2011 @@ -55,21 +55,32 @@ public class MultiGetter extends Operati long start = System.currentTimeMillis(); - try + boolean success = false; + String exceptionMessage = null; + + for (int t = 0; t < session.getRetryTimes(); t++) { - results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel()); + if (success) + break; - if (results.size() == 0) + try { - System.err.printf("Keys %s were not found.%n", keys); - - if (!session.ignoreErrors()) - break; + results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel()); + success = (results.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); } } - catch (Exception e) + + if (!success) { - System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e)); + System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n", + index, + session.getRetryTimes(), + keys, + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); if (!session.ignoreErrors()) return; @@ -93,21 +104,33 @@ public class MultiGetter extends Operati long start = System.currentTimeMillis(); - try + boolean success = false; + String exceptionMessage = null; + + for (int t = 0; t < session.getRetryTimes(); t++) { - results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel()); + if (success) + break; - if (results.size() == 0) + try { - System.err.printf("Keys %s were not found.%n", keys); - - if (!session.ignoreErrors()) - break; + results = client.multiget_slice(keys, parent, predicate, session.getConsistencyLevel()); + success = (results.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); + success = false; } } - catch (Exception e) + + if (!success) { - System.err.printf("Error on multiget_slice call - %s%n", getExceptionMessage(e)); + System.err.printf("Thread [%d] retried %d times - error on calling multiget_slice for keys %s %s%n", + index, + session.getRetryTimes(), + keys, + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); if (!session.ignoreErrors()) return; Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java?rev=1071811&r1=1071810&r2=1071811&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/RangeSlicer.java Thu Feb 17 22:13:40 2011 @@ -64,21 +64,31 @@ public class RangeSlicer extends Operati long startTime = System.currentTimeMillis(); - try - { - slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel()); + boolean success = false; + String exceptionMessage = null; - if (slices.size() == 0) + for (int t = 0; t < session.getRetryTimes(); t++) + { + try { - System.err.printf("Range %s->%s not found in Super Column %s.%n", new String(start), new String(end), superColumnName); - - if (!session.ignoreErrors()) - break; + slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel()); + success = (slices.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); + success = false; } } - catch (Exception e) + + if (!success) { - System.err.printf("Error while reading Super Column %s - %s%n", superColumnName, getExceptionMessage(e)); + System.err.printf("Thread [%d] retried %d times - error on calling get_range_slices for range %s->%s %s%n", + index, + session.getRetryTimes(), + new String(start), + new String(end), + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); if (!session.ignoreErrors()) return; @@ -107,21 +117,34 @@ public class RangeSlicer extends Operati long startTime = System.currentTimeMillis(); - try + boolean success = false; + String exceptionMessage = null; + + for (int t = 0; t < session.getRetryTimes(); t++) { - slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel()); + if (success) + break; - if (slices.size() == 0) + try { - System.err.printf("Range %s->%s not found.%n", String.format(format, current), String.format(format, last)); - - if (!session.ignoreErrors()) - break; + slices = client.get_range_slices(parent, predicate, range, session.getConsistencyLevel()); + success = (slices.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); + success = false; } } - catch (Exception e) + + if (!success) { - System.err.printf("Error while reading range %s->%s - %s%n", String.format(format, current), String.format(format, last), getExceptionMessage(e)); + System.err.printf("Thread [%d] retried %d times - error on calling get_indexed_slices for range %s->%s %s%n", + index, + session.getRetryTimes(), + new String(start), + new String(end), + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); if (!session.ignoreErrors()) return; Modified: cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java?rev=1071811&r1=1071810&r2=1071811&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java (original) +++ cassandra/branches/cassandra-0.7/contrib/stress/src/org/apache/cassandra/contrib/stress/operations/Reader.java Thu Feb 17 22:13:40 2011 @@ -22,6 +22,7 @@ import org.apache.cassandra.db.ColumnFam import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; +import java.io.IOException; import java.lang.AssertionError; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; @@ -61,7 +62,8 @@ public class Reader extends OperationThr { for (int i = 0; i < session.getKeysPerThread(); i++) { - ByteBuffer key = ByteBuffer.wrap(generateKey()); + byte[] rawKey = generateKey(); + ByteBuffer key = ByteBuffer.wrap(rawKey); for (int j = 0; j < session.getSuperColumns(); j++) { @@ -70,32 +72,36 @@ public class Reader extends OperationThr long start = System.currentTimeMillis(); - try - { - List<ColumnOrSuperColumn> columns; - columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel()); + boolean success = false; + String exceptionMessage = null; - if (columns.size() == 0) - { - System.err.printf("Key %s not found in Super Column %s.%n", ByteBufferUtil.string(key), superColumn); - - if (!session.ignoreErrors()) - break; - } - } - catch (Exception e) + for (int t = 0; t < session.getRetryTimes(); t++) { + if (success) + break; + try { - System.err.printf("Error while reading Super Column %s key %s - %s%n", superColumn, ByteBufferUtil.string(key), getExceptionMessage(e)); + List<ColumnOrSuperColumn> columns; + columns = client.get_slice(key, parent, predicate, session.getConsistencyLevel()); + success = (columns.size() != 0); } - catch (CharacterCodingException e1) + catch (Exception e) { - throw new AssertionError(e1); // keys are valid string + exceptionMessage = getExceptionMessage(e); + success = false; } + } + + if (!success) + { + System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index, + session.getRetryTimes(), + new String(rawKey), + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); if (!session.ignoreErrors()) - break; + return; } session.operationCount.getAndIncrement(index); @@ -116,25 +122,36 @@ public class Reader extends OperationThr long start = System.currentTimeMillis(); - try + boolean success = false; + String exceptionMessage = null; + + for (int t = 0; t < session.getRetryTimes(); t++) { - List<ColumnOrSuperColumn> columns; - columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel()); + if (success) + break; - if (columns.size() == 0) + try { - System.err.println(String.format("Key %s not found.", new String(key))); - - if (!session.ignoreErrors()) - break; + List<ColumnOrSuperColumn> columns; + columns = client.get_slice(keyBuffer, parent, predicate, session.getConsistencyLevel()); + success = (columns.size() != 0); + } + catch (Exception e) + { + exceptionMessage = getExceptionMessage(e); + success = false; } } - catch (Exception e) + + if (!success) { - System.err.printf("Error while reading key %s - %s%n", new String(key), getExceptionMessage(e)); + System.err.printf("Thread [%d] retried %d times - error reading key %s %s%n", index, + session.getRetryTimes(), + new String(key), + (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")"); if (!session.ignoreErrors()) - break; + return; } session.operationCount.getAndIncrement(index);
