(stress) support for CQL prepared statements patch by David Alves; reviewed by Pavel Yaskevich for CASSANDRA-3633
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1c60d2b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1c60d2b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1c60d2b Branch: refs/heads/trunk Commit: b1c60d2b33815d7ba2136b5c3318f7dbae3ee062 Parents: b94d8d4 Author: Pavel Yaskevich <[email protected]> Authored: Sat Jun 30 14:52:55 2012 +0300 Committer: Pavel Yaskevich <[email protected]> Committed: Sat Jun 30 15:09:46 2012 +0300 ---------------------------------------------------------------------- CHANGES.txt | 4 + .../src/org/apache/cassandra/stress/Session.java | 26 ++++- .../org/apache/cassandra/stress/StressAction.java | 4 +- .../cassandra/stress/operations/CounterAdder.java | 3 +- .../cassandra/stress/operations/CounterGetter.java | 3 +- .../stress/operations/CqlCounterAdder.java | 47 +++++++--- .../stress/operations/CqlCounterGetter.java | 38 ++++++-- .../stress/operations/CqlIndexedRangeSlicer.java | 44 +++++--- .../cassandra/stress/operations/CqlInserter.java | 50 +++++++--- .../stress/operations/CqlMultiGetter.java | 4 +- .../stress/operations/CqlRangeSlicer.java | 38 ++++++-- .../cassandra/stress/operations/CqlReader.java | 61 ++++++++--- .../stress/operations/IndexedRangeSlicer.java | 3 +- .../cassandra/stress/operations/Inserter.java | 3 +- .../cassandra/stress/operations/MultiGetter.java | 3 +- .../cassandra/stress/operations/RangeSlicer.java | 3 +- .../apache/cassandra/stress/operations/Reader.java | 3 +- .../cassandra/stress/util/CassandraClient.java | 34 +++++++ .../apache/cassandra/stress/util/Operation.java | 77 +++++++++++++-- 19 files changed, 352 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index eda806c..25d9784 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,7 @@ +1.1.3 + * (stress) support for CQL prepared statements (CASSANDRA-3633) + + 1.1.2 * Fix cleanup not deleting index entries (CASSANDRA-4379) * Use correct partitioner when saving + loading caches (CASSANDRA-4331) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/Session.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java index 5455e67..dbe1951 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Session.java +++ b/tools/stress/src/org/apache/cassandra/stress/Session.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.marshal.*; import org.apache.commons.cli.*; import org.apache.cassandra.db.ColumnFamilyType; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.thrift.*; import org.apache.commons.lang.StringUtils; @@ -80,6 +81,7 @@ public class Session implements Serializable availableOptions.addOption("g", "keys-per-call", true, "Number of keys to get_range_slices or multiget per call, default:1000"); availableOptions.addOption("l", "replication-factor", true, "Replication Factor to use when creating needed column families, default:1"); availableOptions.addOption("L", "enable-cql", false, "Perform queries using CQL (Cassandra Query Language)."); + availableOptions.addOption("P", "use-prepared-statements", false, "Perform queries using prepared statements (only applicable to CQL)."); availableOptions.addOption("e", "consistency-level", true, "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE"); availableOptions.addOption("x", "create-index", true, "Type of index to create on needed column families (KEYS)"); availableOptions.addOption("R", "replication-strategy", true, "Replication strategy to use (only on insert if keyspace does not exist), default:org.apache.cassandra.locator.SimpleStrategy"); @@ -114,6 +116,7 @@ public class Session implements Serializable private boolean replicateOnWrite = true; private boolean ignoreErrors = false; private boolean enable_cql = false; + private boolean use_prepared = false; private final String outFileName; @@ -265,6 +268,16 @@ public class Session implements Serializable if (cmd.hasOption("L")) enable_cql = true; + if (cmd.hasOption("P")) + { + if (!enable_cql) + { + System.err.println("-P/--use-prepared-statements is only applicable with CQL (-L/--enable-cql)"); + System.exit(-1); + } + use_prepared = true; + } + if (cmd.hasOption("O")) { String[] pairs = StringUtils.split(cmd.getOptionValue("O"), ','); @@ -500,6 +513,11 @@ public class Session implements Serializable return enable_cql; } + public boolean usePreparedStatements() + { + return use_prepared; + } + /** * Create Keyspace1 with Standard1 and Super1 column families */ @@ -556,7 +574,7 @@ public class Session implements Serializable keyspace.setCf_defs(new ArrayList<CfDef>(Arrays.asList(standardCfDef, superCfDef, counterCfDef, counterSuperCfDef))); - Cassandra.Client client = getClient(false); + CassandraClient client = getClient(false); try { @@ -578,7 +596,7 @@ public class Session implements Serializable * Thrift client connection with Keyspace1 set. * @return cassandra client connection */ - public Cassandra.Client getClient() + public CassandraClient getClient() { return getClient(true); } @@ -587,14 +605,14 @@ public class Session implements Serializable * @param setKeyspace - should we set keyspace for client or not * @return cassandra client connection */ - public Cassandra.Client getClient(boolean setKeyspace) + public CassandraClient getClient(boolean setKeyspace) { // random node selection for fake load balancing String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)]; TSocket socket = new TSocket(currentNode, port); TTransport transport = (isUnframed()) ? socket : new TFramedTransport(socket); - Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport)); + CassandraClient client = new CassandraClient(new TBinaryProtocol(transport)); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/StressAction.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index ac774b7..1227fe8 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -22,8 +22,8 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import org.apache.cassandra.stress.operations.*; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; public class StressAction extends Thread { @@ -215,7 +215,7 @@ public class StressAction extends Thread public void run() { - Cassandra.Client connection = client.getClient(); + CassandraClient connection = client.getClient(); for (int i = 0; i < items; i++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java index 0c80f0a..0420154 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java @@ -18,6 +18,7 @@ package org.apache.cassandra.stress.operations; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -37,7 +38,7 @@ public class CounterAdder extends Operation super(client, index); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { List<CounterColumn> columns = new ArrayList<CounterColumn>(); List<CounterSuperColumn> superColumns = new ArrayList<CounterSuperColumn>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java index 3d8b1fd..a06298d 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java @@ -18,6 +18,7 @@ package org.apache.cassandra.stress.operations; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -33,7 +34,7 @@ public class CounterGetter extends Operation super(client, index); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { SliceRange sliceRange = new SliceRange(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java index fa82d57..7197eaa 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java @@ -23,39 +23,48 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Compression; - -import static com.google.common.base.Charsets.UTF_8; +import org.apache.cassandra.utils.ByteBufferUtil; public class CqlCounterAdder extends Operation { + private static String cqlQuery = null; + public CqlCounterAdder(Session client, int idx) { super(client, idx); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); - StringBuilder query = new StringBuilder( - "UPDATE Counter1 USING CONSISTENCY " + session.getConsistencyLevel().toString() + " SET "); - - for (int i = 0; i < session.getColumnsPerKey(); i++) + if (cqlQuery == null) { - if (i > 0) - query.append(","); - query.append('C').append(i).append("=C").append(i).append("+1"); + StringBuilder query = new StringBuilder( + "UPDATE Counter1 USING CONSISTENCY " + session.getConsistencyLevel().toString() + " SET "); + + for (int i = 0; i < session.getColumnsPerKey(); i++) + { + if (i > 0) + query.append(","); + + query.append('C').append(i).append("=C").append(i).append("+1"); + + } + query.append(" WHERE KEY=?"); + cqlQuery = query.toString(); } String key = String.format("%0" + session.getTotalKeysLength() + "d", index); - query.append( " WHERE KEY=").append(getQuotedCqlBlob(key.getBytes(UTF_8))); + String formattedQuery = null; long start = System.currentTimeMillis(); @@ -69,7 +78,19 @@ public class CqlCounterAdder extends Operation try { - client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), Compression.NONE); + if (session.usePreparedStatements()) + { + Integer stmntId = getPreparedStatement(client, cqlQuery); + client.execute_prepared_cql_query(stmntId, + Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key)))); + } + else + { + if (formattedQuery == null) + formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key))); + client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); + } + success = true; } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java index 1044c6d..1133747 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java @@ -23,33 +23,41 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.thrift.CqlResultType; +import org.apache.cassandra.utils.ByteBufferUtil; public class CqlCounterGetter extends Operation { + private static String cqlQuery = null; public CqlCounterGetter(Session client, int idx) { super(client, idx); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); + if (cqlQuery == null) + { + StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey()) + .append(" ''..'' FROM Counter1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString()) + .append(" WHERE KEY=?"); + cqlQuery = query.toString(); + } + byte[] key = generateKey(); - String hexKey = getQuotedCqlBlob(key); - StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey()) - .append(" ''..'' FROM Counter1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString()) - .append(" WHERE KEY=").append(hexKey); + String formattedQuery = null; long start = System.currentTimeMillis(); @@ -63,8 +71,22 @@ public class CqlCounterGetter extends Operation try { - CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), - Compression.NONE); + CqlResult result = null; + + if (session.usePreparedStatements()) + { + Integer stmntId = getPreparedStatement(client, cqlQuery); + result = client.execute_prepared_cql_query(stmntId, + Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key)))); + } + else + { + if (formattedQuery == null) + formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key))); + result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), + Compression.NONE); + } + assert result.type.equals(CqlResultType.ROWS) : "expected ROWS result type"; assert result.rows.size() == 0 : "expected exactly one row"; success = (result.rows.get(0).columns.size() != 0); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java index 978c1c4..383ad67 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java @@ -23,30 +23,29 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.thrift.CqlRow; import org.apache.cassandra.utils.ByteBufferUtil; -import static org.apache.cassandra.utils.Hex.bytesToHex;; - public class CqlIndexedRangeSlicer extends Operation { private static List<ByteBuffer> values = null; - private static String clauseFragment = "KEY > '%s' LIMIT %d"; + private static String cqlQuery = null; public CqlIndexedRangeSlicer(Session client, int idx) { super(client, idx); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); @@ -54,12 +53,18 @@ public class CqlIndexedRangeSlicer extends Operation if (values == null) values = generateValues(); - String format = "%0" + session.getTotalKeysLength() + "d"; + if (cqlQuery == null) + { + StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey()) + .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel()) + .append(" WHERE C1=").append(getUnQuotedCqlBlob(values.get(1).array())) + .append(" AND KEY > ? LIMIT ").append(session.getKeysPerCall()); + + cqlQuery = query.toString(); + } + String format = "%0" + session.getTotalKeysLength() + "d"; String startOffset = String.format(format, 0); - StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey()) - .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel()) - .append(" WHERE C1 = ").append(getQuotedCqlBlob(values.get(1).array())).append(" AND "); int expectedPerValue = session.getNumKeys() / values.size(), received = 0; @@ -70,6 +75,8 @@ public class CqlIndexedRangeSlicer extends Operation boolean success = false; String exceptionMessage = null; CqlResult results = null; + String formattedQuery = null; + List<String> queryParms = Collections.singletonList(getUnQuotedCqlBlob(startOffset)); for (int t = 0; t < session.getRetryTimes(); t++) { @@ -78,8 +85,18 @@ public class CqlIndexedRangeSlicer extends Operation try { - ByteBuffer queryBytes = ByteBuffer.wrap(makeQuery(query, startOffset).getBytes()); - results = client.execute_cql_query(queryBytes, Compression.NONE); + if (session.usePreparedStatements()) + { + Integer stmntId = getPreparedStatement(client, cqlQuery); + results = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms)); + } + else + { + if (formattedQuery == null) + formattedQuery = formatCqlQuery(cqlQuery, queryParms); + results = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); + } + success = (results.rows.size() != 0); } catch (Exception e) @@ -109,11 +126,6 @@ public class CqlIndexedRangeSlicer extends Operation } } - private String makeQuery(StringBuilder base, String startOffset) - { - return base.toString() + String.format(clauseFragment, bytesToHex(startOffset.getBytes()), session.getKeysPerCall()); - } - /** * Get maximum key from CqlRow list * @param rows list of the CqlRow objects http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java index d7a7641..c729f2f 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java @@ -23,25 +23,27 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.utils.UUIDGen; public class CqlInserter extends Operation { private static List<ByteBuffer> values; + private static String cqlQuery = null; public CqlInserter(Session client, int idx) { super(client, idx); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); @@ -49,26 +51,39 @@ public class CqlInserter extends Operation if (values == null) values = generateValues(); - StringBuilder query = new StringBuilder("UPDATE Standard1 USING CONSISTENCY ") - .append(session.getConsistencyLevel().toString()).append(" SET "); + // Construct a query string once. + if (cqlQuery == null) + { + StringBuilder query = new StringBuilder("UPDATE Standard1 USING CONSISTENCY ") + .append(session.getConsistencyLevel().toString()).append(" SET "); + for (int i = 0; i < session.getColumnsPerKey(); i++) + { + if (i > 0) query.append(','); + query.append("?=?"); + } + + query.append(" WHERE KEY=?"); + cqlQuery = query.toString(); + } + + List<String> queryParms = new ArrayList<String>(); for (int i = 0; i < session.getColumnsPerKey(); i++) { - if (i > 0) - query.append(','); - // Column name if (session.timeUUIDComparator) - query.append(UUIDGen.makeType1UUIDFromHost(Session.getLocalAddress()).toString()); + queryParms.add(UUIDGen.makeType1UUIDFromHost(Session.getLocalAddress()).toString()); else - query.append('C').append(i); + queryParms.add(new String("C" + i)); // Column value - query.append('=').append(getQuotedCqlBlob(values.get(i % values.size()).array())); + queryParms.add(new String(getUnQuotedCqlBlob(values.get(i % values.size()).array()))); } String key = String.format("%0" + session.getTotalKeysLength() + "d", index); - query.append(" WHERE KEY=").append(getQuotedCqlBlob(key)); + queryParms.add(new String(getUnQuotedCqlBlob(key))); + + String formattedQuery = null; long start = System.currentTimeMillis(); @@ -82,7 +97,18 @@ public class CqlInserter extends Operation try { - client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), Compression.NONE); + if (session.usePreparedStatements()) + { + Integer stmntId = getPreparedStatement(client, cqlQuery); + client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms)); + } + else + { + if (formattedQuery == null) + formattedQuery = formatCqlQuery(cqlQuery, queryParms); + client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); + } + success = true; } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java index 3125cff..e9b1f47 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java @@ -24,8 +24,8 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; public class CqlMultiGetter extends Operation { @@ -34,7 +34,7 @@ public class CqlMultiGetter extends Operation super(client, idx); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { throw new RuntimeException("Multiget is not implemented for CQL"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java index e57a9ac..8b20867 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java @@ -23,30 +23,41 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; + +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; public class CqlRangeSlicer extends Operation { + private static String cqlQuery = null; + public CqlRangeSlicer(Session client, int idx) { super(client, idx); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); + if (cqlQuery == null) + { + StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey()) + .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString()) + .append(" WHERE KEY > ?"); + cqlQuery = query.toString(); + } + String key = String.format("%0" + session.getTotalKeysLength() + "d", index); - StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey()) - .append(" ''..'' FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString()) - .append(" WHERE KEY > ").append(getQuotedCqlBlob(key)); + String formattedQuery = null; long startTime = System.currentTimeMillis(); @@ -61,8 +72,21 @@ public class CqlRangeSlicer extends Operation try { - CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), - Compression.NONE); + CqlResult result = null; + + if (session.usePreparedStatements()) + { + Integer stmntId = getPreparedStatement(client, cqlQuery); + result = client.execute_prepared_cql_query(stmntId, + Collections.singletonList(ByteBufferUtil.bytes(getUnQuotedCqlBlob(key)))); + } + else + { + if (formattedQuery == null) + formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key))); + result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); + } + rowCount = result.rows.size(); success = (rowCount != 0); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java index 93a5c79..cfac2d6 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java @@ -23,46 +23,60 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; public class CqlReader extends Operation { + private static String cqlQuery = null; + public CqlReader(Session client, int idx) { super(client, idx); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); - StringBuilder query = new StringBuilder("SELECT "); - - if (session.columnNames == null) - { - query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''"); - } - else + if (cqlQuery == null) { - for (int i = 0; i < session.columnNames.size(); i++) + StringBuilder query = new StringBuilder("SELECT "); + + if (session.columnNames == null) + query.append("FIRST ").append(session.getColumnsPerKey()).append(" ''..''"); + else { - if (i > 0) - query.append(","); - query.append('\'').append(new String(session.columnNames.get(i).array())).append('\''); + for (int i = 0; i < session.columnNames.size(); i++) + { + if (i > 0) query.append(","); + query.append('?'); + } } + + query.append(" FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString()); + query.append(" WHERE KEY=?"); + + cqlQuery = query.toString(); } + List<String> queryParams = new ArrayList<String>(); + if (session.columnNames != null) + for (int i = 0; i < session.columnNames.size(); i++) + queryParams.add(getUnQuotedCqlBlob(session.columnNames.get(i).array())); + byte[] key = generateKey(); + queryParams.add(getUnQuotedCqlBlob(key)); - query.append(" FROM Standard1 USING CONSISTENCY ").append(session.getConsistencyLevel().toString()); - query.append(" WHERE KEY=").append(getQuotedCqlBlob(key)); + String formattedQuery = null; long start = System.currentTimeMillis(); @@ -76,8 +90,21 @@ public class CqlReader extends Operation try { - CqlResult result = client.execute_cql_query(ByteBuffer.wrap(query.toString().getBytes()), - Compression.NONE); + CqlResult result = null; + + if (session.usePreparedStatements()) + { + Integer stmntId = getPreparedStatement(client, cqlQuery); + result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams)); + } + else + { + if (formattedQuery == null) + formattedQuery = formatCqlQuery(cqlQuery, queryParams); + result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), + Compression.NONE); + } + success = (result.rows.get(0).columns.size() != 0); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java index c117862..8768de8 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java @@ -18,6 +18,7 @@ package org.apache.cassandra.stress.operations; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; @@ -36,7 +37,7 @@ public class IndexedRangeSlicer extends Operation super(client, index); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { if (values == null) values = generateValues(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java index a887724..0623e4c 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java @@ -18,6 +18,7 @@ package org.apache.cassandra.stress.operations; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -40,7 +41,7 @@ public class Inserter extends Operation super(client, index); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { if (values == null) values = generateValues(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java index c50dd1b..f569f66 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java @@ -18,6 +18,7 @@ package org.apache.cassandra.stress.operations; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -37,7 +38,7 @@ public class MultiGetter extends Operation super(client, index); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java index 308eefe..e462e30 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java @@ -18,6 +18,7 @@ package org.apache.cassandra.stress.operations; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -36,7 +37,7 @@ public class RangeSlicer extends Operation super(client, index); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { String format = "%0" + session.getTotalKeysLength() + "d"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java index b5a8781..412ebdf 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java @@ -18,6 +18,7 @@ package org.apache.cassandra.stress.operations; import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -35,7 +36,7 @@ public class Reader extends Operation super(client, index); } - public void run(Cassandra.Client client) throws IOException + public void run(CassandraClient client) throws IOException { // initialize SlicePredicate with existing SliceRange SlicePredicate predicate = new SlicePredicate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java new file mode 100644 index 0000000..5136a55 --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/util/CassandraClient.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.stress.util; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.cassandra.thrift.Cassandra.Client; +import org.apache.thrift.protocol.TProtocol; + +public class CassandraClient extends Client +{ + public Map<Integer, Integer> preparedStatements = new HashMap<Integer, Integer>(); + + public CassandraClient(TProtocol protocol) + { + super(protocol); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c60d2b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java index 4e08909..cbeaad7 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.stress.util; +import static com.google.common.base.Charsets.UTF_8; + import java.io.IOException; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -25,17 +27,19 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; -import static com.google.common.base.Charsets.UTF_8; +import com.google.common.base.Function; +import com.google.common.collect.Lists; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.Stress; -import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.Compression; +import org.apache.cassandra.thrift.CqlPreparedResult; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.UUIDGen; public abstract class Operation { @@ -61,7 +65,7 @@ public abstract class Operation * @param client Cassandra Thrift client connection * @throws IOException on any I/O error. */ - public abstract void run(Cassandra.Client client) throws IOException; + public abstract void run(CassandraClient client) throws IOException; // Utility methods @@ -226,13 +230,70 @@ public abstract class Operation System.err.println(message); } - protected String getQuotedCqlBlob(String term) + protected String getUnQuotedCqlBlob(String term) + { + return getUnQuotedCqlBlob(term.getBytes()); + } + + protected String getUnQuotedCqlBlob(byte[] term) + { + return Hex.bytesToHex(term); + } + + protected List<ByteBuffer> queryParamsAsByteBuffer(List<String> queryParams) + { + return Lists.transform(queryParams, new Function<String, ByteBuffer>() + { + @Override + public ByteBuffer apply(String param) + { + return ByteBufferUtil.bytes(param); + } + }); + } + + /** + * Constructs a CQL query string by replacing instances of the character + * '?', with the corresponding parameter. + * + * @param query base query string to format + * @param parms sequence of string query parameters + * @return formatted CQL query string + */ + protected static String formatCqlQuery(String query, List<String> parms) { - return getQuotedCqlBlob(term.getBytes()); + int marker = 0, position = 0; + StringBuilder result = new StringBuilder(); + + if (-1 == (marker = query.indexOf('?')) || parms.size() == 0) + return query; + + for (String parm : parms) + { + result.append(query.substring(position, marker)); + result.append('\'').append(parm).append('\''); + + position = marker + 1; + if (-1 == (marker = query.indexOf('?', position + 1))) + break; + } + + if (position < query.length()) + result.append(query.substring(position)); + + return result.toString(); } - protected String getQuotedCqlBlob(byte[] term) + protected static Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception { - return String.format("'%s'", Hex.bytesToHex(term)); + Integer statementId = client.preparedStatements.get(cqlQuery.hashCode()); + if (statementId == null) + { + CqlPreparedResult response = client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE); + statementId = response.itemId; + client.preparedStatements.put(cqlQuery.hashCode(), statementId); + } + + return statementId; } }
