Merge branch 'cassandra-2.1' into trunk Conflicts: CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9aea917b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9aea917b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9aea917b Branch: refs/heads/trunk Commit: 9aea917b2cefdc8a35ff58efdba85268a1ecf7d2 Parents: 4e334ab 5db108c Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Mon Aug 18 10:23:20 2014 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Mon Aug 18 10:23:20 2014 +0200 ---------------------------------------------------------------------- CHANGES.txt | 8 +- build.xml | 2 +- .../cql3/statements/BatchStatement.java | 20 +- .../cql3/statements/CQL3CasConditions.java | 203 -------------- .../cql3/statements/CQL3CasRequest.java | 268 +++++++++++++++++++ .../cql3/statements/ModificationStatement.java | 45 ++-- .../apache/cassandra/service/CASConditions.java | 39 --- .../apache/cassandra/service/CASRequest.java | 45 ++++ .../apache/cassandra/service/StorageProxy.java | 13 +- .../cassandra/thrift/CassandraServer.java | 16 +- 10 files changed, 359 insertions(+), 300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 0512463,cecf153..1ac981b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,21 -1,5 +1,22 @@@ +3.0 + * Support pure user-defined functions (CASSANDRA-7395) + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416) + * Move sstable RandomAccessReader to nio2, which allows using the + FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050) + * Remove CQL2 (CASSANDRA-5918) + * Add Thrift get_multi_slice call (CASSANDRA-6757) + * Optimize fetching multiple cells by name (CASSANDRA-6933) + * Allow compilation in java 8 (CASSANDRA-7028) + * Make incremental repair default (CASSANDRA-7250) + * Enable code coverage thru JaCoCo (CASSANDRA-7226) + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) + * Shorten SSTable path (CASSANDRA-6962) + * Use unsafe mutations for most unit tests (CASSANDRA-6969) + * Fix race condition during calculation of pending ranges (CASSANDRA-7390) + + 2.1.1 + * Support list index operations with conditions (CASSANDRA-7499) * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731) * Validate IPv6 wildcard addresses properly (CASSANDRA-7680) * (cqlsh) Error when tracing query (CASSANDRA-7613) @@@ -64,10 -52,6 +70,8 @@@ Merged from 2.0 * Bogus deserialization of static cells from sstable (CASSANDRA-7684) * Fix NPE on compaction leftover cleanup for dropped table (CASSANDRA-7770) Merged from 2.0: - ======= - 2.0.10 + * Fix race condition in StreamTransferTask that could lead to + infinite loops and premature sstable deletion (CASSANDRA-7704) * (cqlsh) Wait up to 10 sec for a tracing session (CASSANDRA-7222) * Fix NPE in FileCacheService.sizeInBytes (CASSANDRA-7756) * Remove duplicates from StorageService.getJoiningNodes (CASSANDRA-7478) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/build.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index 0000000,a85c1e5..b04c624 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@@ -1,0 -1,268 +1,268 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.cql3.statements; + + import java.nio.ByteBuffer; + import java.util.*; + + import org.apache.cassandra.cql3.*; + import org.apache.cassandra.config.CFMetaData; + import org.apache.cassandra.db.*; + import org.apache.cassandra.db.composites.Composite; + import org.apache.cassandra.db.filter.*; + import org.apache.cassandra.db.marshal.CompositeType; + import org.apache.cassandra.exceptions.InvalidRequestException; + import org.apache.cassandra.service.CASRequest; + import org.apache.cassandra.utils.Pair; + + /** + * Processed CAS conditions and update on potentially multiple rows of the same partition. + */ + public class CQL3CasRequest implements CASRequest + { + private final CFMetaData cfm; + private final ByteBuffer key; + private final long now; + private final boolean isBatch; + + // We index RowCondition by the prefix of the row they applied to for 2 reasons: + // 1) this allows to keep things sorted to build the ColumnSlice array below + // 2) this allows to detect when contradictory conditions are set (not exists with some other conditions on the same row) + private final SortedMap<Composite, RowCondition> conditions; + + private final List<RowUpdate> updates = new ArrayList<>(); + + public CQL3CasRequest(CFMetaData cfm, ByteBuffer key, boolean isBatch) + { + this.cfm = cfm; + // When checking if conditions apply, we want to use a fixed reference time for a whole request to check + // for expired cells. Note that this is unrelated to the cell timestamp. + this.now = System.currentTimeMillis(); + this.key = key; + this.conditions = new TreeMap<>(cfm.comparator); + this.isBatch = isBatch; + } + + public void addRowUpdate(Composite prefix, ModificationStatement stmt, QueryOptions options, long timestamp) + { + updates.add(new RowUpdate(prefix, stmt, options, timestamp)); + } + + public void addNotExist(Composite prefix) throws InvalidRequestException + { + RowCondition previous = conditions.put(prefix, new NotExistCondition(prefix, now)); + if (previous != null && !(previous instanceof NotExistCondition)) + { + // these should be prevented by the parser, but it doesn't hurt to check + if (previous instanceof ExistCondition) + throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row"); + else + throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row"); + } + } + + public void addExist(Composite prefix) throws InvalidRequestException + { + RowCondition previous = conditions.put(prefix, new ExistCondition(prefix, now)); + // this should be prevented by the parser, but it doesn't hurt to check - if (previous != null && previous instanceof NotExistCondition) ++ if (previous instanceof NotExistCondition) + throw new InvalidRequestException("Cannot mix IF EXISTS and IF NOT EXISTS conditions for the same row"); + } + + public void addConditions(Composite prefix, Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException + { + RowCondition condition = conditions.get(prefix); + if (condition == null) + { + condition = new ColumnsConditions(prefix, now); + conditions.put(prefix, condition); + } + else if (!(condition instanceof ColumnsConditions)) + { + throw new InvalidRequestException("Cannot mix IF conditions and IF NOT EXISTS for the same row"); + } + ((ColumnsConditions)condition).addConditions(conds, options); + } + + public IDiskAtomFilter readFilter() + { + assert !conditions.isEmpty(); + ColumnSlice[] slices = new ColumnSlice[conditions.size()]; + int i = 0; + // We always read CQL rows entirely as on CAS failure we want to be able to distinguish between "row exists + // but all values for which there were conditions are null" and "row doesn't exists", and we can't rely on the + // row marker for that (see #6623) + for (Composite prefix : conditions.keySet()) + slices[i++] = prefix.slice(); + + int toGroup = cfm.comparator.isDense() ? -1 : cfm.clusteringColumns().size(); + assert ColumnSlice.validateSlices(slices, cfm.comparator, false); + return new SliceQueryFilter(slices, false, slices.length, toGroup); + } + + public boolean appliesTo(ColumnFamily current) throws InvalidRequestException + { + for (RowCondition condition : conditions.values()) + { + if (!condition.appliesTo(current)) + return false; + } + return true; + } + + public ColumnFamily makeUpdates(ColumnFamily current) throws InvalidRequestException + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm); + for (RowUpdate upd : updates) + upd.applyUpdates(current, cf); + + if (isBatch) + BatchStatement.verifyBatchSize(Collections.singleton(cf)); + + return cf; + } + + /** + * Due to some operation on lists, we can't generate the update that a given Modification statement does before + * we get the values read by the initial read of Paxos. A RowUpdate thus just store the relevant information + * (include the statement iself) to generate those updates. We'll have multiple RowUpdate for a Batch, otherwise + * we'll have only one. + */ + private class RowUpdate + { + private final Composite rowPrefix; + private final ModificationStatement stmt; + private final QueryOptions options; + private final long timestamp; + + private RowUpdate(Composite rowPrefix, ModificationStatement stmt, QueryOptions options, long timestamp) + { + this.rowPrefix = rowPrefix; + this.stmt = stmt; + this.options = options; + this.timestamp = timestamp; + } + + public void applyUpdates(ColumnFamily current, ColumnFamily updates) throws InvalidRequestException + { + Map<ByteBuffer, CQL3Row> map = null; + if (stmt.requiresRead()) + { + // Uses the "current" values read by Paxos for lists operation that requires a read + Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(current.iterator(new ColumnSlice[]{ rowPrefix.slice() })); + if (iter.hasNext()) + { + map = Collections.singletonMap(key, iter.next()); + assert !iter.hasNext() : "We shoudn't be updating more than one CQL row per-ModificationStatement"; + } + } + + UpdateParameters params = new UpdateParameters(cfm, options, timestamp, stmt.getTimeToLive(options), map); + stmt.addUpdateForKey(updates, key, rowPrefix, params); + } + } + + private static abstract class RowCondition + { + public final Composite rowPrefix; + protected final long now; + + protected RowCondition(Composite rowPrefix, long now) + { + this.rowPrefix = rowPrefix; + this.now = now; + } + + public abstract boolean appliesTo(ColumnFamily current) throws InvalidRequestException; + } + + private static class NotExistCondition extends RowCondition + { + private NotExistCondition(Composite rowPrefix, long now) + { + super(rowPrefix, now); + } + + public boolean appliesTo(ColumnFamily current) + { + if (current == null) + return true; + + Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ rowPrefix.slice() }); + while (iter.hasNext()) + if (iter.next().isLive(now)) + return false; + return true; + } + } + + private static class ExistCondition extends RowCondition + { + private ExistCondition(Composite rowPrefix, long now) + { + super (rowPrefix, now); + } + + public boolean appliesTo(ColumnFamily current) + { + if (current == null) + return false; + + Iterator<Cell> iter = current.iterator(new ColumnSlice[]{ rowPrefix.slice() }); + while (iter.hasNext()) + if (iter.next().isLive(now)) + return true; + return false; + } + } + + private static class ColumnsConditions extends RowCondition + { + private final Map<Pair<ColumnIdentifier, ByteBuffer>, ColumnCondition.Bound> conditions = new HashMap<>(); + + private ColumnsConditions(Composite rowPrefix, long now) + { + super(rowPrefix, now); + } + + public void addConditions(Collection<ColumnCondition> conds, QueryOptions options) throws InvalidRequestException + { + for (ColumnCondition condition : conds) + { + // We will need the variables in appliesTo but with protocol batches, each condition in this object can have a + // different list of variables. + ColumnCondition.Bound current = condition.bind(options); + ColumnCondition.Bound previous = conditions.put(Pair.create(condition.column.name, current.getCollectionElementValue()), current); + // If 2 conditions are actually equal, let it slide + if (previous != null && !previous.equals(current)) + throw new InvalidRequestException("Duplicate and incompatible conditions for column " + condition.column.name); + } + } + + public boolean appliesTo(ColumnFamily current) throws InvalidRequestException + { + if (current == null) + return conditions.isEmpty(); + + for (ColumnCondition.Bound condition : conditions.values()) + if (!condition.appliesTo(rowPrefix, current, now)) + return false; + return true; + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/src/java/org/apache/cassandra/thrift/CassandraServer.java ----------------------------------------------------------------------