Repository: cassandra Updated Branches: refs/heads/trunk 1096f9f5e -> 1f533260a
Change protocol to allow sending key space independent of query string patch by Sandeep Tamhankar; reviewed by Tyler Hobbs + Robert Stupp for CASSANDRA-10145 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1f533260 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1f533260 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1f533260 Branch: refs/heads/trunk Commit: 1f533260a01552790aff0f5f2f8f2f0aee8dbf10 Parents: 1096f9f Author: Sandeep Tamhankar <[email protected]> Authored: Wed Apr 12 20:56:35 2017 +0200 Committer: Robert Stupp <[email protected]> Committed: Wed Apr 12 20:56:35 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol_v5.spec | 25 +++++- .../cassandra/cql3/BatchQueryOptions.java | 5 ++ .../CustomPayloadMirroringQueryHandler.java | 5 +- .../org/apache/cassandra/cql3/QueryHandler.java | 3 +- .../org/apache/cassandra/cql3/QueryOptions.java | 37 +++++--- .../apache/cassandra/cql3/QueryProcessor.java | 24 ++---- .../cassandra/cql3/statements/CFStatement.java | 8 +- .../cql3/statements/ParsedStatement.java | 2 +- .../apache/cassandra/service/ClientState.java | 24 ++++++ .../org/apache/cassandra/transport/Client.java | 4 +- .../cassandra/transport/SimpleClient.java | 2 +- .../transport/messages/BatchMessage.java | 3 +- .../transport/messages/PrepareMessage.java | 43 +++++++++- .../org/apache/cassandra/cql3/CQLTester.java | 2 +- .../cassandra/transport/MessagePayloadTest.java | 89 +++++++++++++++++++- .../cassandra/transport/SerDeserTest.java | 32 +++++-- 17 files changed, 249 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4f3cb3b..dd33fcf 100644 --- 
a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145) * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661) * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354) * Skip building views during base table streams on range movements (CASSANDRA-13065) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/doc/native_protocol_v5.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v5.spec b/doc/native_protocol_v5.spec index ac3373c..320f6c0 100644 --- a/doc/native_protocol_v5.spec +++ b/doc/native_protocol_v5.spec @@ -332,7 +332,7 @@ Table of Contents <query><query_parameters> where <query> is a [long string] representing the query and <query_parameters> must be - <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>] + <consistency><flags>[<n>[name_1]<value_1>...[name_n]<value_n>][<result_page_size>][<paging_state>][<serial_consistency>][<timestamp>][<keyspace>] where: - <consistency> is the [consistency] level for the operation. - <flags> is a [int] whose bits define the options for this query and @@ -375,6 +375,9 @@ Table of Contents since the names for the expected values was returned during preparation, a client can always provide values in the right order without any names and using this flag, while supported, is almost surely inefficient. + 0x80: With keyspace. If set, <keyspace> must be present. <keyspace> is a + [string] indicating the keyspace that the query should be executed in. + It supersedes the keyspace that the connection is bound to, if any. Note that the consistency is ignored by some queries (USE, CREATE, ALTER, TRUNCATE, ...). @@ -385,8 +388,17 @@ Table of Contents 4.1.5. PREPARE - Prepare a query for later execution (through EXECUTE). 
The body consists of - the CQL query to prepare as a [long string]. + Prepare a query for later execution (through EXECUTE). The body of the message must be: + <query><flags>[<keyspace>] + where: + - <query> is a [long string] representing the CQL query. + - <flags> is a [int] whose bits define the options for this statement and in particular + influence what the remainder of the message contains. + A flag is set if the bit corresponding to its `mask` is set. Supported + flags are, given their mask: + 0x01: With keyspace. If set, <keyspace> must be present. <keyspace> is a + [string] indicating the keyspace that the query should be executed in. + It supersedes the keyspace that the connection is bound to, if any. The server will respond with a RESULT message with a `prepared` kind (0x0004, see Section 4.2.5). @@ -408,7 +420,7 @@ Table of Contents Allows executing a list of queries (prepared or not) as a batch (note that only DML statements are accepted in a batch). The body of the message must be: - <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>] + <type><n><query_1>...<query_n><consistency><flags>[<serial_consistency>][<timestamp>][<keyspace>] where: - <type> is a [byte] indicating the type of batch to use: - If <type> == 0, the batch will be "logged". This is equivalent to a @@ -440,6 +452,9 @@ Table of Contents to implement. This will be fixed in a future version of the native protocol. See https://issues.apache.org/jira/browse/CASSANDRA-10246 for more details]. + 0x80: With keyspace. If set, <keyspace> must be present. <keyspace> is a + [string] indicating the keyspace that the query should be executed in. + It supersedes the keyspace that the connection is bound to, if any. - <n> is a [short] indicating the number of following queries. - <query_1>...<query_n> are the queries to execute. 
A <query_i> must be of the form: @@ -1211,3 +1226,5 @@ Table of Contents * Enlarged flag's bitmaps for QUERY, EXECUTE and BATCH messages from [byte] to [int] (Sections 4.1.4, 4.1.6 and 4.1.7). * Add the duration data type + * Added keyspace field in QUERY, PREPARE, and BATCH messages (Sections 4.1.4, 4.1.5, and 4.1.7). + * Added [int] flags field in PREPARE message (Section 4.1.5). http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java index db7fa39..3d3cda0 100644 --- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java @@ -62,6 +62,11 @@ public abstract class BatchQueryOptions return wrapped.getConsistency(); } + public String getKeyspace() + { + return wrapped.getKeyspace(); + } + public ConsistencyLevel getSerialConsistency() { return wrapped.getSerialConsistency(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java index aa8ca48..32cddba 100644 --- a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java +++ b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.cassandra.cql3.statements.BatchStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import 
org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.MD5Digest; @@ -46,9 +47,9 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler return result; } - public ResultMessage.Prepared prepare(String query, QueryState state, Map<String, ByteBuffer> customPayload) + public ResultMessage.Prepared prepare(String query, ClientState clientState, Map<String, ByteBuffer> customPayload) { - ResultMessage.Prepared prepared = queryProcessor.prepare(query, state, customPayload); + ResultMessage.Prepared prepared = queryProcessor.prepare(query, clientState, customPayload); prepared.setCustomPayload(customPayload); return prepared; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java index 0339d26..d3b41f0 100644 --- a/src/java/org/apache/cassandra/cql3/QueryHandler.java +++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java @@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.statements.BatchStatement; import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.MD5Digest; @@ -37,7 +38,7 @@ public interface QueryHandler long queryStartNanoTime) throws RequestExecutionException, RequestValidationException; ResultMessage.Prepared prepare(String query, - QueryState state, + ClientState clientState, Map<String, ByteBuffer> customPayload) throws RequestValidationException; ParsedStatement.Prepared getPrepared(MD5Digest id); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java index afe20d7..01df691 100644 --- a/src/java/org/apache/cassandra/cql3/QueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java @@ -67,9 +67,9 @@ public abstract class QueryOptions return new DefaultQueryOptions(null, null, true, null, protocolVersion); } - public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency, ProtocolVersion version) + public static QueryOptions create(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, int pageSize, PagingState pagingState, ConsistencyLevel serialConsistency, ProtocolVersion version, String keyspace) { - return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L), version); + return new DefaultQueryOptions(consistency, values, skipMetadata, new SpecificOptions(pageSize, pagingState, serialConsistency, -1L, keyspace), version); } public static QueryOptions addColumnSpecifications(QueryOptions options, List<ColumnSpecification> columnSpecs) @@ -86,11 +86,11 @@ public abstract class QueryOptions * * This is functionally equivalent to: * {@code Json.parseJson(UTF8Type.instance.getSerializer().deserialize(getValues().get(bindIndex)), expectedReceivers).get(columnName)} - * but this cache the result of parsing the JSON so that while this might be called for multiple columns on the same {@code bindIndex} + * but this caches the result of parsing the JSON, so that while this might be called for multiple columns on the same {@code bindIndex} * value, the underlying JSON value is only 
parsed/processed once. * - * Note: this is a bit more involved in CQL specifics than this class generally is but we as we need to cache this per-query and in an object - * that is available when we bind values, this is the easier place to have this. + * Note: this is a bit more involved in CQL specifics than this class generally is, but as we need to cache this per-query and in an object + * that is available when we bind values, this is the easiest place to have this. * * @param bindIndex the index of the bind value that should be interpreted as a JSON value. * @param columnName the name of the column we want the value of. @@ -136,7 +136,7 @@ public abstract class QueryOptions * * <p>The column specifications will be present only for prepared statements.</p> * - * <p>Invoke the {@link hasColumnSpecifications} method before invoking this method in order to ensure that this + * <p>Invoke the {@link #hasColumnSpecifications} method before invoking this method in order to ensure that this * <code>QueryOptions</code> contains the column specifications.</p> * * @return the option names @@ -172,6 +172,9 @@ public abstract class QueryOptions return tstamp != Long.MIN_VALUE ? tstamp : state.getTimestamp(); } + /** The keyspace that this query is bound to, or null if not relevant. */ + public String getKeyspace() { return getSpecificOptions().keyspace; } + /** * The protocol version for the query. 
*/ @@ -314,7 +317,7 @@ public abstract class QueryOptions { super.prepare(specs); - orderedValues = new ArrayList<ByteBuffer>(specs.size()); + orderedValues = new ArrayList<>(specs.size()); for (int i = 0; i < specs.size(); i++) { String name = specs.get(i).name.toString(); @@ -341,19 +344,21 @@ public abstract class QueryOptions // Options that are likely to not be present in most queries static class SpecificOptions { - private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE); + private static final SpecificOptions DEFAULT = new SpecificOptions(-1, null, null, Long.MIN_VALUE, null); private final int pageSize; private final PagingState state; private final ConsistencyLevel serialConsistency; private final long timestamp; + private final String keyspace; - private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp) + private SpecificOptions(int pageSize, PagingState state, ConsistencyLevel serialConsistency, long timestamp, String keyspace) { this.pageSize = pageSize; this.state = state; this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency; this.timestamp = timestamp; + this.keyspace = keyspace; } } @@ -368,7 +373,8 @@ public abstract class QueryOptions PAGING_STATE, SERIAL_CONSISTENCY, TIMESTAMP, - NAMES_FOR_VALUES; + NAMES_FOR_VALUES, + KEYSPACE; private static final Flag[] ALL_VALUES = values(); @@ -433,8 +439,8 @@ public abstract class QueryOptions throw new ProtocolException(String.format("Out of bound timestamp, must be in [%d, %d] (got %d)", Long.MIN_VALUE + 1, Long.MAX_VALUE, ts)); timestamp = ts; } - - options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp); + String keyspace = flags.contains(Flag.KEYSPACE) ? 
CBUtil.readString(body) : null; + options = new SpecificOptions(pageSize, pagingState, serialConsistency, timestamp, keyspace); } DefaultQueryOptions opts = new DefaultQueryOptions(consistency, values, skipMetadata, options, version); return names == null ? opts : new OptionsWithNames(opts, names); @@ -460,6 +466,8 @@ public abstract class QueryOptions CBUtil.writeConsistencyLevel(options.getSerialConsistency(), dest); if (flags.contains(Flag.TIMESTAMP)) dest.writeLong(options.getSpecificOptions().timestamp); + if (flags.contains(Flag.KEYSPACE)) + CBUtil.writeString(options.getSpecificOptions().keyspace, dest); // Note that we don't really have to bother with NAMES_FOR_VALUES server side, // and in fact we never really encode QueryOptions, only decode them, so we @@ -485,7 +493,8 @@ public abstract class QueryOptions size += CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency()); if (flags.contains(Flag.TIMESTAMP)) size += 8; - + if (flags.contains(Flag.KEYSPACE)) + size += CBUtil.sizeOfString(options.getSpecificOptions().keyspace); return size; } @@ -504,6 +513,8 @@ public abstract class QueryOptions flags.add(Flag.SERIAL_CONSISTENCY); if (options.getSpecificOptions().timestamp != Long.MIN_VALUE) flags.add(Flag.TIMESTAMP); + if (options.getSpecificOptions().keyspace != null) + flags.add(Flag.KEYSPACE); return flags; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 4aa2026..cca93ff 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -219,7 +219,7 @@ public class QueryProcessor implements QueryHandler public ResultMessage process(String queryString, QueryState queryState, QueryOptions options, long 
queryStartNanoTime) throws RequestExecutionException, RequestValidationException { - ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState()); + ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace())); options.prepare(p.boundNames); CQLStatement prepared = p.statement; if (prepared.getBoundTerms() != options.getValues().size()) @@ -231,9 +231,9 @@ public class QueryProcessor implements QueryHandler return processStatement(prepared, queryState, options, queryStartNanoTime); } - public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException + public static ParsedStatement.Prepared parseStatement(String queryStr, ClientState clientState) throws RequestValidationException { - return getStatement(queryStr, queryState.getClientState()); + return getStatement(queryStr, clientState); } public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException @@ -277,7 +277,7 @@ public class QueryProcessor implements QueryHandler return prepared; // Note: if 2 threads prepare the same query, we'll live so don't bother synchronizing - prepared = parseStatement(query, internalQueryState()); + prepared = parseStatement(query, internalQueryState().getClientState()); prepared.statement.validate(internalQueryState().getClientState()); internalStatements.putIfAbsent(query, prepared); return prepared; @@ -334,7 +334,7 @@ public class QueryProcessor implements QueryHandler */ public static UntypedResultSet executeOnceInternal(String query, Object... 
values) { - ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState()); + ParsedStatement.Prepared prepared = parseStatement(query, internalQueryState().getClientState()); prepared.statement.validate(internalQueryState().getClientState()); ResultMessage result = prepared.statement.executeInternal(internalQueryState(), makeInternalOptions(prepared, values)); if (result instanceof ResultMessage.Rows) @@ -374,16 +374,10 @@ public class QueryProcessor implements QueryHandler } public ResultMessage.Prepared prepare(String query, - QueryState state, + ClientState clientState, Map<String, ByteBuffer> customPayload) throws RequestValidationException { - return prepare(query, state); - } - - public ResultMessage.Prepared prepare(String queryString, QueryState queryState) - { - ClientState cState = queryState.getClientState(); - return prepare(queryString, cState); + return prepare(query, clientState); } public static ResultMessage.Prepared prepare(String queryString, ClientState clientState) @@ -485,7 +479,7 @@ public class QueryProcessor implements QueryHandler public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { - ClientState clientState = queryState.getClientState(); + ClientState clientState = queryState.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace()); batch.checkAccess(clientState); batch.validate(); batch.validate(clientState); @@ -500,7 +494,7 @@ public class QueryProcessor implements QueryHandler // Set keyspace for statement that require login if (statement instanceof CFStatement) - ((CFStatement)statement).prepareKeyspace(clientState); + ((CFStatement) statement).prepareKeyspace(clientState); Tracing.trace("Preparing statement"); return statement.prepare(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/statements/CFStatement.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java index 9b2987c..136860e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CFStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CFStatement.java @@ -37,14 +37,16 @@ public abstract class CFStatement extends ParsedStatement { if (!cfName.hasKeyspace()) { - // XXX: We explicitely only want to call state.getKeyspace() in this case, as we don't want to throw - // if not logged in any keyspace but a keyspace is explicitely set on the statement. So don't move + // XXX: We explicitly only want to call state.getKeyspace() in this case, as we don't want to throw + // if not logged in any keyspace but a keyspace is explicitly set on the statement. So don't move // the call outside the 'if' or replace the method by 'prepareKeyspace(state.getKeyspace())' cfName.setKeyspace(state.getKeyspace(), true); } } - // Only for internal calls, use the version with ClientState for user queries + // Only for internal calls, use the version with ClientState for user queries. In particular, the + // version with ClientState throws an exception if the statement does not have keyspace set *and* + // ClientState has no keyspace. 
public void prepareKeyspace(String keyspace) { if (!cfName.hasKeyspace()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java index 0cd549a..e617ba7 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java @@ -51,7 +51,7 @@ public abstract class ParsedStatement /** * Contains the CQL statement source if the statement has been "regularly" perpared via * {@link org.apache.cassandra.cql3.QueryProcessor#prepare(java.lang.String, org.apache.cassandra.service.ClientState)} / - * {@link QueryHandler#prepare(java.lang.String, org.apache.cassandra.service.QueryState, java.util.Map)}. + * {@link QueryHandler#prepare(java.lang.String, org.apache.cassandra.service.ClientState, java.util.Map)}. * Other usages of this class may or may not contain the CQL statement source. 
*/ public String rawCQLStatement; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index 80cf810..dfddccd 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -139,6 +139,14 @@ public class ClientState this.user = AuthenticatedUser.ANONYMOUS_USER; } + protected ClientState(ClientState source) + { + this.isInternal = source.isInternal; + this.remoteAddress = source.remoteAddress; + this.user = source.user; + this.keyspace = source.keyspace; + } + /** * @return a ClientState object for internal C* calls (not limited by any kind of auth). */ @@ -156,6 +164,22 @@ public class ClientState } /** + * Clone this ClientState object, but use the provided keyspace instead of the + * keyspace in this ClientState object. + * + * @return a new ClientState object if the keyspace argument is non-null. Otherwise do not clone + * and return this ClientState object. + */ + public ClientState cloneWithKeyspaceIfSet(String keyspace) + { + if (keyspace == null) + return this; + ClientState clientState = new ClientState(this); + clientState.setKeyspace(keyspace); + return clientState; + } + + /** * This clock guarantees that updates for the same ClientState will be ordered * in the sequence seen, even if multiple updates happen in the same millisecond. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index e428b06..9a76e03 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -136,12 +136,12 @@ public class Client extends SimpleClient return null; } } - return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null, version)); + return new QueryMessage(query, QueryOptions.create(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null, version, null)); } else if (msgType.equals("PREPARE")) { String query = line.substring(8); - return new PrepareMessage(query); + return new PrepareMessage(query, null); } else if (msgType.equals("EXECUTE")) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index 1bb081b..13cd9bd 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -189,7 +189,7 @@ public class SimpleClient implements Closeable public ResultMessage.Prepared prepare(String query) { - Message.Response msg = execute(new PrepareMessage(query)); + Message.Response msg = execute(new PrepareMessage(query, null)); assert msg instanceof ResultMessage.Prepared; return (ResultMessage.Prepared)msg; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/messages/BatchMessage.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index bb6411f..d9123d4 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -180,7 +180,8 @@ public class BatchMessage extends Message.Request ParsedStatement.Prepared p; if (query instanceof String) { - p = QueryProcessor.parseStatement((String)query, state); + p = QueryProcessor.parseStatement((String)query, + state.getClientState().cloneWithKeyspaceIfSet(options.getKeyspace())); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index b0c9dbe..bb0fc3a 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -36,26 +36,59 @@ public class PrepareMessage extends Message.Request public PrepareMessage decode(ByteBuf body, ProtocolVersion version) { String query = CBUtil.readLongString(body); - return new PrepareMessage(query); + String keyspace = null; + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) { + // If flags grows, we may want to consider creating a PrepareOptions class with an internal codec + // class that handles flags and options of the prepare message. Since there's only one right now, + // we just take care of business here. 
+ + int flags = (int)body.readUnsignedInt(); + if ((flags & 0x1) == 0x1) + keyspace = CBUtil.readString(body); + } + return new PrepareMessage(query, keyspace); } public void encode(PrepareMessage msg, ByteBuf dest, ProtocolVersion version) { CBUtil.writeLongString(msg.query, dest); + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + { + // If we have no keyspace, write out a 0-valued flag field. + if (msg.keyspace == null) + dest.writeInt(0x0); + else { + dest.writeInt(0x1); + CBUtil.writeString(msg.keyspace, dest); + } + } } public int encodedSize(PrepareMessage msg, ProtocolVersion version) { - return CBUtil.sizeOfLongString(msg.query); + int size = CBUtil.sizeOfLongString(msg.query); + if (version.isGreaterOrEqualTo(ProtocolVersion.V5)) + { + // We always emit a flags int + size += 4; + + // If we have a keyspace, we'd write it out. Otherwise, we'd write nothing. + size += msg.keyspace == null + ? 0 + : CBUtil.sizeOfString(msg.keyspace); + } + return size; } }; private final String query; + private final String keyspace; - public PrepareMessage(String query) + public PrepareMessage(String query, String keyspace) { super(Message.Type.PREPARE); this.query = query; + this.keyspace = keyspace; } public Message.Response execute(QueryState state, long queryStartNanoTime) @@ -75,7 +108,9 @@ public class PrepareMessage extends Message.Request Tracing.instance.begin("Preparing CQL3 query", state.getClientAddress(), ImmutableMap.of("query", query)); } - Message.Response response = ClientState.getCQLQueryHandler().prepare(query, state, getCustomPayload()); + Message.Response response = ClientState.getCQLQueryHandler().prepare(query, + state.getClientState().cloneWithKeyspaceIfSet(keyspace), + getCustomPayload()); if (tracingId != null) response.setTracingId(tracingId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git 
a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index c247d96..26437c9 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -710,7 +710,7 @@ public abstract class CQLTester state.setKeyspace(SchemaConstants.SYSTEM_KEYSPACE_NAME); QueryState queryState = new QueryState(state); - ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState); + ParsedStatement.Prepared prepared = QueryProcessor.parseStatement(query, queryState.getClientState()); prepared.statement.validate(state); QueryOptions options = QueryOptions.forInternalCalls(Collections.<ByteBuffer>emptyList()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java index cfbfd39..c27593b 100644 --- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java +++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java @@ -48,6 +48,7 @@ import org.apache.cassandra.transport.messages.QueryMessage; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.MD5Digest; +import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; public class MessagePayloadTest extends CQLTester @@ -112,6 +113,86 @@ public class MessagePayloadTest extends CQLTester } @Test + public void testMessagePayloadBeta() throws Throwable + { + QueryHandler queryHandler = (QueryHandler) cqlQueryHandlerField.get(null); + cqlQueryHandlerField.set(null, new TestQueryHandler()); + try + { + requireNetwork(); + + Assert.assertSame(TestQueryHandler.class, 
ClientState.getCQLQueryHandler().getClass()); + + SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), + nativePort, + ProtocolVersion.V5, + true, + new ClientEncryptionOptions()); + try + { + client.connect(false); + + Map<String, ByteBuffer> reqMap; + Map<String, ByteBuffer> respMap; + + QueryOptions queryOptions = QueryOptions.create( + QueryOptions.DEFAULT.getConsistency(), + QueryOptions.DEFAULT.getValues(), + QueryOptions.DEFAULT.skipMetadata(), + QueryOptions.DEFAULT.getPageSize(), + QueryOptions.DEFAULT.getPagingState(), + QueryOptions.DEFAULT.getSerialConsistency(), + ProtocolVersion.V5, + KEYSPACE); + QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)", + queryOptions); + PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM atable", KEYSPACE); + + reqMap = Collections.singletonMap("foo", bytes(42)); + responsePayload = respMap = Collections.singletonMap("bar", bytes(42)); + queryMessage.setCustomPayload(reqMap); + Message.Response queryResponse = client.execute(queryMessage); + payloadEquals(reqMap, requestPayload); + payloadEquals(respMap, queryResponse.getCustomPayload()); + + reqMap = Collections.singletonMap("foo", bytes(43)); + responsePayload = respMap = Collections.singletonMap("bar", bytes(43)); + prepareMessage.setCustomPayload(reqMap); + ResultMessage.Prepared prepareResponse = (ResultMessage.Prepared) client.execute(prepareMessage); + payloadEquals(reqMap, requestPayload); + payloadEquals(respMap, prepareResponse.getCustomPayload()); + + ExecuteMessage executeMessage = new ExecuteMessage(prepareResponse.statementId, QueryOptions.DEFAULT); + reqMap = Collections.singletonMap("foo", bytes(44)); + responsePayload = respMap = Collections.singletonMap("bar", bytes(44)); + executeMessage.setCustomPayload(reqMap); + Message.Response executeResponse = client.execute(executeMessage); + payloadEquals(reqMap, requestPayload); + payloadEquals(respMap, 
executeResponse.getCustomPayload()); + + BatchMessage batchMessage = new BatchMessage(BatchStatement.Type.UNLOGGED, + Collections.<Object>singletonList("INSERT INTO atable (pk,v) VALUES (1, 'foo')"), + Collections.singletonList(Collections.<ByteBuffer>emptyList()), + queryOptions); + reqMap = Collections.singletonMap("foo", bytes(45)); + responsePayload = respMap = Collections.singletonMap("bar", bytes(45)); + batchMessage.setCustomPayload(reqMap); + Message.Response batchResponse = client.execute(batchMessage); + payloadEquals(reqMap, requestPayload); + payloadEquals(respMap, batchResponse.getCustomPayload()); + } + finally + { + client.close(); + } + } + finally + { + cqlQueryHandlerField.set(null, queryHandler); + } + } + + @Test public void testMessagePayload() throws Throwable { QueryHandler queryHandler = (QueryHandler) cqlQueryHandlerField.get(null); @@ -134,7 +215,7 @@ public class MessagePayloadTest extends CQLTester "CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", QueryOptions.DEFAULT ); - PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable"); + PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable", null); reqMap = Collections.singletonMap("foo", bytes(42)); responsePayload = respMap = Collections.singletonMap("bar", bytes(42)); @@ -202,7 +283,7 @@ public class MessagePayloadTest extends CQLTester "CREATE TABLE " + KEYSPACE + ".atable (pk int PRIMARY KEY, v text)", QueryOptions.DEFAULT ); - PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable"); + PrepareMessage prepareMessage = new PrepareMessage("SELECT * FROM " + KEYSPACE + ".atable", null); reqMap = Collections.singletonMap("foo", bytes(42)); responsePayload = Collections.singletonMap("bar", bytes(42)); @@ -293,13 +374,13 @@ public class MessagePayloadTest extends CQLTester } public ResultMessage.Prepared prepare(String query, - QueryState state, + ClientState 
clientState, Map<String, ByteBuffer> customPayload) throws RequestValidationException { if (customPayload != null) requestPayload = customPayload; - ResultMessage.Prepared result = QueryProcessor.instance.prepare(query, state, customPayload); + ResultMessage.Prepared result = QueryProcessor.instance.prepare(query, clientState, customPayload); if (customPayload != null) { result.setCustomPayload(responsePayload); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1f533260/test/unit/org/apache/cassandra/transport/SerDeserTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java index a1260ba..2592ae7 100644 --- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java +++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java @@ -314,15 +314,30 @@ public class SerDeserTest private void queryOptionsSerDeserTest(ProtocolVersion version) throws Exception { - QueryOptions options = QueryOptions.create(ConsistencyLevel.ALL, - Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })), - false, - 5000, - Util.makeSomePagingState(version), - ConsistencyLevel.SERIAL, - version - ); + queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.ALL, + Collections.singletonList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 })), + false, + 5000, + Util.makeSomePagingState(version), + ConsistencyLevel.SERIAL, + version, + null + )); + + queryOptionsSerDeserTest(version, QueryOptions.create(ConsistencyLevel.LOCAL_ONE, + Arrays.asList(ByteBuffer.wrap(new byte[] { 0x00, 0x01, 0x02 }), + ByteBuffer.wrap(new byte[] { 0x03, 0x04, 0x05, 0x03, 0x04, 0x05 })), + true, + 10, + Util.makeSomePagingState(version), + ConsistencyLevel.SERIAL, + version, + "some_keyspace" + )); + } + private void queryOptionsSerDeserTest(ProtocolVersion version, QueryOptions options) + { ByteBuf buf = 
Unpooled.buffer(QueryOptions.codec.encodedSize(options, version)); QueryOptions.codec.encode(options, buf, version); QueryOptions decodedOptions = QueryOptions.codec.decode(buf, version); @@ -335,5 +350,6 @@ public class SerDeserTest assertEquals(options.getValues(), decodedOptions.getValues()); assertEquals(options.getPagingState(), decodedOptions.getPagingState()); assertEquals(options.skipMetadata(), decodedOptions.skipMetadata()); + assertEquals(options.getKeyspace(), decodedOptions.getKeyspace()); } }
