Merge branch 'cassandra-2.1' into trunk
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9aea917b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9aea917b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9aea917b
Branch: refs/heads/trunk
Commit: 9aea917b2cefdc8a35ff58efdba85268a1ecf7d2
Parents: 4e334ab 5db108c
Author: Sylvain Lebresne <[email protected]>
Authored: Mon Aug 18 10:23:20 2014 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Mon Aug 18 10:23:20 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 8 +-
build.xml | 2 +-
.../cql3/statements/BatchStatement.java | 20 +-
.../cql3/statements/CQL3CasConditions.java | 203 --------------
.../cql3/statements/CQL3CasRequest.java | 268 +++++++++++++++++++
.../cql3/statements/ModificationStatement.java | 45 ++--
.../apache/cassandra/service/CASConditions.java | 39 ---
.../apache/cassandra/service/CASRequest.java | 45 ++++
.../apache/cassandra/service/StorageProxy.java | 13 +-
.../cassandra/thrift/CassandraServer.java | 16 +-
10 files changed, 359 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0512463,cecf153..1ac981b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,21 -1,5 +1,22 @@@
+3.0
+ * Support pure user-defined functions (CASSANDRA-7395)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+
+
2.1.1
+ * Support list index operations with conditions (CASSANDRA-7499)
* Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
* Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
* (cqlsh) Error when tracing query (CASSANDRA-7613)
@@@ -64,10 -52,6 +70,8 @@@ Merged from 2.0
* Bogus deserialization of static cells from sstable (CASSANDRA-7684)
* Fix NPE on compaction leftover cleanup for dropped table (CASSANDRA-7770)
Merged from 2.0:
- =======
- 2.0.10
+ * Fix race condition in StreamTransferTask that could lead to
+ infinite loops and premature sstable deletion (CASSANDRA-7704)
* (cqlsh) Wait up to 10 sec for a tracing session (CASSANDRA-7222)
* Fix NPE in FileCacheService.sizeInBytes (CASSANDRA-7756)
* Remove duplicates from StorageService.getJoiningNodes (CASSANDRA-7478)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/build.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 0000000,a85c1e5..b04c624
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@@ -1,0 -1,268 +1,268 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.cql3.statements;
+
+ import java.nio.ByteBuffer;
+ import java.util.*;
+
+ import org.apache.cassandra.cql3.*;
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.composites.Composite;
+ import org.apache.cassandra.db.filter.*;
+ import org.apache.cassandra.db.marshal.CompositeType;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
+ import org.apache.cassandra.service.CASRequest;
+ import org.apache.cassandra.utils.Pair;
+
+ /**
+ * Processed CAS conditions and update on potentially multiple rows of the
same partition.
+ */
+ public class CQL3CasRequest implements CASRequest
+ {
+ private final CFMetaData cfm;
+ private final ByteBuffer key;
+ private final long now;
+ private final boolean isBatch;
+
+ // We index RowCondition by the prefix of the row they applied to for 2
reasons:
+ // 1) this allows to keep things sorted to build the ColumnSlice array
below
+ // 2) this allows to detect when contradictory conditions are set (not
exists with some other conditions on the same row)
+ private final SortedMap<Composite, RowCondition> conditions;
+
+ private final List<RowUpdate> updates = new ArrayList<>();
+
+ public CQL3CasRequest(CFMetaData cfm, ByteBuffer key, boolean isBatch)
+ {
+ this.cfm = cfm;
+ // When checking if conditions apply, we want to use a fixed
reference time for a whole request to check
+ // for expired cells. Note that this is unrelated to the cell
timestamp.
+ this.now = System.currentTimeMillis();
+ this.key = key;
+ this.conditions = new TreeMap<>(cfm.comparator);
+ this.isBatch = isBatch;
+ }
+
+ public void addRowUpdate(Composite prefix, ModificationStatement stmt,
QueryOptions options, long timestamp)
+ {
+ updates.add(new RowUpdate(prefix, stmt, options, timestamp));
+ }
+
+ public void addNotExist(Composite prefix) throws InvalidRequestException
+ {
+ RowCondition previous = conditions.put(prefix, new
NotExistCondition(prefix, now));
+ if (previous != null && !(previous instanceof NotExistCondition))
+ {
+ // these should be prevented by the parser, but it doesn't hurt
to check
+ if (previous instanceof ExistCondition)
+ throw new InvalidRequestException("Cannot mix IF EXISTS and
IF NOT EXISTS conditions for the same row");
+ else
+ throw new InvalidRequestException("Cannot mix IF conditions
and IF NOT EXISTS for the same row");
+ }
+ }
+
+ public void addExist(Composite prefix) throws InvalidRequestException
+ {
+ RowCondition previous = conditions.put(prefix, new
ExistCondition(prefix, now));
+ // this should be prevented by the parser, but it doesn't hurt to
check
- if (previous != null && previous instanceof NotExistCondition)
++ if (previous instanceof NotExistCondition)
+ throw new InvalidRequestException("Cannot mix IF EXISTS and IF
NOT EXISTS conditions for the same row");
+ }
+
+ public void addConditions(Composite prefix, Collection<ColumnCondition>
conds, QueryOptions options) throws InvalidRequestException
+ {
+ RowCondition condition = conditions.get(prefix);
+ if (condition == null)
+ {
+ condition = new ColumnsConditions(prefix, now);
+ conditions.put(prefix, condition);
+ }
+ else if (!(condition instanceof ColumnsConditions))
+ {
+ throw new InvalidRequestException("Cannot mix IF conditions and
IF NOT EXISTS for the same row");
+ }
+ ((ColumnsConditions)condition).addConditions(conds, options);
+ }
+
+ public IDiskAtomFilter readFilter()
+ {
+ assert !conditions.isEmpty();
+ ColumnSlice[] slices = new ColumnSlice[conditions.size()];
+ int i = 0;
+ // We always read CQL rows entirely as on CAS failure we want to be
able to distinguish between "row exists
+ // but all values for which there were conditions are null" and "row
doesn't exists", and we can't rely on the
+ // row marker for that (see #6623)
+ for (Composite prefix : conditions.keySet())
+ slices[i++] = prefix.slice();
+
+ int toGroup = cfm.comparator.isDense() ? -1 :
cfm.clusteringColumns().size();
+ assert ColumnSlice.validateSlices(slices, cfm.comparator, false);
+ return new SliceQueryFilter(slices, false, slices.length, toGroup);
+ }
+
+ public boolean appliesTo(ColumnFamily current) throws
InvalidRequestException
+ {
+ for (RowCondition condition : conditions.values())
+ {
+ if (!condition.appliesTo(current))
+ return false;
+ }
+ return true;
+ }
+
+ public ColumnFamily makeUpdates(ColumnFamily current) throws
InvalidRequestException
+ {
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfm);
+ for (RowUpdate upd : updates)
+ upd.applyUpdates(current, cf);
+
+ if (isBatch)
+ BatchStatement.verifyBatchSize(Collections.singleton(cf));
+
+ return cf;
+ }
+
+ /**
+ * Due to some operation on lists, we can't generate the update that a
given Modification statement does before
+ * we get the values read by the initial read of Paxos. A RowUpdate thus
just store the relevant information
+ * (include the statement iself) to generate those updates. We'll have
multiple RowUpdate for a Batch, otherwise
+ * we'll have only one.
+ */
+ private class RowUpdate
+ {
+ private final Composite rowPrefix;
+ private final ModificationStatement stmt;
+ private final QueryOptions options;
+ private final long timestamp;
+
+ private RowUpdate(Composite rowPrefix, ModificationStatement stmt,
QueryOptions options, long timestamp)
+ {
+ this.rowPrefix = rowPrefix;
+ this.stmt = stmt;
+ this.options = options;
+ this.timestamp = timestamp;
+ }
+
+ public void applyUpdates(ColumnFamily current, ColumnFamily updates)
throws InvalidRequestException
+ {
+ Map<ByteBuffer, CQL3Row> map = null;
+ if (stmt.requiresRead())
+ {
+ // Uses the "current" values read by Paxos for lists
operation that requires a read
+ Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm,
now).group(current.iterator(new ColumnSlice[]{ rowPrefix.slice() }));
+ if (iter.hasNext())
+ {
+ map = Collections.singletonMap(key, iter.next());
+ assert !iter.hasNext() : "We shoudn't be updating more
than one CQL row per-ModificationStatement";
+ }
+ }
+
+ UpdateParameters params = new UpdateParameters(cfm, options,
timestamp, stmt.getTimeToLive(options), map);
+ stmt.addUpdateForKey(updates, key, rowPrefix, params);
+ }
+ }
+
+ private static abstract class RowCondition
+ {
+ public final Composite rowPrefix;
+ protected final long now;
+
+ protected RowCondition(Composite rowPrefix, long now)
+ {
+ this.rowPrefix = rowPrefix;
+ this.now = now;
+ }
+
+ public abstract boolean appliesTo(ColumnFamily current) throws
InvalidRequestException;
+ }
+
+ private static class NotExistCondition extends RowCondition
+ {
+ private NotExistCondition(Composite rowPrefix, long now)
+ {
+ super(rowPrefix, now);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ if (current == null)
+ return true;
+
+ Iterator<Cell> iter = current.iterator(new ColumnSlice[]{
rowPrefix.slice() });
+ while (iter.hasNext())
+ if (iter.next().isLive(now))
+ return false;
+ return true;
+ }
+ }
+
+ private static class ExistCondition extends RowCondition
+ {
+ private ExistCondition(Composite rowPrefix, long now)
+ {
+ super (rowPrefix, now);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ if (current == null)
+ return false;
+
+ Iterator<Cell> iter = current.iterator(new ColumnSlice[]{
rowPrefix.slice() });
+ while (iter.hasNext())
+ if (iter.next().isLive(now))
+ return true;
+ return false;
+ }
+ }
+
+ private static class ColumnsConditions extends RowCondition
+ {
+ private final Map<Pair<ColumnIdentifier, ByteBuffer>,
ColumnCondition.Bound> conditions = new HashMap<>();
+
+ private ColumnsConditions(Composite rowPrefix, long now)
+ {
+ super(rowPrefix, now);
+ }
+
+ public void addConditions(Collection<ColumnCondition> conds,
QueryOptions options) throws InvalidRequestException
+ {
+ for (ColumnCondition condition : conds)
+ {
+ // We will need the variables in appliesTo but with protocol
batches, each condition in this object can have a
+ // different list of variables.
+ ColumnCondition.Bound current = condition.bind(options);
+ ColumnCondition.Bound previous =
conditions.put(Pair.create(condition.column.name,
current.getCollectionElementValue()), current);
+ // If 2 conditions are actually equal, let it slide
+ if (previous != null && !previous.equals(current))
+ throw new InvalidRequestException("Duplicate and
incompatible conditions for column " + condition.column.name);
+ }
+ }
+
+ public boolean appliesTo(ColumnFamily current) throws
InvalidRequestException
+ {
+ if (current == null)
+ return conditions.isEmpty();
+
+ for (ColumnCondition.Bound condition : conditions.values())
+ if (!condition.appliesTo(rowPrefix, current, now))
+ return false;
+ return true;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9aea917b/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------