Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/31aa2a23 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/31aa2a23 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/31aa2a23 Branch: refs/heads/cassandra-2.1 Commit: 31aa2a23c3cf7a45c6563d75b0a958d9e492c681 Parents: c65d81b 3504a50 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Wed Jul 1 22:02:19 2015 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Wed Jul 1 22:02:19 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 7 ++++--- .../org/apache/cassandra/db/ColumnFamilyStore.java | 2 +- src/java/org/apache/cassandra/db/DefsTables.java | 2 +- src/java/org/apache/cassandra/db/Keyspace.java | 15 +++++++++++++-- src/java/org/apache/cassandra/db/Mutation.java | 2 +- test/unit/org/apache/cassandra/db/CommitLogTest.java | 2 +- 6 files changed, 21 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/31aa2a23/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 052ced1,391874b..5c55b9e --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,13 -1,6 +1,14 @@@ -2.0.17 +2.1.8 + * Update internal python driver for cqlsh (CASSANDRA-9064) - * Avoids ballot clash in Paxos (CASSANDRA-9649) + * Fix IndexOutOfBoundsException when inserting tuple with too many + elements using the string literal notation (CASSANDRA-9559) + * Allow JMX over SSL directly from nodetool (CASSANDRA-9090) + * Fix incorrect result for IN queries where column not found (CASSANDRA-9540) + * Enable describe on indices (CASSANDRA-7814) + * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637) - Merged from 2.0 ++Merged from 2.0: + * Fix setting 'durable_writes' in ALTER KEYSPACE (CASSANDRA-9560) + * Avoid ballot clash in Paxos (CASSANDRA-9649) * Improve trace messages for RR (CASSANDRA-9479) * Fix suboptimal secondary index selection when restricted clustering column is also indexed (CASSANDRA-9631) @@@ -15,12 -8,10 +16,12 @@@ * Fix error message when attempting to create an index on a column in a COMPACT STORAGE table with clustering columns (CASSANDRA-9527) * 'WITH WITH' in alter keyspace statements causes NPE (CASSANDRA-9565) - * Display min timestamp in sstablemetadata viewer (CASSANDRA-6767) -2.0.16: +2.1.7 + * Fix bug in cardinality check when compacting (CASSANDRA-9580) + * Fix memory leak in Ref due to ConcurrentLinkedQueue.remove() behaviour (CASSANDRA-9549) - Merged from 2.0 ++Merged from 2.0: * Expose some internals of SelectStatement for inspection (CASSANDRA-9532) * ArrivalWindow should use primitives (CASSANDRA-9496) * Periodically submit background compaction tasks (CASSANDRA-9592) http://git-wip-us.apache.org/repos/asf/cassandra/blob/31aa2a23/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/31aa2a23/src/java/org/apache/cassandra/db/DefsTables.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/31aa2a23/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Keyspace.java index cec1beb,915ccca..4f59c40 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@@ -71,11 -70,9 +71,11 @@@ public class Keyspac DatabaseDescriptor.createAllDirectories(); } - public final KSMetaData metadata; + public final OpOrder writeOrder = new OpOrder(); + /* ColumnFamilyStore per column family */ private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>(); + private volatile KSMetaData metadata; private volatile AbstractReplicationStrategy replicationStrategy; public static final Function<String,Keyspace> keyspaceTransformer = new Function<String, Keyspace>() http://git-wip-us.apache.org/repos/asf/cassandra/blob/31aa2a23/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Mutation.java index a6d23cb,0000000..0424f5a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@@ -1,351 -1,0 +1,351 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO convert this to a Builder pattern instead of encouraging M.add directly, +// which is less-efficient since we have to keep a mutable HashMap around +public class Mutation implements IMutation +{ + public static final MutationSerializer serializer = new MutationSerializer(); + private static final Logger logger = LoggerFactory.getLogger(Mutation.class); + + public static final String FORWARD_TO = "FWD_TO"; + public static final String FORWARD_FROM = "FWD_FRM"; + + // todo this is redundant + // when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test + private final String keyspaceName; + + private final ByteBuffer key; + // map of column family id to mutations for that column family. + private final Map<UUID, ColumnFamily> modifications; + + public Mutation(String keyspaceName, ByteBuffer key) + { + this(keyspaceName, key, new HashMap<UUID, ColumnFamily>()); + } + + public Mutation(String keyspaceName, ByteBuffer key, ColumnFamily cf) + { + this(keyspaceName, key, Collections.singletonMap(cf.id(), cf)); + } + + public Mutation(String keyspaceName, Row row) + { + this(keyspaceName, row.key.getKey(), row.cf); + } + + protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications) + { + this.keyspaceName = keyspaceName; + this.key = key; + this.modifications = modifications; + } + + public Mutation(ByteBuffer key, ColumnFamily cf) + { + this(cf.metadata().ksName, key, cf); + } + + public Mutation copy() + { + Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications)); + return copy; + } + + public String getKeyspaceName() + { + return keyspaceName; + } + + public Collection<UUID> getColumnFamilyIds() + { + return modifications.keySet(); + } + + public ByteBuffer key() + { + return key; + } + + public Collection<ColumnFamily> getColumnFamilies() + { + return modifications.values(); + } + + public ColumnFamily getColumnFamily(UUID cfId) + { + return modifications.get(cfId); + } + + /* + * Specify a column family name and the corresponding column + * family object. + * param @ cf - column family name + * param @ columnFamily - the column family. + */ + public void add(ColumnFamily columnFamily) + { + assert columnFamily != null; + ColumnFamily prev = modifications.put(columnFamily.id(), columnFamily); + if (prev != null) + // developer error + throw new IllegalArgumentException("ColumnFamily " + columnFamily + " already has modifications in this mutation: " + prev); + } + + /** + * @return the ColumnFamily in this Mutation corresponding to @param cfName, creating an empty one if necessary. + */ + public ColumnFamily addOrGet(String cfName) + { + return addOrGet(Schema.instance.getCFMetaData(keyspaceName, cfName)); + } + + public ColumnFamily addOrGet(CFMetaData cfm) + { + ColumnFamily cf = modifications.get(cfm.cfId); + if (cf == null) + { + cf = ArrayBackedSortedColumns.factory.create(cfm); + modifications.put(cfm.cfId, cf); + } + return cf; + } + + public boolean isEmpty() + { + return modifications.isEmpty(); + } + + public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive) + { + addOrGet(cfName).addColumn(name, value, timestamp, timeToLive); + } + + public void addCounter(String cfName, CellName name, long value) + { + addOrGet(cfName).addCounter(name, value); + } + + public void add(String cfName, CellName name, ByteBuffer value, long timestamp) + { + add(cfName, name, value, timestamp, 0); + } + + public void delete(String cfName, long timestamp) + { + int localDeleteTime = (int) (System.currentTimeMillis() / 1000); + addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime)); + } + + public void delete(String cfName, CellName name, long timestamp) + { + int localDeleteTime = (int) (System.currentTimeMillis() / 1000); + addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp); + } + + public void deleteRange(String cfName, Composite start, Composite end, long timestamp) + { + int localDeleteTime = (int) (System.currentTimeMillis() / 1000); + addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime)); + } + + public void addAll(IMutation m) + { + if (!(m instanceof Mutation)) + throw new IllegalArgumentException(); + + Mutation mutation = (Mutation)m; + if (!keyspaceName.equals(mutation.keyspaceName) || !key.equals(mutation.key)) + throw new IllegalArgumentException(); + + for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet()) + { + // It's slighty faster to assume the key wasn't present and fix if + // not in the case where it wasn't there indeed. + ColumnFamily cf = modifications.put(entry.getKey(), entry.getValue()); + if (cf != null) + entry.getValue().addAll(cf); + } + } + + /* + * This is equivalent to calling commit. Applies the changes to + * to the keyspace that is obtained by calling Keyspace.open(). + */ + public void apply() + { + Keyspace ks = Keyspace.open(keyspaceName); - ks.apply(this, ks.metadata.durableWrites); ++ ks.apply(this, ks.getMetadata().durableWrites); + } + + public void applyUnsafe() + { + Keyspace.open(keyspaceName).apply(this, false); + } + + public MessageOut<Mutation> createMessage() + { + return createMessage(MessagingService.Verb.MUTATION); + } + + public MessageOut<Mutation> createMessage(MessagingService.Verb verb) + { + return new MessageOut<>(verb, this, serializer); + } + + public long getTimeout() + { + return DatabaseDescriptor.getWriteRpcTimeout(); + } + + public String toString() + { + return toString(false); + } + + public String toString(boolean shallow) + { + StringBuilder buff = new StringBuilder("Mutation("); + buff.append("keyspace='").append(keyspaceName).append('\''); + buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\''); + buff.append(", modifications=["); + if (shallow) + { + List<String> cfnames = new ArrayList<String>(modifications.size()); + for (UUID cfid : modifications.keySet()) + { + CFMetaData cfm = Schema.instance.getCFMetaData(cfid); + cfnames.add(cfm == null ? "-dropped-" : cfm.cfName); + } + buff.append(StringUtils.join(cfnames, ", ")); + } + else + buff.append(StringUtils.join(modifications.values(), ", ")); + return buff.append("])").toString(); + } + + public Mutation without(UUID cfId) + { + Mutation mutation = new Mutation(keyspaceName, key); + for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet()) + if (!entry.getKey().equals(cfId)) + mutation.add(entry.getValue()); + return mutation; + } + + public static class MutationSerializer implements IVersionedSerializer<Mutation> + { + public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException + { + if (version < MessagingService.VERSION_20) + out.writeUTF(mutation.getKeyspaceName()); + + ByteBufferUtil.writeWithShortLength(mutation.key(), out); + + /* serialize the modifications in the mutation */ + int size = mutation.modifications.size(); + out.writeInt(size); + assert size > 0; + for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet()) + ColumnFamily.serializer.serialize(entry.getValue(), out, version); + } + + public Mutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException + { + String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that + if (version < MessagingService.VERSION_20) + keyspaceName = in.readUTF(); + + ByteBuffer key = ByteBufferUtil.readWithShortLength(in); + int size = in.readInt(); + assert size > 0; + + Map<UUID, ColumnFamily> modifications; + if (size == 1) + { + ColumnFamily cf = deserializeOneCf(in, version, flag); + modifications = Collections.singletonMap(cf.id(), cf); + keyspaceName = cf.metadata().ksName; + } + else + { + modifications = new HashMap<UUID, ColumnFamily>(); + for (int i = 0; i < size; ++i) + { + ColumnFamily cf = deserializeOneCf(in, version, flag); + modifications.put(cf.id(), cf); + keyspaceName = cf.metadata().ksName; + } + } + + return new Mutation(keyspaceName, key, modifications); + } + + private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException + { + ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version); + // We don't allow Mutation with null column family, so we should never get null back. + assert cf != null; + return cf; + } + + public Mutation deserialize(DataInput in, int version) throws IOException + { + return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE); + } + + public long serializedSize(Mutation mutation, int version) + { + TypeSizes sizes = TypeSizes.NATIVE; + int size = 0; + + if (version < MessagingService.VERSION_20) + size += sizes.sizeof(mutation.getKeyspaceName()); + + int keySize = mutation.key().remaining(); + size += sizes.sizeof((short) keySize) + keySize; + + size += sizes.sizeof(mutation.modifications.size()); + for (Map.Entry<UUID,ColumnFamily> entry : mutation.modifications.entrySet()) + size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version); + + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/31aa2a23/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/CommitLogTest.java index ab594f8,289fbc9..9a8a1dc --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java @@@ -351,10 -294,9 +351,10 @@@ public class CommitLogTest extends Sche boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot(); DatabaseDescriptor.setAutoSnapshot(false); Keyspace notDurableKs = Keyspace.open("NoCommitlogSpace"); - Assert.assertFalse(notDurableKs.metadata.durableWrites); + Assert.assertFalse(notDurableKs.getMetadata().durableWrites); ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); - RowMutation rm; + CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator(); + Mutation rm; DecoratedKey dk = Util.dk("key1"); // add data