Updated Branches: refs/heads/trunk 5eb530853 -> ab13579a3
Improve CQL3 batchlog support patch by slebresne; reviewed by jbellis for CASSANDRA-4738 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ab13579a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ab13579a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ab13579a Branch: refs/heads/trunk Commit: ab13579a3cfde211437987a889b25f8ae6d94725 Parents: 5eb5308 Author: Sylvain Lebresne <[email protected]> Authored: Tue Oct 2 18:37:35 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Tue Oct 2 18:37:35 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../cassandra/cql3/statements/BatchStatement.java | 28 ------------- .../cassandra/cql3/statements/DeleteStatement.java | 2 + .../cql3/statements/ModificationStatement.java | 31 ++++++++++++++- .../cassandra/cql3/statements/UpdateStatement.java | 21 +++++++--- 5 files changed, 47 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b33e40c..dd6061f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -14,7 +14,7 @@ * (CQL3) Fix validation when using counter and regular columns in the same table (CASSANDRA-4706) * Fix bug starting Cassandra with simple authentication (CASSANDRA-4648) - * Add support for batchlog in CQL3 (CASSANDRA-4545) + * Add support for batchlog in CQL3 (CASSANDRA-4545, 4738) * Add support for multiple column family outputs in CFOF (CASSANDRA-4208) * Support repairing only the local DC nodes (CASSANDRA-4747) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 38df9bd..92c708b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -38,12 +38,6 @@ import org.apache.cassandra.utils.Pair; */ public class BatchStatement extends ModificationStatement { - public static enum Type - { - LOGGED, UNLOGGED, COUNTER - } - - protected final Type type; // statements to execute protected final List<ModificationStatement> statements; @@ -85,28 +79,6 @@ public class BatchStatement extends ModificationStatement } @Override - public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException - { - Collection<? extends IMutation> mutations = getMutations(state, variables, false); - ConsistencyLevel cl = getConsistencyLevel(); - - switch (type) - { - case LOGGED: - StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl); - break; - case UNLOGGED: - case COUNTER: - StorageProxy.mutate(mutations, cl); - break; - default: - throw new AssertionError(); - } - - return null; - } - - @Override public ConsistencyLevel getConsistencyLevel() { // We have validated that either the consistency is set, or all statements have the same default CL (see validate()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index fc0efb8..86af858 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -195,6 +195,8 @@ public class DeleteStatement extends ModificationStatement public ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException { CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); + type = metadata.getDefaultValidator().isCommutative() ? Type.COUNTER : Type.LOGGED; + cfDef = metadata.getCfDef(); UpdateStatement.processKeys(cfDef, whereClause, processedKeys, boundNames); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 192d837..df58ca9 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -41,6 +41,13 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt { public static final ConsistencyLevel defaultConsistency = ConsistencyLevel.ONE; + public static enum Type + { + LOGGED, UNLOGGED, COUNTER + } + + protected Type type; + private final ConsistencyLevel cLevel; private Long timestamp; private final int timeToLive; @@ -73,10 +80,32 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException { - StorageProxy.mutate(getMutations(state, variables, false), getConsistencyLevel()); + Collection<? extends IMutation> mutations = getMutations(state, variables, false); + ConsistencyLevel cl = getConsistencyLevel(); + + // The type should have been set by now or we have a bug + assert type != null; + + switch (type) + { + case LOGGED: + if (mutations.size() > 1) + StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl); + else + StorageProxy.mutate(mutations, cl); + break; + case UNLOGGED: + case COUNTER: + StorageProxy.mutate(mutations, cl); + break; + default: + throw new AssertionError(); + } + return null; } + public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException { for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ab13579a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index a4a310d..cb4261f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -285,23 +285,30 @@ public class UpdateStatement extends ModificationStatement public ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException { - boolean hasCommutativeOperation = false; - if (columns != null) { for (Pair<ColumnIdentifier, Operation> column : columns) { if (column.right.getType() == Operation.Type.COUNTER) - hasCommutativeOperation = true; - - if (hasCommutativeOperation && column.right.getType() != Operation.Type.COUNTER) + { + if (type == null) + type = Type.COUNTER; + else if (type != Type.COUNTER) + throw new InvalidRequestException("Mix of counter and non-counter operations is not allowed."); + } + else if (type == Type.COUNTER) + { throw new InvalidRequestException("Mix of counter and non-counter operations is not allowed."); + } } } + if (type == null) + type = Type.LOGGED; + // Deal here with the keyspace overwrite thingy to avoid mistake - CFMetaData metadata = validateColumnFamily(keyspace(), columnFamily(), hasCommutativeOperation); - if (hasCommutativeOperation) + CFMetaData metadata = validateColumnFamily(keyspace(), columnFamily(), type == Type.COUNTER); + if (type == Type.COUNTER) getConsistencyLevel().validateCounterForWrite(metadata); cfDef = metadata.getCfDef();
