Author: xedin
Date: Mon Aug 22 15:31:59 2011
New Revision: 1160305
URL: http://svn.apache.org/viewvc?rev=1160305&view=rev
Log:
Add query-by-column mode to stress.java
patch by Pavel Yaskevich; reviewed by Jonathan Ellis for CASSANDRA-3064
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1160305&r1=1160304&r2=1160305&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Aug 22 15:31:59 2011
@@ -37,7 +37,7 @@
and few other places responsible for work with SSTable files
(CASSANDRA-3040)
* Stop reading from sstables once we know we have the most recent columns,
for query-by-name requests (CASSANDRA-2498)
-
+ * Add query-by-column mode to stress.java (CASSANDRA-3064)
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
Modified:
cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1160305&r1=1160304&r2=1160305&view=diff
==============================================================================
--- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
(original)
+++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java
Mon Aug 22 15:31:59 2011
@@ -20,10 +20,14 @@ package org.apache.cassandra.stress;
import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.commons.cli.*;
import org.apache.cassandra.db.ColumnFamilyType;
@@ -41,6 +45,9 @@ public class Session implements Serializ
// command line options
public static final Options availableOptions = new Options();
+ public static final String DEFAULT_COMPARATOR = "AsciiType";
+ public static final String DEFAULT_VALIDATOR = "BytesType";
+
public final AtomicInteger operations;
public final AtomicInteger keys;
public final AtomicLong latency;
@@ -78,6 +85,7 @@ public class Session implements Serializ
availableOptions.addOption("V", "average-size-values", false,
"Generate column values of average rather than specific size");
availableOptions.addOption("T", "send-to", true, "Send
this as a request to the stress daemon at specified address.");
availableOptions.addOption("I", "compression", false, "Use
sstable compression when creating schema");
+ availableOptions.addOption("Q", "query-names", true,
"Comma-separated list of column names to retrieve from each row.");
}
private int numKeys = 1000 * 1000;
@@ -109,6 +117,9 @@ public class Session implements Serializ
private String replicationStrategy =
"org.apache.cassandra.locator.SimpleStrategy";
private Map<String, String> replicationStrategyOptions = new
HashMap<String, String>();
+ // if we know exactly column names that we want to read (set by -Q option)
+ public final List<ByteBuffer> columnNames;
+
public final boolean averageSizeValues;
// required by Gaussian distribution.
@@ -275,11 +286,30 @@ public class Session implements Serializ
{
throw new RuntimeException(e);
}
+
+ if (cmd.hasOption("Q"))
+ {
+ AbstractType comparator = TypeParser.parse(DEFAULT_COMPARATOR);
+
+ String[] names = StringUtils.split(cmd.getOptionValue("Q"),
",");
+ columnNames = new ArrayList<ByteBuffer>(names.length);
+
+ for (String columnName : names)
+ columnNames.add(comparator.fromString(columnName));
+ }
+ else
+ {
+ columnNames = null;
+ }
}
catch (ParseException e)
{
throw new IllegalArgumentException(e.getMessage(), e);
}
+ catch (ConfigurationException e)
+ {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
mean = numDifferentKeys / 2;
sigma = numDifferentKeys * STDev;
@@ -417,8 +447,11 @@ public class Session implements Serializ
// column family for standard columns
CfDef standardCfDef = new CfDef("Keyspace1", "Standard1");
- System.out.println("Compression = " + compression);
-
standardCfDef.setComparator_type("AsciiType").setDefault_validation_class("BytesType").setCompression(compression);
+
+ standardCfDef.setComparator_type(DEFAULT_COMPARATOR)
+ .setDefault_validation_class(DEFAULT_VALIDATOR)
+ .setCompression(compression);
+
if (indexType != null)
{
ColumnDef standardColumn = new
ColumnDef(ByteBufferUtil.bytes("C1"), "BytesType");
@@ -428,7 +461,10 @@ public class Session implements Serializ
// column family with super columns
CfDef superCfDef = new CfDef("Keyspace1",
"Super1").setColumn_type("Super");
-
superCfDef.setComparator_type("AsciiType").setSubcomparator_type("AsciiType").setDefault_validation_class("BytesType").setCompression(compression);
+ superCfDef.setComparator_type(DEFAULT_COMPARATOR)
+ .setSubcomparator_type(DEFAULT_COMPARATOR)
+ .setDefault_validation_class(DEFAULT_VALIDATOR)
+ .setCompression(compression);
// column family for standard counters
CfDef counterCfDef = new CfDef("Keyspace1",
"Counter1").setDefault_validation_class("CounterColumnType").setReplicate_on_write(replicateOnWrite).setCompression(compression);
Modified:
cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1160305&r1=1160304&r2=1160305&view=diff
==============================================================================
---
cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
(original)
+++
cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java
Mon Aug 22 15:31:59 2011
@@ -37,16 +37,13 @@ public class Reader extends Operation
public void run(Cassandra.Client client) throws IOException
{
- SliceRange sliceRange = new SliceRange();
-
- // start/finish
- sliceRange.setStart(new byte[] {}).setFinish(new byte[] {});
-
- // reversed/count
- sliceRange.setReversed(false).setCount(session.getColumnsPerKey());
-
// initialize SlicePredicate with existing SliceRange
- SlicePredicate predicate = new
SlicePredicate().setSlice_range(sliceRange);
+ SlicePredicate predicate = new SlicePredicate();
+
+ if (session.columnNames == null)
+ predicate.setSlice_range(getSliceRange());
+ else // see CASSANDRA-3064 about why this is useful
+ predicate.setColumn_names(session.columnNames);
if (session.getColumnFamilyType() == ColumnFamilyType.Super)
{
@@ -150,4 +147,12 @@ public class Reader extends Operation
session.latency.getAndAdd(System.currentTimeMillis() - start);
}
+ private SliceRange getSliceRange()
+ {
+ return new SliceRange()
+ .setStart(new byte[] {})
+ .setFinish(new byte[] {})
+ .setReversed(false)
+ .setCount(session.getColumnsPerKey());
+ }
}