Updated Branches: refs/heads/trunk c97d9ca38 -> 934339acd
hack to allow us to special-case RowMutation construction depending on if it's part of a batch

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/934339ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/934339ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/934339ac

Branch: refs/heads/trunk
Commit: 934339acd660bd986224036104ab9ec5db46f756
Parents: c97d9ca
Author: Jonathan Ellis <[email protected]>
Authored: Tue Mar 5 04:53:06 2013 +0100
Committer: Jonathan Ellis <[email protected]>
Committed: Tue Mar 5 04:53:17 2013 +0100

----------------------------------------------------------------------
 .../cassandra/cql3/statements/BatchStatement.java | 14 ++++++++++-
 .../cassandra/cql3/statements/DeleteStatement.java | 19 +++++++++++---
 .../cql3/statements/ModificationStatement.java | 10 +++++++-
 .../cassandra/cql3/statements/UpdateStatement.java | 19 +++++++++++----
 4 files changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/934339ac/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 6200237..6987bf6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -89,6 +89,7 @@ public class BatchStatement extends ModificationStatement
             statement.validateConsistency(cl);
     }

+    @Override
     public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
@@ -99,7 +100,7 @@ public class BatchStatement extends ModificationStatement
                 statement.setTimestamp(getTimestamp(now));

             // Group mutation together, otherwise they won't get applied atomically
-            for (IMutation m : statement.getMutations(variables, local, cl, now))
+            for (IMutation m : statement.getMutationsInternal(variables, local, cl, now, true))
             {
                 if (m instanceof CounterMutation && type != Type.COUNTER)
                     throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
@@ -111,15 +112,26 @@ public class BatchStatement extends ModificationStatement
                 IMutation existing = mutations.get(key);

                 if (existing == null)
+                {
                     mutations.put(key, m);
+                }
                 else
+                {
+                    existing.addAll(m);
+                }
             }
         }

         return mutations.values();
     }

+    protected Collection<? extends IMutation> getMutationsInternal(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch) throws RequestExecutionException, RequestValidationException
+    {
+        // batch statements should not contain other batches
+        throw new UnsupportedOperationException();
+    }
+
     public ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException
     {
         // XXX: we use our knowledge that Modification don't create new statement upon call to prepare()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/934339ac/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index 67e8b64..4fe52da 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -61,7 +61,7 @@ public class DeleteStatement extends ModificationStatement
         cl.validateForWrite(cfDef.cfm.ksName);
     }

-    public Collection<RowMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    public Collection<RowMutation> getMutationsInternal(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
     throws RequestExecutionException, RequestValidationException
     {
         // keys
@@ -94,12 +94,12 @@ public class DeleteStatement extends ModificationStatement
         UpdateParameters params = new UpdateParameters(cfDef.cfm, variables, getTimestamp(now), -1, rows);

         for (ByteBuffer key : keys)
-            rowMutations.add(mutationForKey(cfDef, key, builder, isRange, params));
+            rowMutations.add(mutationForKey(cfDef, key, builder, isRange, params, isBatch));

         return rowMutations;
     }

-    public RowMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, boolean isRange, UpdateParameters params)
+    public RowMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, boolean isRange, UpdateParameters params, boolean isBatch)
     throws InvalidRequestException
     {
         QueryProcessor.validateKey(key);
@@ -135,7 +135,18 @@ public class DeleteStatement extends ModificationStatement
             }
         }

-        return new RowMutation(cfDef.cfm.ksName, key, cf);
+        RowMutation rm;
+        if (isBatch)
+        {
+            // we might group other mutations together with this one later, so make it mutable
+            rm = new RowMutation(cfDef.cfm.ksName, key);
+            rm.add(cf);
+        }
+        else
+        {
+            rm = new RowMutation(cfDef.cfm.ksName, key, cf);
+        }
+        return rm;
     }

     public ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/934339ac/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index ad4f11a..6118937 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -207,7 +207,15 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
      * @return list of the mutations
      * @throws InvalidRequestException on invalid requests
      */
-    protected abstract Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    protected Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    throws RequestExecutionException, RequestValidationException
+    {
+        return getMutationsInternal(variables, local, cl, now, false);
+    }
+
+    // hack to allow us to special-case RowMutation construction depending on if it's part of a batch
+    // (in which case we need the CF collection to be mutable), or not (in which case we can use more-efficient SingletonMap)
+    protected abstract Collection<? extends IMutation> getMutationsInternal(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
     throws RequestExecutionException, RequestValidationException;

     public abstract ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/934339ac/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index 9755f72..7a0b0bf 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -104,8 +104,7 @@ public class UpdateStatement extends ModificationStatement
         cl.validateForWrite(cfDef.cfm.ksName);
     }

-    /** {@inheritDoc} */
-    public Collection<IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
+    public Collection<IMutation> getMutationsInternal(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables);
@@ -131,7 +130,7 @@ public class UpdateStatement extends ModificationStatement
         UpdateParameters params = new UpdateParameters(cfDef.cfm, variables, getTimestamp(now), getTimeToLive(), rows);

         for (ByteBuffer key: keys)
-            mutations.add(mutationForKey(cfDef, key, builder, params, cl));
+            mutations.add(mutationForKey(cfDef, key, builder, params, cl, isBatch));

         return mutations;
     }
@@ -207,7 +206,7 @@ public class UpdateStatement extends ModificationStatement
      *
      * @throws InvalidRequestException on the wrong request
      */
-    private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ConsistencyLevel cl)
+    private IMutation mutationForKey(CFDefinition cfDef, ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params, ConsistencyLevel cl, boolean isBatch)
     throws InvalidRequestException
     {
         validateKey(key);
@@ -261,7 +260,17 @@ public class UpdateStatement extends ModificationStatement
                 op.execute(key, cf, builder.copy(), params);
         }

-        RowMutation rm = new RowMutation(cfDef.cfm.ksName, key, cf);
+        RowMutation rm;
+        if (isBatch)
+        {
+            // we might group other mutations together with this one later, so make it mutable
+            rm = new RowMutation(cfDef.cfm.ksName, key);
+            rm.add(cf);
+        }
+        else
+        {
+            rm = new RowMutation(cfDef.cfm.ksName, key, cf);
+        }
         return type == Type.COUNTER ? new CounterMutation(rm, cl) : rm;
     }
