Merge branch 'cassandra-1.2' into cassandra-2.0 Conflicts: src/java/org/apache/cassandra/cql3/QueryProcessor.java src/java/org/apache/cassandra/cql3/statements/BatchStatement.java src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java src/java/org/apache/cassandra/cql3/statements/SelectStatement.java src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1435ffd Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1435ffd Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1435ffd Branch: refs/heads/trunk Commit: b1435ffd1dcdeab9f0ebc52cf1ed3cddafa13c4e Parents: 4d36bbf b97c523 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Fri Dec 20 20:14:07 2013 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Dec 20 20:14:07 2013 +0300 ---------------------------------------------------------------------- .../org/apache/cassandra/cql3/CQLStatement.java | 2 +- .../apache/cassandra/cql3/QueryProcessor.java | 20 ++++++++++---------- .../statements/AuthenticationStatement.java | 2 +- .../cql3/statements/AuthorizationStatement.java | 2 +- .../cql3/statements/BatchStatement.java | 4 ++-- .../cql3/statements/ModificationStatement.java | 4 ++-- .../cql3/statements/ParsedStatement.java | 2 +- .../statements/SchemaAlteringStatement.java | 2 +- .../cql3/statements/SelectStatement.java | 4 ++-- .../cql3/statements/TruncateStatement.java | 2 +- .../cassandra/cql3/statements/UseStatement.java | 2 +- .../cassandra/thrift/CassandraServer.java | 2 +- .../transport/messages/BatchMessage.java | 5 ++--- 13 files changed, 26 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/CQLStatement.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java index ad3c4b4,332aea1..02361a8 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@@ -209,22 -152,10 +209,22 @@@ public class QueryProcesso public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState) throws RequestExecutionException, RequestValidationException { + return process(queryString, queryState, new QueryOptions(cl, Collections.<ByteBuffer>emptyList())); + } + + public static ResultMessage process(String queryString, QueryState queryState, QueryOptions options) + throws RequestExecutionException, RequestValidationException + { CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement; - if (prepared.getBoundsTerms() != options.getValues().size()) - if (prepared.getBoundTerms() > 0) - throw new InvalidRequestException("Cannot execute query with bind variables"); - return processStatement(prepared, cl, queryState, Collections.<ByteBuffer>emptyList()); ++ if (prepared.getBoundTerms() != options.getValues().size()) + throw new InvalidRequestException("Invalid amount of bind variables"); + + return processStatement(prepared, queryState, options, queryString); + } + + public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException + { + return getStatement(queryStr, queryState.getClientState()).statement; } public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException @@@ -286,20 -218,12 +286,20 @@@ throws RequestValidationException { ParsedStatement.Prepared prepared = getStatement(queryString, clientState); - int bountTerms = prepared.statement.getBoundsTerms(); - if (bountTerms > FBUtilities.MAX_UNSIGNED_SHORT) - throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", bountTerms, FBUtilities.MAX_UNSIGNED_SHORT)); - assert bountTerms == prepared.boundNames.size(); ++ int boundTerms = prepared.statement.getBoundTerms(); ++ if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT) ++ throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT)); ++ assert boundTerms == prepared.boundNames.size(); + ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift); - int bountTerms = prepared.statement.getBoundTerms(); - if (bountTerms > FBUtilities.MAX_UNSIGNED_SHORT) - throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", bountTerms, FBUtilities.MAX_UNSIGNED_SHORT)); - assert bountTerms == prepared.boundNames.size(); + if (!postPreparationHooks.isEmpty()) + { + PreparationContext context = new PreparationContext(clientState, queryString, prepared.boundNames); + for (PostPreparationHook hook : postPreparationHooks) + hook.processStatement(prepared.statement, context); + } + return msg; } @@@ -330,22 -254,21 +330,22 @@@ MD5Digest statementId = MD5Digest.compute(toHash); preparedStatements.put(statementId, prepared.statement); logger.trace(String.format("Stored prepared statement %s with %d bind markers", - statementId, - prepared.statement.getBoundTerms())); - return new ResultMessage.Prepared(statementId, prepared.boundNames); + statementId, - prepared.statement.getBoundsTerms())); ++ prepared.statement.getBoundTerms())); + return new ResultMessage.Prepared(statementId, prepared); } } - public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) + public static ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException { + List<ByteBuffer> variables = options.getValues(); // Check to see if there are any bound variables to verify - if (!(variables.isEmpty() && (statement.getBoundsTerms() == 0))) + if (!(variables.isEmpty() && (statement.getBoundTerms() == 0))) { - if (variables.size() != statement.getBoundsTerms()) + if (variables.size() != statement.getBoundTerms()) throw new InvalidRequestException(String.format("there were %d markers(?) in CQL but %d bound variables", - statement.getBoundsTerms(), + statement.getBoundTerms(), variables.size())); // at this point there is a match in count between markers and variables that is non-zero http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 311a3c7,05dae48..25f61fb --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@@ -72,11 -64,14 +72,11 @@@ public class BatchStatement implements return size; } - public int getBoundsTerms() - @Override - public void prepareKeyspace(ClientState state) throws InvalidRequestException ++ public int getBoundTerms() { - for (ModificationStatement statement : statements) - statement.prepareKeyspace(state); + return boundTerms; } - @Override public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException { for (ModificationStatement statement : statements) @@@ -182,52 -147,4 +182,52 @@@ { return String.format("BatchStatement(type=%s, statements=%s)", type, statements); } + + public static class Parsed extends CFStatement + { + private final Type type; + private final Attributes.Raw attrs; + private final List<ModificationStatement.Parsed> parsedStatements; + + public Parsed(Type type, Attributes.Raw attrs, List<ModificationStatement.Parsed> parsedStatements) + { + super(null); + this.type = type; + this.attrs = attrs; + this.parsedStatements = parsedStatements; + } + + @Override + public void prepareKeyspace(ClientState state) throws InvalidRequestException + { + for (ModificationStatement.Parsed statement : parsedStatements) + statement.prepareKeyspace(state); + } + + public ParsedStatement.Prepared prepare() throws InvalidRequestException + { - VariableSpecifications boundNames = getBoundsVariables(); ++ VariableSpecifications boundNames = getBoundVariables(); + + List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size()); + for (ModificationStatement.Parsed parsed : parsedStatements) + { + ModificationStatement stmt = parsed.prepare(boundNames); + if (stmt.hasConditions()) + throw new InvalidRequestException("Conditional updates are not allowed in batches"); + + if (stmt.isCounter() && type != Type.COUNTER) + throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches"); + + if (!stmt.isCounter() && type == Type.COUNTER) + throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches"); + + statements.add(stmt); + } + + Attributes prepAttrs = attrs.prepare("[batch]", "[batch]"); + prepAttrs.collectMarkerSpecification(boundNames); + + return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs), boundNames); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 8833f34,bfbf511..9e0fd62 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@@ -36,75 -31,35 +36,75 @@@ import org.apache.cassandra.exceptions. import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.ByteBufferUtil; -/** - * Abstract class for statements that apply on a given column family. +/* + * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE. */ -public abstract class ModificationStatement extends CFStatement implements CQLStatement, MeasurableForPreparedCache +public abstract class ModificationStatement implements CQLStatement, MeasurableForPreparedCache { - public static enum Type + private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false); + + public final CFMetaData cfm; + public final Attributes attrs; + + private final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<ColumnIdentifier, Restriction>(); + private final List<Operation> columnOperations = new ArrayList<Operation>(); + + private int boundTerms; + private List<Operation> columnConditions; + private boolean ifNotExists; + + public ModificationStatement(CFMetaData cfm, Attributes attrs) { - LOGGED, UNLOGGED, COUNTER + this.cfm = cfm; + this.attrs = attrs; } - protected Type type; + public long measureForPreparedCache(MemoryMeter meter) + { + return meter.measureDeep(this) - meter.measureDeep(cfm); + } + + public abstract boolean requireFullClusteringKey(); + public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException; - public int getBoundsTerms() - private Long timestamp; - private final int timeToLive; ++ public int getBoundTerms() + { + return boundTerms; + } - public ModificationStatement(CFName name, Attributes attrs) + public String keyspace() { - this(name, attrs.timestamp, attrs.timeToLive); + return cfm.ksName; } - public ModificationStatement(CFName name, Long timestamp, int timeToLive) + public String columnFamily() { - super(name); - this.timestamp = timestamp; - this.timeToLive = timeToLive; + return cfm.cfName; + } + + public boolean isCounter() + { + return cfm.getDefaultValidator().isCommutative(); + } + + public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException + { + return attrs.getTimestamp(now, variables); + } + + public boolean isTimestampSet() + { + return attrs.isTimestampSet(); + } + + public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException + { + return attrs.getTimeToLive(variables); } public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException @@@ -502,156 -199,8 +502,156 @@@ * @return list of the mutations * @throws InvalidRequestException on invalid requests */ - protected abstract Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now) - throws RequestExecutionException, RequestValidationException; + public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch) + throws RequestExecutionException, RequestValidationException + { + List<ByteBuffer> keys = buildPartitionKeyNames(variables); + ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables); + + // Some lists operation requires reading + Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl); + UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows); - public abstract ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException; + Collection<IMutation> mutations = new ArrayList<IMutation>(); + for (ByteBuffer key: keys) + { + ThriftValidation.validateKey(cfm, key); + ColumnFamily cf = updateForKey(key, clusteringPrefix, params); + mutations.add(makeMutation(key, cf, cl, isBatch)); + } + return mutations; + } + + private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch) + { + RowMutation rm; + if (isBatch) + { + // we might group other mutations together with this one later, so make it mutable + rm = new RowMutation(cfm.ksName, key); + rm.add(cf); + } + else + { + rm = new RowMutation(cfm.ksName, key, cf); + } + return isCounter() ? new CounterMutation(rm, cl) : rm; + } + + private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params) + throws InvalidRequestException + { + if (ifNotExists) + return null; + + ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm); + + // CQL row marker + CFDefinition cfDef = cfm.getCfDef(); + if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper()) + { + ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build(); + cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER)); + } + + // Conditions + for (Operation condition : columnConditions) + condition.execute(key, cf, clusteringPrefix.copy(), params); + + assert !cf.isEmpty(); + return cf; + } + + public static abstract class Parsed extends CFStatement + { + protected final Attributes.Raw attrs; + private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions; + private final boolean ifNotExists; + + protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean ifNotExists) + { + super(name); + this.attrs = attrs; + this.conditions = conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions; + this.ifNotExists = ifNotExists; + } + + public ParsedStatement.Prepared prepare() throws InvalidRequestException + { - VariableSpecifications boundNames = getBoundsVariables(); ++ VariableSpecifications boundNames = getBoundVariables(); + ModificationStatement statement = prepare(boundNames); + return new ParsedStatement.Prepared(statement, boundNames); + } + + public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException + { + CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); + CFDefinition cfDef = metadata.getCfDef(); + + // The collected count in the beginning of preparation. + // Will start at non-zero for statements nested inside a BatchStatement (the second and the further ones). + int collected = boundNames.getCollectedCount(); + + Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily()); + preparedAttributes.collectMarkerSpecification(boundNames); + + ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes); + + if (ifNotExists || (conditions != null && !conditions.isEmpty())) + { + if (stmt.isCounter()) + throw new InvalidRequestException("Conditional updates are not supported on counter tables"); + + if (attrs.timestamp != null) + throw new InvalidRequestException("Cannot provide custom timestamp for conditional update"); + + if (ifNotExists) + { + // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense. + // So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes. + assert conditions.isEmpty(); + stmt.setIfNotExistCondition(); + } + else + { + for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : conditions) + { + CFDefinition.Name name = cfDef.get(entry.left); + if (name == null) + throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left)); + + /* + * Lists column names are based on a server-side generated timeuuid. So we can't allow lists + * operation or that would yield unexpected results (update that should apply wouldn't). So for + * now, we just refuse lists, which also save use from having to bother about the read that some + * list operation involve. + */ + if (name.type instanceof ListType) + throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", name)); + + Operation condition = entry.right.prepare(name); + assert !condition.requiresRead(); + + condition.collectMarkerSpecification(boundNames); + + switch (name.kind) + { + case KEY_ALIAS: + case COLUMN_ALIAS: + throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left)); + case VALUE_ALIAS: + case COLUMN_METADATA: + stmt.addCondition(condition); + break; + } + } + } + } + + stmt.boundTerms = boundNames.getCollectedCount() - collected; + return stmt; + } + + protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java index ecf8a8a,2d0b4c7..d048327 --- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java @@@ -25,11 -25,11 +25,11 @@@ import org.apache.cassandra.exceptions. public abstract class ParsedStatement { - private int boundTerms; + private VariableSpecifications variables; - public VariableSpecifications getBoundsVariables() - public int getBoundTerms() ++ public VariableSpecifications getBoundVariables() { - return boundTerms; + return variables; } // Used by the parser and preparable statement http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java index f2904e4,4d40e99..337e8dc --- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java @@@ -44,11 -47,6 +44,11 @@@ public abstract class SchemaAlteringSta this.isColumnFamilyLevel = true; } - public int getBoundsTerms() ++ public int getBoundTerms() + { + return 0; + } + @Override public void prepareKeyspace(ClientState state) throws InvalidRequestException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index b94e549,4730f18..133444a --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@@ -1067,9 -997,12 +1067,9 @@@ public class SelectStatement implement { CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); - if (parameters.limit <= 0) - throw new InvalidRequestException("LIMIT must be strictly positive"); - CFDefinition cfDef = cfm.getCfDef(); - VariableSpecifications names = getBoundsVariables(); - ColumnSpecification[] names = new ColumnSpecification[getBoundTerms()]; ++ VariableSpecifications names = getBoundVariables(); // Select clause if (parameters.isCount && !selectClause.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java index d5baedf,16445f5..30e57d5 --- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java @@@ -36,11 -39,6 +36,11 @@@ public class TruncateStatement extends super(name); } - public int getBoundsTerms() ++ public int getBoundTerms() + { + return 0; + } + public Prepared prepare() throws InvalidRequestException { return new Prepared(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/UseStatement.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/UseStatement.java index db2435f,0db80bf..ee70f9d --- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java @@@ -34,11 -37,6 +34,11 @@@ public class UseStatement extends Parse this.keyspace = keyspace; } - public int getBoundsTerms() ++ public int getBoundTerms() + { + return 0; + } + public Prepared prepare() throws InvalidRequestException { return new Prepared(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/thrift/CassandraServer.java index 4e3c372,ec7a37d..07c271b --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@@ -2077,11 -1849,9 +2077,11 @@@ public class CassandraServer implement " (either the query was not prepared on this host (maybe the host has been restarted?)" + " or you have prepared too many queries and it has been evicted from the internal cache)", itemId)); - logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms()); + logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundTerms()); - return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState.getQueryState(), bindVariables).toThriftResult(); + return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, + cState.getQueryState(), + new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult(); } catch (RequestExecutionException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/transport/messages/BatchMessage.java index bd95ef3,0000000..487e089 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@@ -1,237 -1,0 +1,236 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.transport.messages; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.jboss.netty.buffer.ChannelBuffer; - import org.jboss.netty.buffer.ChannelBuffers; + +import org.apache.cassandra.cql3.Attributes; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.cql3.statements.ModificationStatement; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.*; +import org.apache.cassandra.utils.MD5Digest; +import org.apache.cassandra.utils.UUIDGen; + +public class BatchMessage extends Message.Request +{ + public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>() + { + public BatchMessage decode(ChannelBuffer body, int version) + { + if (version == 1) + throw new ProtocolException("BATCH messages are not support in version 1 of the protocol"); + + byte type = body.readByte(); + int n = body.readUnsignedShort(); + List<Object> queryOrIds = new ArrayList<Object>(n); + List<List<ByteBuffer>> variables = new ArrayList<List<ByteBuffer>>(n); + for (int i = 0; i < n; i++) + { + byte kind = body.readByte(); + if (kind == 0) + queryOrIds.add(CBUtil.readLongString(body)); + else if (kind == 1) + queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body))); + else + throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind); + variables.add(CBUtil.readValueList(body)); + } + ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); + return new BatchMessage(toType(type), queryOrIds, variables, consistency); + } + + public void encode(BatchMessage msg, ChannelBuffer dest, int version) + { + int queries = msg.queryOrIdList.size(); + + dest.writeByte(fromType(msg.type)); + dest.writeShort(queries); + + for (int i = 0; i < queries; i++) + { + Object q = msg.queryOrIdList.get(i); + dest.writeByte((byte)(q instanceof String ? 0 : 1)); + if (q instanceof String) + CBUtil.writeLongString((String)q, dest); + else + CBUtil.writeBytes(((MD5Digest)q).bytes, dest); + + CBUtil.writeValueList(msg.values.get(i), dest); + } + + CBUtil.writeConsistencyLevel(msg.consistency, dest); + } + + public int encodedSize(BatchMessage msg, int version) + { + int size = 3; // type + nb queries + for (int i = 0; i < msg.queryOrIdList.size(); i++) + { + Object q = msg.queryOrIdList.get(i); + size += 1 + (q instanceof String + ? CBUtil.sizeOfLongString((String)q) + : CBUtil.sizeOfBytes(((MD5Digest)q).bytes)); + + size += CBUtil.sizeOfValueList(msg.values.get(i)); + } + size += CBUtil.sizeOfConsistencyLevel(msg.consistency); + return size; + } + + private BatchStatement.Type toType(byte b) + { + if (b == 0) + return BatchStatement.Type.LOGGED; + else if (b == 1) + return BatchStatement.Type.UNLOGGED; + else if (b == 2) + return BatchStatement.Type.COUNTER; + else + throw new ProtocolException("Invalid BATCH message type " + b); + } + + private byte fromType(BatchStatement.Type type) + { + switch (type) + { + case LOGGED: return 0; + case UNLOGGED: return 1; + case COUNTER: return 2; + default: + throw new AssertionError(); + } + } + }; + + public final BatchStatement.Type type; + public final List<Object> queryOrIdList; + public final List<List<ByteBuffer>> values; + public final ConsistencyLevel consistency; + + public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, ConsistencyLevel consistency) + { + super(Message.Type.BATCH); + this.type = type; + this.queryOrIdList = queryOrIdList; + this.values = values; + this.consistency = consistency; + } + + public Message.Response execute(QueryState state) + { + try + { + UUID tracingId = null; + if (isTracingRequested()) + { + tracingId = UUIDGen.getTimeUUID(); + state.prepareTracingSession(tracingId); + } + + if (state.traceNextQuery()) + { + state.createTracingSession(); + // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support. + Tracing.instance.begin("Execute batch of CQL3 queries", Collections.<String, String>emptyMap()); + } + + List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size()); + for (int i = 0; i < queryOrIdList.size(); i++) + { + Object query = queryOrIdList.get(i); + CQLStatement statement; + if (query instanceof String) + { + statement = QueryProcessor.parseStatement((String)query, state); + } + else + { + statement = QueryProcessor.getPrepared((MD5Digest)query); + if (statement == null) + throw new PreparedQueryNotFoundException((MD5Digest)query); + } + + List<ByteBuffer> queryValues = values.get(i); - if (queryValues.size() != statement.getBoundsTerms()) ++ if (queryValues.size() != statement.getBoundTerms()) + throw new InvalidRequestException(String.format("There were %d markers(?) in CQL but %d bound variables", - statement.getBoundsTerms(), ++ statement.getBoundTerms(), + queryValues.size())); + if (!(statement instanceof ModificationStatement)) + throw new InvalidRequestException("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed."); + + ModificationStatement mst = (ModificationStatement)statement; + if (mst.isCounter()) + { + if (type != BatchStatement.Type.COUNTER) + throw new InvalidRequestException("Cannot include counter statement in a non-counter batch"); + } + else + { + if (type == BatchStatement.Type.COUNTER) + throw new InvalidRequestException("Cannot include non-counter statement in a counter batch"); + } + statements.add(mst); + } + + // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor + // (and no value would be really correct, so we prefer passing a clearly wrong one). + BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none()); + Message.Response response = QueryProcessor.processBatch(batch, consistency, state, values, queryOrIdList); + + if (tracingId != null) + response.setTracingId(tracingId); + + return response; + } + catch (Exception e) + { + return ErrorMessage.fromException(e); + } + finally + { + Tracing.instance.stopSession(); + } + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append("BATCH of ["); + for (int i = 0; i < queryOrIdList.size(); i++) + { + if (i > 0) sb.append(", "); + sb.append(queryOrIdList.get(i)).append(" with ").append(values.get(i).size()).append(" values"); + } + sb.append("] at consistency ").append(consistency); + return sb.toString(); + } +}