Updated Branches: refs/heads/trunk 8f46176e9 -> c54fe4c41
(CQL3) Fix prepend logic and ensure batches have a unique timestamp patch by slebresne; reviewed by jbellis for CASSANDRA-4835 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c54fe4c4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c54fe4c4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c54fe4c4 Branch: refs/heads/trunk Commit: c54fe4c41e18ffcedb3a849aead4d7576cd73167 Parents: 8f46176 Author: Sylvain Lebresne <[email protected]> Authored: Thu Oct 25 08:51:48 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Thu Oct 25 08:51:48 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/cql3/operations/ListOperation.java | 18 +++++++++----- .../cassandra/cql3/statements/BatchStatement.java | 6 ++-- .../cassandra/cql3/statements/DeleteStatement.java | 4 +- .../cql3/statements/ModificationStatement.java | 12 +++++---- .../cassandra/cql3/statements/UpdateStatement.java | 6 ++-- 6 files changed, 27 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3ef9530..e067d9c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -35,6 +35,7 @@ * Move consistency level to the protocol level (CASSANDRA-4734, 4824) * Fix Subcolumn slice ends not respected (CASSANDRA-4826) * Fix Assertion error in cql3 select (CASSANDRA-4783) + * Fix list prepend logic (CQL3) (CASSANDRA-4835) Merged from 1.1: * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816) * fix indexing empty column values (CASSANDRA-4832) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/operations/ListOperation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/operations/ListOperation.java b/src/java/org/apache/cassandra/cql3/operations/ListOperation.java index b40cc18..64dcdb2 100644 --- a/src/java/org/apache/cassandra/cql3/operations/ListOperation.java +++ b/src/java/org/apache/cassandra/cql3/operations/ListOperation.java @@ -46,7 +46,7 @@ public class ListOperation implements Operation * For prepend, we need to be able to generate unique but decreasing time * UUID, which is a bit challenging. To do that, given a time in milliseconds, * we adds a number representing the 100-nanoseconds precision and make sure - * that within the same millisecond, that number is always increasing. We + * that within the same millisecond, that number is always decreasing. We * do rely on the fact that the user will only provide decreasing * milliseconds timestamp for that purpose. */ @@ -72,8 +72,8 @@ public class ListOperation implements Operation assert millis <= current.millis; PrecisionTime next = millis < current.millis - ? new PrecisionTime(millis, 0) - : new PrecisionTime(millis, current.nanos + 1); + ? new PrecisionTime(millis, 9999) + : new PrecisionTime(millis, Math.max(0, current.nanos - 1)); if (last.compareAndSet(current, next)) return next; @@ -180,11 +180,9 @@ public class ListOperation implements Operation { long time = REFERENCE_TIME - (System.currentTimeMillis() - REFERENCE_TIME); - // We do the loop in reverse order because getNext() will create increasing time but we want the last - // value in the prepended list to have the lower time - for (int i = values.size() - 1; i >= 0; i--) + for (int i = 0; i < values.size(); i++) { - ColumnNameBuilder b = i == 0 ? builder : builder.copy(); + ColumnNameBuilder b = i == values.size() - 1 ? builder : builder.copy(); PrecisionTime pt = getNextTime(time); ByteBuffer uuid = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes(pt.millis, pt.nanos)); ByteBuffer name = b.add(uuid).build(); @@ -263,6 +261,12 @@ public class ListOperation implements Operation return new ListOperation(values, Kind.DISCARD_IDX); } + @Override + public String toString() + { + return "ListOperation(" + kind + ", " + values + ")"; + } + private int validateListIdx(Term value, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException { try http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index ff1d730..6ab0271 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -97,17 +97,17 @@ public class BatchStatement extends ModificationStatement statement.validateConsistency(cl); } - public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl) + public Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now) throws RequestExecutionException, RequestValidationException { Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>(); for (ModificationStatement statement : statements) { if (isSetTimestamp()) - statement.setTimestamp(getTimestamp(clientState)); + statement.setTimestamp(getTimestamp(now)); // Group mutation together, otherwise they won't get applied atomically - for (IMutation m : statement.getMutations(clientState, variables, local, cl)) + for (IMutation m : statement.getMutations(clientState, variables, local, cl, now)) { if (m instanceof CounterMutation && type != Type.COUNTER) throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index 1015a90..76c4374 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -69,7 +69,7 @@ public class DeleteStatement extends ModificationStatement cl.validateForWrite(cfDef.cfm.ksName); } - public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl) + public Collection<RowMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now) throws RequestExecutionException, RequestValidationException { // keys @@ -104,7 +104,7 @@ public class DeleteStatement extends ModificationStatement Map<ByteBuffer, ColumnGroupMap> rows = toRead != null ? readRows(keys, builder, toRead, (CompositeType)cfDef.cfm.comparator, local, cl) : null; Collection<RowMutation> rowMutations = new ArrayList<RowMutation>(keys.size()); - UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), -1); + UpdateParameters params = new UpdateParameters(variables, getTimestamp(now), -1); for (ByteBuffer key : keys) rowMutations.add(mutationForKey(cfDef, key, builder, isRange, params, rows == null ? null : rows.get(key))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 426274b..afdff22 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -89,7 +89,7 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt validateConsistency(cl); - Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl); + Collection<? extends IMutation> mutations = getMutations(state, variables, false, cl, state.getTimestamp()); // The type should have been set by now or we have a bug assert type != null; @@ -115,14 +115,14 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt public ResultMessage executeInternal(ClientState state) throws RequestValidationException, RequestExecutionException { - for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null)) + for (IMutation mutation : getMutations(state, Collections.<ByteBuffer>emptyList(), true, null, state.getTimestamp())) mutation.apply(); return null; } - public long getTimestamp(ClientState clientState) + public long getTimestamp(long now) { - return timestamp == null ? clientState.getTimestamp() : timestamp; + return timestamp == null ? now : timestamp; } public void setTimestamp(long timestamp) @@ -202,11 +202,13 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt * @param clientState current client status * @param variables value for prepared statement markers * @param local if true, any requests (for collections) performed by getMutation should be done locally only. + * @param cl the consistency to use for the potential reads involved in generating the mutations (for lists set/delete operations) + * @param now the current timestamp in microseconds to use if no timestamp is user provided. * * @return list of the mutations * @throws InvalidRequestException on invalid requests */ - protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl) + protected abstract Collection<? extends IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now) throws RequestExecutionException, RequestValidationException; public abstract ParsedStatement.Prepared prepare(CFDefinition.Name[] boundNames) throws InvalidRequestException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c54fe4c4/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index 2a99b3f..8c19b10 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -105,7 +105,7 @@ public class UpdateStatement extends ModificationStatement } /** {@inheritDoc} */ - public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl) + public Collection<IMutation> getMutations(ClientState clientState, List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now) throws RequestExecutionException, RequestValidationException { List<ByteBuffer> keys = buildKeyNames(cfDef, processedKeys, variables); @@ -135,7 +135,7 @@ public class UpdateStatement extends ModificationStatement Map<ByteBuffer, ColumnGroupMap> rows = toRead != null ? readRows(keys, builder, toRead, (CompositeType)cfDef.cfm.comparator, local, cl) : null; Collection<IMutation> mutations = new LinkedList<IMutation>(); - UpdateParameters params = new UpdateParameters(variables, getTimestamp(clientState), getTimeToLive()); + UpdateParameters params = new UpdateParameters(variables, getTimestamp(now), getTimeToLive()); for (ByteBuffer key: keys) mutations.add(mutationForKey(cfDef, key, builder, params, rows == null ? null : rows.get(key), cl)); @@ -449,7 +449,7 @@ public class UpdateStatement extends ModificationStatement cfName, whereClause, columns, - isSetTimestamp() ? getTimestamp(null) : "<now>", + isSetTimestamp() ? getTimestamp(-1) : "<now>", getTimeToLive()); } }
