Updated Branches: refs/heads/trunk 859473db5 -> 5dc8ba837
Add support for batchlog in CQL3 patch by iamaleksey; reviewed by slebresne for CASSANDRA-4545 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5dc8ba83 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5dc8ba83 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5dc8ba83 Branch: refs/heads/trunk Commit: 5dc8ba837a706be7f4f8e180b9bafdb61aa4ccac Parents: 859473d Author: Sylvain Lebresne <[email protected]> Authored: Mon Oct 1 16:47:46 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Mon Oct 1 16:47:46 2012 +0200 ---------------------------------------------------------------------- pylib/cqlshlib/cql3handling.py | 2 +- src/java/org/apache/cassandra/cql3/Cql.g | 12 ++- .../cassandra/cql3/statements/BatchStatement.java | 94 ++++++++------- .../cassandra/cql3/statements/DeleteStatement.java | 6 +- .../cql3/statements/ModificationStatement.java | 4 +- .../cassandra/cql3/statements/UpdateStatement.java | 10 +- .../org/apache/cassandra/service/StorageProxy.java | 14 ++- 7 files changed, 73 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index f3c338e..4fc7714 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -449,7 +449,7 @@ def delete_opt_completer(ctxt, cass): explain_completion('deleteStatement', 'delcol', '<column_to_delete>') syntax_rules += r''' -<batchStatement> ::= "BEGIN" "BATCH" +<batchStatement> ::= "BEGIN" ( "UNLOGGED" | "COUNTER" )? "BATCH" ( "USING" [batchopt]=<usingOption> ( "AND" [batchopt]=<usingOption> )* )? [batchstmt]=<batchStatementMember> ";" http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/Cql.g ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g index 1379b9a..320e934 100644 --- a/src/java/org/apache/cassandra/cql3/Cql.g +++ b/src/java/org/apache/cassandra/cql3/Cql.g @@ -353,14 +353,17 @@ deleteSelector returns [Selector s] */ batchStatement returns [BatchStatement expr] @init { - Attributes attrs = new Attributes(); + BatchStatement.Type type = BatchStatement.Type.LOGGED; List<ModificationStatement> statements = new ArrayList<ModificationStatement>(); + Attributes attrs = new Attributes(); } - : K_BEGIN K_BATCH ( usingClause[attrs] )? + : K_BEGIN + ( K_UNLOGGED { type = BatchStatement.Type.UNLOGGED; } | K_COUNTER { type = BatchStatement.Type.COUNTER; } )? + K_BATCH ( usingClause[attrs] )? s1=batchStatementObjective ';'? { statements.add(s1); } ( sN=batchStatementObjective ';'? { statements.add(sN); } )* K_APPLY K_BATCH { - return new BatchStatement(statements, attrs); + return new BatchStatement(type, statements, attrs); } ; @@ -767,8 +770,9 @@ K_USE: U S E; K_COUNT: C O U N T; K_SET: S E T; K_BEGIN: B E G I N; -K_APPLY: A P P L Y; +K_UNLOGGED: U N L O G G E D; K_BATCH: B A T C H; +K_APPLY: A P P L Y; K_TRUNCATE: T R U N C A T E; K_DELETE: D E L E T E; K_IN: I N; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 9e5cb40..38df9bd 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.TimeoutException; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.cql3.*; @@ -29,8 +28,8 @@ import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.thrift.RequestType; -import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.Pair; /** @@ -39,6 +38,12 @@ import org.apache.cassandra.utils.Pair; */ public class BatchStatement extends ModificationStatement { + public static enum Type + { + LOGGED, UNLOGGED, COUNTER + } + + protected final Type type; // statements to execute protected final List<ModificationStatement> statements; @@ -46,12 +51,14 @@ public class BatchStatement extends ModificationStatement * Creates a new BatchStatement from a list of statements and a * Thrift consistency level. * + * @param type type of the batch * @param statements a list of UpdateStatements * @param attrs additional attributes for statement (CL, timestamp, timeToLive) */ - public BatchStatement(List<ModificationStatement> statements, Attributes attrs) + public BatchStatement(Type type, List<ModificationStatement> statements, Attributes attrs) { super(null, attrs); + this.type = type; this.statements = statements; } @@ -78,6 +85,28 @@ public class BatchStatement extends ModificationStatement } @Override + public ResultMessage execute(ClientState state, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException + { + Collection<? extends IMutation> mutations = getMutations(state, variables, false); + ConsistencyLevel cl = getConsistencyLevel(); + + switch (type) + { + case LOGGED: + StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl); + break; + case UNLOGGED: + case COUNTER: + StorageProxy.mutate(mutations, cl); + break; + default: + throw new AssertionError(); + } + + return null; + } + + @Override public ConsistencyLevel getConsistencyLevel() { // We have validated that either the consistency is set, or all statements have the same default CL (see validate()) @@ -119,54 +148,35 @@ public class BatchStatement extends ModificationStatement } } - public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local) + public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local) throws RequestExecutionException, RequestValidationException { - Map<Pair<String, ByteBuffer>, RowAndCounterMutation> mutations = new HashMap<Pair<String, ByteBuffer>, RowAndCounterMutation>(); + Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>(); for (ModificationStatement statement : statements) { if (isSetTimestamp()) statement.setTimestamp(getTimestamp(clientState)); - List<IMutation> lm = statement.getMutations(clientState, variables, local); // Group mutation together, otherwise they won't get applied atomically - for (IMutation m : lm) + for (IMutation m : statement.getMutations(clientState, variables, local)) { + if (m instanceof CounterMutation && type != Type.COUNTER) + throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches"); + + if (m instanceof RowMutation && type == Type.COUNTER) + throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches"); + Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key()); - RowAndCounterMutation racm = mutations.get(key); - if (racm == null) - { - racm = new RowAndCounterMutation(); - mutations.put(key, racm); - } - - if (m instanceof CounterMutation) - { - if (racm.cm == null) - racm.cm = (CounterMutation)m; - else - racm.cm.addAll(m); - } + IMutation existing = mutations.get(key); + + if (existing == null) + mutations.put(key, m); else - { - assert m instanceof RowMutation; - if (racm.rm == null) - racm.rm = (RowMutation)m; - else - racm.rm.addAll(m); - } + existing.addAll(m); } } - List<IMutation> batch = new LinkedList<IMutation>(); - for (RowAndCounterMutation racm : mutations.values()) - { - if (racm.rm != null) - batch.add(racm.rm); - if (racm.cm != null) - batch.add(racm.cm); - } - return batch; + return mutations.values(); } public ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException @@ -187,12 +197,6 @@ public class BatchStatement extends ModificationStatement public String toString() { - return String.format("BatchStatement(statements=%s, consistency=%s)", statements, getConsistencyLevel()); - } - - private static class RowAndCounterMutation - { - public RowMutation rm; - public CounterMutation cm; + return String.format("BatchStatement(type=%s, statements=%s, consistency=%s)", type, statements, getConsistencyLevel()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index 50a279f..fc0efb8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.TimeoutException; import org.apache.cassandra.cql3.*; import org.apache.cassandra.config.CFMetaData; @@ -29,7 +28,6 @@ import org.apache.cassandra.cql3.operations.Operation; import org.apache.cassandra.cql3.operations.SetOperation; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.DeletionInfo; -import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.CompositeType; @@ -62,7 +60,7 @@ public class DeleteStatement extends ModificationStatement this.toRemove = new ArrayList<Pair<CFDefinition.Name, Term>>(columns.size()); } - public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local) + public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local) throws RequestExecutionException, RequestValidationException { // keys @@ -94,7 +92,7 @@ public class DeleteStatement extends ModificationStatement Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local) : null; - List<IMutation> rowMutations = new ArrayList<IMutation>(keys.size()); + Collection<RowMutation> rowMutations = new ArrayList<RowMutation>(keys.size()); UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1); for (ByteBuffer key : keys) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 6072c24..192d837 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -33,8 +33,6 @@ import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.thrift.RequestType; -import org.apache.cassandra.thrift.ThriftValidation; /** * Abstract class for statements that apply on a given column family. @@ -179,7 +177,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt * @return list of the mutations * @throws InvalidRequestException on invalid requests */ - protected abstract List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local) + protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local) throws RequestExecutionException, RequestValidationException; public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 877d0d8..a4a310d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -19,7 +19,6 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.TimeoutException; import com.google.common.collect.ArrayListMultimap; @@ -35,7 +34,6 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import static org.apache.cassandra.cql.QueryProcessor.validateKey; - import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily; /** @@ -100,7 +98,7 @@ public class UpdateStatement extends ModificationStatement /** {@inheritDoc} */ - public List<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local) + public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local) throws RequestExecutionException, RequestValidationException { List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables); @@ -129,13 +127,13 @@ public class UpdateStatement extends ModificationStatement Map<ByteBuffer, ColumnGroupMap> rows = needsReading ? readRows(keys, builder, (CompositeType)cfDef.cfm.comparator, local) : null; - List<IMutation> rowMutations = new LinkedList<IMutation>(); + Collection<IMutation> mutations = new LinkedList<IMutation>(); UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), getTimeToLive()); for (ByteBuffer key: keys) - rowMutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key))); + mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key))); - return rowMutations; + return mutations; } // Returns the first empty component or null if none are http://git-wip-us.apache.org/repos/asf/cassandra/blob/5dc8ba83/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 122d3fd..df6a36a 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -167,7 +167,7 @@ public class StorageProxy implements StorageProxyMBean * @param mutations the mutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) + public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, OverloadedException, WriteTimeoutException { logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level); @@ -243,7 +243,7 @@ public class StorageProxy implements StorageProxyMBean * @param mutations the RowMutations to be applied across the replicas * @param consistency_level the consistency level for the operation */ - public static void mutateAtomically(List<RowMutation> mutations, ConsistencyLevel consistency_level) + public static void mutateAtomically(Collection<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, WriteTimeoutException { long startTime = System.nanoTime(); @@ -294,13 +294,15 @@ public class StorageProxy implements StorageProxyMBean } } - private static void syncWriteToBatchlog(List<RowMutation> mutations, - Collection<InetAddress> endpoints, - UUID uuid) + private static void syncWriteToBatchlog(Collection<RowMutation> mutations, Collection<InetAddress> endpoints, UUID uuid) throws WriteTimeoutException { RowMutation rm = BatchlogManager.getBatchlogMutationFor(mutations, uuid); - AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, Table.SYSTEM_KS, null); + AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, + Collections.<InetAddress>emptyList(), + ConsistencyLevel.ONE, + Table.SYSTEM_KS, + null); try {
