Updated Branches: refs/heads/cassandra-1.2 c315745c9 -> 06699d47b
Add binary protocol to stress patch by slebresne; reviewed by yukim for CASSANDRA-4993 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06699d47 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06699d47 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06699d47 Branch: refs/heads/cassandra-1.2 Commit: 06699d47b62d8af8dbb60481ab36e5ed1805e0a0 Parents: c315745 Author: Sylvain Lebresne <[email protected]> Authored: Wed Mar 6 09:07:04 2013 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Mar 6 09:08:18 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cassandra | 2 +- .../apache/cassandra/transport/SimpleClient.java | 5 + .../src/org/apache/cassandra/stress/Session.java | 26 ++++ .../org/apache/cassandra/stress/StressAction.java | 62 +++++++--- .../cassandra/stress/operations/CQLOperation.java | 96 +++++++++++++++ .../stress/operations/CqlCounterAdder.java | 39 +++---- .../stress/operations/CqlCounterGetter.java | 43 +++---- .../stress/operations/CqlIndexedRangeSlicer.java | 75 +++++++---- .../cassandra/stress/operations/CqlInserter.java | 39 +++---- .../stress/operations/CqlMultiGetter.java | 6 + .../stress/operations/CqlRangeSlicer.java | 49 ++++---- .../cassandra/stress/operations/CqlReader.java | 42 +++---- .../apache/cassandra/stress/util/Operation.java | 29 ++++- 14 files changed, 344 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f4e854b..d53a1e3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,6 +14,7 @@ * cli: Warn about missing CQL3 tables in schema descriptions (CASSANDRA-5309) * Re-enable unknown option in replication/compaction strategies option for backward compatibility (CASSANDRA-4795) + * Add binary protocol support to stress (CASSANDRA-4993) Merged from 1.1: * nodetool: ability to repair specific range (CASSANDRA-5280) * Fix possible assertion triggered in SliceFromReadCommand (CASSANDRA-5284) http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/bin/cassandra ---------------------------------------------------------------------- diff --git a/bin/cassandra b/bin/cassandra index 5403257..25498aa 100755 --- a/bin/cassandra +++ b/bin/cassandra @@ -129,7 +129,7 @@ launch_service() if [ "x$pidpath" != "x" ]; then cassandra_parms="$cassandra_parms -Dcassandra-pidfile=$pidpath" fi - + # The cassandra-foreground option will tell CassandraDaemon not # to close stdout/stderr, but it's up to us not to background. if [ "x$foreground" != "x" ]; then http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index f2963bd..7979570 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -93,6 +93,11 @@ public class SimpleClient this.encryptionOptions = encryptionOptions; } + public SimpleClient(String host, int port) + { + this(host, port, new ClientEncryptionOptions()); + } + public void connect(boolean useCompression) throws IOException { establishConnection(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/Session.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/Session.java b/tools/stress/src/org/apache/cassandra/stress/Session.java index 804e4e8..d16ee78 100644 --- a/tools/stress/src/org/apache/cassandra/stress/Session.java +++ b/tools/stress/src/org/apache/cassandra/stress/Session.java @@ -38,6 +38,7 @@ import org.apache.commons.cli.*; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.util.CassandraClient; +import org.apache.cassandra.transport.SimpleClient; import org.apache.cassandra.thrift.*; import org.apache.commons.lang.StringUtils; @@ -95,6 +96,7 @@ public class Session implements Serializable availableOptions.addOption("l", "replication-factor", true, "Replication Factor to use when creating needed column families, default:1"); availableOptions.addOption("L", "enable-cql", false, "Perform queries using CQL2 (Cassandra Query Language v 2.0.0)"); availableOptions.addOption("L3", "enable-cql3", false, "Perform queries using CQL3 (Cassandra Query Language v 3.0.0)"); + availableOptions.addOption("b", "enable-native-protocol", false, "Use the binary native protocol (only work along with -L3)"); availableOptions.addOption("P", "use-prepared-statements", false, "Perform queries using prepared statements (only applicable to CQL)."); availableOptions.addOption("e", "consistency-level", true, "Consistency Level to use (ONE, QUORUM, LOCAL_QUORUM, EACH_QUORUM, ALL, ANY), default:ONE"); availableOptions.addOption("x", "create-index", true, "Type of index to create on needed column families (KEYS)"); @@ -138,6 +140,7 @@ public class Session implements Serializable private boolean enable_cql = false; private boolean use_prepared = false; private boolean trace = false; + public boolean use_native_protocol = false; private final String outFileName; @@ -300,6 +303,12 @@ public class Session implements Serializable cqlVersion = "3.0.0"; } + if (cmd.hasOption("b")) + { + if (!(enable_cql && cqlVersion.startsWith("3"))) + throw new IllegalArgumentException("Cannot use binary protocol without -L3"); + use_native_protocol = true; + } if (cmd.hasOption("P")) { @@ -691,6 +700,7 @@ public class Session implements Serializable { return getClient(true); } + /** * Thrift client connection * @param setKeyspace - should we set keyspace for client or not @@ -730,6 +740,22 @@ public class Session implements Serializable return client; } + public SimpleClient getNativeClient() + { + try + { + String currentNode = nodes[Stress.randomizer.nextInt(nodes.length)]; + SimpleClient client = new SimpleClient(currentNode, 9042); + client.connect(false); + client.execute("USE \"Keyspace1\";", org.apache.cassandra.db.ConsistencyLevel.ONE); + return client; + } + catch (Exception e) + { + throw new RuntimeException(e.getMessage()); + } + } + public static InetAddress getLocalAddress() { if (localInetAddress == null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/StressAction.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java index 27675d1..60e8cbd 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java @@ -25,6 +25,7 @@ import com.yammer.metrics.stats.Snapshot; import org.apache.cassandra.stress.operations.*; import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.SimpleClient; public class StressAction extends Thread { @@ -218,29 +219,60 @@ public class StressAction extends Thread public void run() { - CassandraClient connection = client.getClient(); - - for (int i = 0; i < items; i++) + if (client.use_native_protocol) { - if (stop) - break; + SimpleClient connection = client.getNativeClient(); - try + for (int i = 0; i < items; i++) { - operations.take().run(connection); // running job + if (stop) + break; + + try + { + operations.take().run(connection); // running job + } + catch (Exception e) + { + if (output == null) + { + System.err.println(e.getMessage()); + returnCode = StressAction.FAILURE; + System.exit(-1); + } + + output.println(e.getMessage()); + returnCode = StressAction.FAILURE; + break; + } } - catch (Exception e) + } + else + { + CassandraClient connection = client.getClient(); + + for (int i = 0; i < items; i++) { - if (output == null) + if (stop) + break; + + try { - System.err.println(e.getMessage()); + operations.take().run(connection); // running job + } + catch (Exception e) + { + if (output == null) + { + System.err.println(e.getMessage()); + returnCode = StressAction.FAILURE; + System.exit(-1); + } + + output.println(e.getMessage()); returnCode = StressAction.FAILURE; - System.exit(-1); + break; } - - output.println(e.getMessage()); - returnCode = StressAction.FAILURE; - break; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java new file mode 100644 index 0000000..54737a4 --- /dev/null +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CQLOperation.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.stress.operations; + +import java.nio.ByteBuffer; +import java.io.IOException; +import java.util.List; + +import org.apache.cassandra.stress.Session; +import org.apache.cassandra.stress.util.CassandraClient; +import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.SimpleClient; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.thrift.Compression; +import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.ThriftConversion; + +public abstract class CQLOperation extends Operation +{ + public CQLOperation(Session client, int idx) + { + super(client, idx); + } + + protected abstract void run(CQLQueryExecutor executor) throws IOException; + + protected abstract boolean validateThriftResult(CqlResult result); + + protected abstract boolean validateNativeResult(ResultMessage result); + + public void run(final CassandraClient client) throws IOException + { + run(new CQLQueryExecutor() + { + public boolean execute(String cqlQuery, List<String> queryParams) throws Exception + { + CqlResult result = null; + if (session.usePreparedStatements()) + { + Integer stmntId = getPreparedStatement(client, cqlQuery); + if (session.cqlVersion.startsWith("3")) + result = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParams), session.getConsistencyLevel()); + else + result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams)); + } + else + { + String formattedQuery = formatCqlQuery(cqlQuery, queryParams); + if (session.cqlVersion.startsWith("3")) + result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel()); + else + result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); + } + return validateThriftResult(result); + } + }); + } + + public void run(final SimpleClient client) throws IOException + { + run(new CQLQueryExecutor() + { + public boolean execute(String cqlQuery, List<String> queryParams) throws Exception + { + ResultMessage result = null; + if (session.usePreparedStatements()) + { + byte[] stmntId = getPreparedStatement(client, cqlQuery); + result = client.executePrepared(stmntId, queryParamsAsByteBuffer(queryParams), ThriftConversion.fromThrift(session.getConsistencyLevel())); + } + else + { + String formattedQuery = formatCqlQuery(cqlQuery, queryParams); + result = client.execute(formattedQuery, ThriftConversion.fromThrift(session.getConsistencyLevel())); + } + return validateNativeResult(result); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java index b0633ea..31e8371 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterAdder.java @@ -24,16 +24,19 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import com.yammer.metrics.core.TimerContext; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.thrift.Compression; +import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.utils.ByteBufferUtil; -public class CqlCounterAdder extends Operation +public class CqlCounterAdder extends CQLOperation { private static String cqlQuery = null; @@ -42,7 +45,7 @@ public class CqlCounterAdder extends Operation super(client, idx); } - public void run(CassandraClient client) throws IOException + protected void run(CQLQueryExecutor executor) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); @@ -70,7 +73,7 @@ public class CqlCounterAdder extends Operation } String key = String.format("%0" + session.getTotalKeysLength() + "d", index); - String formattedQuery = null; + List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))); TimerContext context = session.latency.time(); @@ -84,25 +87,7 @@ public class CqlCounterAdder extends Operation try { - if (session.usePreparedStatements()) - { - Integer stmntId = getPreparedStatement(client, cqlQuery); - if (session.cqlVersion.startsWith("3")) - client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())), session.getConsistencyLevel()); - else - client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes()))); - } - else - { - if (formattedQuery == null) - formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")))); - if (session.cqlVersion.startsWith("3")) - client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel()); - else - client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); - } - - success = true; + success = executor.execute(cqlQuery, queryParams); } catch (Exception e) { @@ -124,4 +109,14 @@ public class CqlCounterAdder extends Operation session.keys.getAndIncrement(); context.stop(); } + + protected boolean validateThriftResult(CqlResult result) + { + return true; + } + + protected boolean validateNativeResult(ResultMessage result) + { + return true; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java index 7feee5b..a4d037a 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlCounterGetter.java @@ -24,18 +24,20 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import com.yammer.metrics.core.TimerContext; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.thrift.CqlResultType; import org.apache.cassandra.utils.ByteBufferUtil; -public class CqlCounterGetter extends Operation +public class CqlCounterGetter extends CQLOperation { private static String cqlQuery = null; @@ -44,7 +46,7 @@ public class CqlCounterGetter extends Operation super(client, idx); } - public void run(CassandraClient client) throws IOException + protected void run(CQLQueryExecutor executor) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); @@ -69,7 +71,7 @@ public class CqlCounterGetter extends Operation } byte[] key = generateKey(); - String formattedQuery = null; + List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))); TimerContext context = session.latency.time(); @@ -83,30 +85,7 @@ public class CqlCounterGetter extends Operation try { - CqlResult result = null; - - if (session.usePreparedStatements()) - { - Integer stmntId = getPreparedStatement(client, cqlQuery); - if (session.cqlVersion.startsWith("3")) - result = client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key)), session.getConsistencyLevel()); - else - result = client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key))); - } - else - { - if (formattedQuery == null) - formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")))); - - if (session.cqlVersion.startsWith("3")) - result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel()); - else - result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); - } - - assert result.type.equals(CqlResultType.ROWS) : "expected ROWS result type"; - assert result.rows.size() == 0 : "expected exactly one row"; - success = (result.rows.get(0).columns.size() != 0); + success = executor.execute(cqlQuery, queryParams); } catch (Exception e) { @@ -128,4 +107,14 @@ public class CqlCounterGetter extends Operation session.keys.getAndIncrement(); context.stop(); } + + protected boolean validateThriftResult(CqlResult result) + { + return result.rows.get(0).columns.size() != 0; + } + + protected boolean validateNativeResult(ResultMessage result) + { + return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java index b1fa85e..bf416cc 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlIndexedRangeSlicer.java @@ -27,26 +27,31 @@ import java.util.Collections; import java.util.List; import com.yammer.metrics.core.TimerContext; +import org.apache.cassandra.cql3.ResultSet; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.thrift.CqlRow; import org.apache.cassandra.utils.ByteBufferUtil; -public class CqlIndexedRangeSlicer extends Operation +public class CqlIndexedRangeSlicer extends CQLOperation { private static List<ByteBuffer> values = null; private static String cqlQuery = null; + private int lastQueryResultSize; + private int lastMaxKey; + public CqlIndexedRangeSlicer(Session client, int idx) { super(client, idx); } - public void run(CassandraClient client) throws IOException + protected void run(CQLQueryExecutor executor) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); @@ -56,8 +61,14 @@ public class CqlIndexedRangeSlicer extends Operation if (cqlQuery == null) { - StringBuilder query = new StringBuilder("SELECT FIRST ").append(session.getColumnsPerKey()) - .append(" ''..'' FROM Standard1"); + StringBuilder query = new StringBuilder("SELECT "); + + if (session.cqlVersion.startsWith("2")) + query.append(session.getColumnsPerKey()).append(" ''..''"); + else + query.append("*"); + + query.append(" FROM Standard1"); if (session.cqlVersion.startsWith("2")) query.append(" USING CONSISTENCY ").append(session.getConsistencyLevel()); @@ -79,7 +90,6 @@ public class CqlIndexedRangeSlicer extends Operation boolean success = false; String exceptionMessage = null; - CqlResult results = null; String formattedQuery = null; List<String> queryParms = Collections.singletonList(getUnQuotedCqlBlob(startOffset, session.cqlVersion.startsWith("3"))); @@ -90,25 +100,7 @@ public class CqlIndexedRangeSlicer extends Operation try { - if (session.usePreparedStatements()) - { - Integer stmntId = getPreparedStatement(client, cqlQuery); - if (session.cqlVersion.startsWith("3")) - results = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParms), session.getConsistencyLevel()); - else - results = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms)); - } - else - { - if (formattedQuery == null) - formattedQuery = formatCqlQuery(cqlQuery, queryParms); - if (session.cqlVersion.startsWith("3")) - results = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel()); - else - results = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); - } - - success = (results.rows.size() != 0); + success = executor.execute(cqlQuery, queryParms); } catch (Exception e) { @@ -126,13 +118,13 @@ public class CqlIndexedRangeSlicer extends Operation (exceptionMessage == null) ? "" : "(" + exceptionMessage + ")")); } - received += results.rows.size(); + received += lastQueryResultSize; // convert max key found back to an integer, and increment it - startOffset = String.format(format, (1 + getMaxKey(results.rows))); + startOffset = String.format(format, (1 + lastMaxKey)); session.operations.getAndIncrement(); - session.keys.getAndAdd(results.rows.size()); + session.keys.getAndAdd(lastQueryResultSize); context.stop(); } } @@ -155,4 +147,33 @@ public class CqlIndexedRangeSlicer extends Operation return maxKey; } + + private int getMaxKey(ResultSet rs) + { + int maxKey = ByteBufferUtil.toInt(rs.rows.get(0).get(0)); + + for (List<ByteBuffer> row : rs.rows) + { + int currentKey = ByteBufferUtil.toInt(row.get(0)); + if (currentKey > maxKey) + maxKey = currentKey; + } + + return maxKey; + } + + protected boolean validateThriftResult(CqlResult result) + { + lastQueryResultSize = result.rows.size(); + lastMaxKey = getMaxKey(result.rows); + return lastQueryResultSize != 0; + } + + protected boolean validateNativeResult(ResultMessage result) + { + assert result instanceof ResultMessage.Rows; + lastQueryResultSize = ((ResultMessage.Rows)result).result.size(); + lastMaxKey = getMaxKey(((ResultMessage.Rows)result).result); + return lastQueryResultSize != 0; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java index ed03f1f..3572c36 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlInserter.java @@ -31,10 +31,13 @@ import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.SimpleClient; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.thrift.Compression; +import org.apache.cassandra.thrift.CqlResult; import org.apache.cassandra.utils.UUIDGen; -public class CqlInserter extends Operation +public class CqlInserter extends CQLOperation { private static List<ByteBuffer> values; private static String cqlQuery = null; @@ -44,7 +47,7 @@ public class CqlInserter extends Operation super(client, idx); } - public void run(CassandraClient client) throws IOException + protected void run(CQLQueryExecutor executor) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); @@ -95,8 +98,6 @@ public class CqlInserter extends Operation String key = String.format("%0" + session.getTotalKeysLength() + "d", index); queryParms.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))); - String formattedQuery = null; - TimerContext context = session.latency.time(); boolean success = false; @@ -109,25 +110,7 @@ public class CqlInserter extends Operation try { - if (session.usePreparedStatements()) - { - Integer stmntId = getPreparedStatement(client, cqlQuery); - if (session.cqlVersion.startsWith("3")) - client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParms), session.getConsistencyLevel()); - else - client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParms)); - } - else - { - if (formattedQuery == null) - formattedQuery = formatCqlQuery(cqlQuery, queryParms); - if (session.cqlVersion.startsWith("3")) - client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel()); - else - client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); - } - - success = true; + success = executor.execute(cqlQuery, queryParms); } catch (Exception e) { @@ -150,4 +133,14 @@ public class CqlInserter extends Operation session.keys.getAndIncrement(); context.stop(); } + + protected boolean validateThriftResult(CqlResult result) + { + return true; + } + + protected boolean validateNativeResult(ResultMessage result) + { + return true; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java index e9b1f47..ec645d4 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlMultiGetter.java @@ -26,6 +26,7 @@ import java.io.IOException; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.SimpleClient; public class CqlMultiGetter extends Operation { @@ -38,4 +39,9 @@ public class CqlMultiGetter extends Operation { throw new RuntimeException("Multiget is not implemented for CQL"); } + + public void run(SimpleClient client) throws IOException + { + throw new RuntimeException("Multiget is not implemented for CQL"); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java index acf0602..c01767b 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlRangeSlicer.java @@ -24,6 +24,7 @@ package org.apache.cassandra.stress.operations; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; import com.yammer.metrics.core.TimerContext; import org.apache.cassandra.utils.ByteBufferUtil; @@ -32,19 +33,22 @@ import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.transport.SimpleClient; -public class CqlRangeSlicer extends Operation +public class CqlRangeSlicer extends CQLOperation { private static String cqlQuery = null; + private int lastRowCount; public CqlRangeSlicer(Session client, int idx) { super(client, idx); } - public void run(CassandraClient client) throws IOException + protected void run(CQLQueryExecutor executor) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); @@ -61,13 +65,12 @@ public class CqlRangeSlicer extends Operation } String key = String.format("%0" + session.getTotalKeysLength() + "d", index); - String formattedQuery = null; + List<String> queryParams = Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))); TimerContext context = session.latency.time(); boolean success = false; String exceptionMessage = null; - int rowCount = 0; for (int t = 0; t < session.getRetryTimes(); t++) { @@ -76,28 +79,7 @@ public class CqlRangeSlicer extends Operation try { - CqlResult result = null; - - if (session.usePreparedStatements()) - { - Integer stmntId = getPreparedStatement(client, cqlQuery); - if (session.cqlVersion.startsWith("3")) - result = client.execute_prepared_cql3_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes())), session.getConsistencyLevel()); - else - result = client.execute_prepared_cql_query(stmntId, Collections.singletonList(ByteBuffer.wrap(key.getBytes()))); - } - else - { - if (formattedQuery == null) - formattedQuery = formatCqlQuery(cqlQuery, Collections.singletonList(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3")))); - if (session.cqlVersion.startsWith("3")) - result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel()); - else - result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); - } - - rowCount = result.rows.size(); - success = (rowCount != 0); + success = executor.execute(cqlQuery, queryParams); } catch (Exception e) { @@ -117,7 +99,20 @@ public class CqlRangeSlicer extends Operation } session.operations.getAndIncrement(); - session.keys.getAndAdd(rowCount); + session.keys.getAndAdd(lastRowCount); context.stop(); } + + protected boolean validateThriftResult(CqlResult result) + { + lastRowCount = result.rows.size(); + return lastRowCount != 0; + } + + protected boolean validateNativeResult(ResultMessage result) + { + assert result instanceof ResultMessage.Rows; + lastRowCount = ((ResultMessage.Rows)result).result.size(); + return lastRowCount != 0; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java index 58d77dd..70273c1 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/CqlReader.java @@ -31,10 +31,13 @@ import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.CassandraClient; import org.apache.cassandra.stress.util.Operation; +import org.apache.cassandra.transport.SimpleClient; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlResult; +import org.apache.cassandra.thrift.ThriftConversion; -public class CqlReader extends Operation +public class CqlReader extends CQLOperation { private static String cqlQuery = null; @@ -43,7 +46,7 @@ public class CqlReader extends Operation super(client, idx); } - public void run(CassandraClient client) throws IOException + protected void run(CQLQueryExecutor executor) throws IOException { if (session.getColumnFamilyType() == ColumnFamilyType.Super) throw new RuntimeException("Super columns are not implemented for CQL"); @@ -85,8 +88,6 @@ public class CqlReader extends Operation byte[] key = generateKey(); queryParams.add(getUnQuotedCqlBlob(key, session.cqlVersion.startsWith("3"))); - String formattedQuery = null; - TimerContext context = session.latency.time(); boolean success = false; @@ -99,31 +100,10 @@ public class CqlReader extends Operation try { - CqlResult result = null; - - if (session.usePreparedStatements()) - { - Integer stmntId = getPreparedStatement(client, cqlQuery); - if (session.cqlVersion.startsWith("3")) - result = client.execute_prepared_cql3_query(stmntId, queryParamsAsByteBuffer(queryParams), session.getConsistencyLevel()); - else - result = client.execute_prepared_cql_query(stmntId, queryParamsAsByteBuffer(queryParams)); - } - else - { - if (formattedQuery == null) - formattedQuery = formatCqlQuery(cqlQuery, queryParams); - if (session.cqlVersion.startsWith("3")) - result = client.execute_cql3_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE, session.getConsistencyLevel()); - else - result = client.execute_cql_query(ByteBuffer.wrap(formattedQuery.getBytes()), Compression.NONE); - } - - success = (result.rows.get(0).columns.size() != 0); + success = executor.execute(cqlQuery, queryParams); } catch (Exception e) { - exceptionMessage = getExceptionMessage(e); success = false; } @@ -143,4 +123,14 @@ public class CqlReader extends Operation session.keys.getAndIncrement(); context.stop(); } + + protected boolean validateThriftResult(CqlResult result) + { + return result.rows.get(0).columns.size() != 0; + } + + protected boolean validateNativeResult(ResultMessage result) + { + return result instanceof ResultMessage.Rows && ((ResultMessage.Rows)result).result.size() != 0; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/06699d47/tools/stress/src/org/apache/cassandra/stress/util/Operation.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java index d1cfc22..f7924da 100644 --- a/tools/stress/src/org/apache/cassandra/stress/util/Operation.java +++ b/tools/stress/src/org/apache/cassandra/stress/util/Operation.java @@ -26,6 +26,8 @@ import java.security.MessageDigest; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.Map; +import java.util.HashMap; import com.google.common.base.Function; import com.google.common.collect.Lists; @@ -33,6 +35,7 @@ import com.google.common.collect.Lists; import org.apache.cassandra.db.marshal.TimeUUIDType; import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.Stress; +import org.apache.cassandra.transport.SimpleClient; import org.apache.cassandra.thrift.Compression; import org.apache.cassandra.thrift.CqlPreparedResult; import org.apache.cassandra.thrift.InvalidRequestException; @@ -67,6 +70,8 @@ public abstract class Operation */ public abstract void run(CassandraClient client) throws IOException; + public void run(SimpleClient client) throws IOException {} + // Utility methods protected List<ByteBuffer> generateValues() @@ -287,12 +292,14 @@ public abstract class Operation return result.toString(); } - protected static Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception + protected Integer getPreparedStatement(CassandraClient client, String cqlQuery) throws Exception { Integer statementId = client.preparedStatements.get(cqlQuery.hashCode()); if (statementId == null) { - CqlPreparedResult response = client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE); + CqlPreparedResult response = session.cqlVersion.startsWith("3") + ? client.prepare_cql3_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE) + : client.prepare_cql_query(ByteBufferUtil.bytes(cqlQuery), Compression.NONE); statementId = response.itemId; client.preparedStatements.put(cqlQuery.hashCode(), statementId); } @@ -300,10 +307,28 @@ public abstract class Operation return statementId; } + private static final Map<Integer, byte[]> preparedStatementsNative = new HashMap<Integer, byte[]>(); + + protected static byte[] getPreparedStatement(SimpleClient client, String cqlQuery) throws Exception + { + byte[] statementId = preparedStatementsNative.get(cqlQuery.hashCode()); + if (statementId == null) + { + statementId = client.prepare(cqlQuery).statementId.bytes; + preparedStatementsNative.put(cqlQuery.hashCode(), statementId); + } + return statementId; + } + protected String wrapInQuotesIfRequired(String string) { return session.cqlVersion.startsWith("3") ? "\"" + string + "\"" : string; } + + public interface CQLQueryExecutor + { + public boolean execute(String query, List<String> queryParameters) throws Exception; + } }
