http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/MultiCBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java b/src/java/org/apache/cassandra/db/MultiCBuilder.java new file mode 100644 index 0000000..36a03ba --- /dev/null +++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.*; + +import static java.util.Collections.singletonList; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Builder that allows building multiple Clustering/Slice.Bound instances at the same time. + */ +public abstract class MultiCBuilder +{ + /** + * Creates a new empty {@code MultiCBuilder}. + */ + public static MultiCBuilder create(ClusteringComparator comparator) + { + return new ConcreteMultiCBuilder(comparator); + } + + /** + * Wraps an existing {@code CBuilder} to provide it with a {@code MultiCBuilder} interface + * for the sake of passing it to {@link Restriction#appendTo}. 
The resulting + * {@code MultiCBuilder} will still only be able to create a single clustering/bound + * and an {@code IllegalArgumentException} will be thrown if elements are added that + * would correspond to building multiple clusterings. NOTE(review): {@code addAllElementsToAll} hands its single inner list to {@code addEachElementToAll}, which throws for more than one value, so a lone tuple of several values is rejected too -- confirm this is intended. + */ + public static MultiCBuilder wrap(final CBuilder builder) + { + return new MultiCBuilder() + { + private boolean containsNull; + private boolean containsUnset; + private boolean hasMissingElements; + + public MultiCBuilder addElementToAll(ByteBuffer value) + { + builder.add(value); + + if (value == null) + containsNull = true; + if (value == ByteBufferUtil.UNSET_BYTE_BUFFER) + containsUnset = true; + + return this; + } + + public MultiCBuilder addEachElementToAll(List<ByteBuffer> values) + { + if (values.isEmpty()) + { + hasMissingElements = true; + return this; + } + + if (values.size() > 1) + throw new IllegalArgumentException(); + + return addElementToAll(values.get(0)); + } + + public MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values) + { + if (values.isEmpty()) + { + hasMissingElements = true; + return this; + } + + if (values.size() > 1) + throw new IllegalArgumentException(); + + return addEachElementToAll(values.get(0)); + } + + public int remainingCount() + { + return builder.remainingCount(); + } + + public boolean containsNull() + { + return containsNull; + } + + public boolean containsUnset() + { + return containsUnset; + } + + public boolean hasMissingElements() + { + return hasMissingElements; + } + + public NavigableSet<Clustering> build() + { + return FBUtilities.singleton(builder.build(), builder.comparator()); + } + + public SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive) + { + return FBUtilities.singleton(builder.buildBound(isStart, isInclusive), builder.comparator()); + } + }; + } + + /** + * Adds the specified element to all the clusterings. 
+ * <p> + * If this builder contains 2 clusterings: A-B and A-C, a call to this method to add D will result in the clusterings: + * A-B-D and A-C-D. + * </p> + * + * @param value the value of the next element + * @return this <code>MultiCBuilder</code> + */ + public abstract MultiCBuilder addElementToAll(ByteBuffer value); + + /** + * Adds individually each of the specified elements to the end of all of the existing clusterings. + * <p> + * If this builder contains 2 clusterings: A-B and A-C, a call to this method to add D and E will result in the 4 + * clusterings: A-B-D, A-B-E, A-C-D and A-C-E. + * </p> + * + * @param values the elements to add + * @return this <code>MultiCBuilder</code> + */ + public abstract MultiCBuilder addEachElementToAll(List<ByteBuffer> values); + + /** + * Adds individually each of the specified lists of elements to the end of all of the existing clusterings. + * <p> + * If this builder contains 2 clusterings: A-B and A-C, a call to this method to add [[D, E], [F, G]] will result in the 4 + * clusterings: A-B-D-E, A-B-F-G, A-C-D-E and A-C-F-G. + * </p> + * + * @param values the lists of elements to add + * @return this <code>MultiCBuilder</code> + */ + public abstract MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values); + + /** + * Returns the number of elements that can be added to the clusterings. + * + * @return the number of elements that can be added to the clusterings. + */ + public abstract int remainingCount(); + + /** + * Checks if the clusterings contain null elements. + * + * @return <code>true</code> if the clusterings contain <code>null</code> elements, <code>false</code> otherwise. + */ + public abstract boolean containsNull(); + + /** + * Checks if the clusterings contain unset elements. + * + * @return <code>true</code> if the clusterings contain <code>unset</code> elements, <code>false</code> otherwise. 
+ */ + public abstract boolean containsUnset(); + + /** + * Checks if an empty list of values has been added. + * @return <code>true</code> if the clusterings have some missing elements, <code>false</code> otherwise. + */ + public abstract boolean hasMissingElements(); + + /** + * Builds the <code>clusterings</code>. + * + * @return the clusterings + */ + public abstract NavigableSet<Clustering> build(); + + /** + * Builds the <code>Slice.Bound</code>s for the clusterings, forwarding <code>isStart</code> and <code>isInclusive</code> to the underlying <code>CBuilder</code>. + * + * @return the bounds + */ + public abstract SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive); + + /** + * Checks if some elements can still be added to the clusterings. + * + * @return <code>true</code> if it is possible to add more elements to the clusterings, <code>false</code> otherwise. + */ + public boolean hasRemaining() + { + return remainingCount() > 0; + } + + + private static class ConcreteMultiCBuilder extends MultiCBuilder + { + /** + * The table comparator. + */ + private final ClusteringComparator comparator; + + /** + * The elements of the clusterings, one inner list per clustering being built. + */ + private final List<List<ByteBuffer>> elementsList = new ArrayList<>(); + + /** + * The number of elements that have been added. + */ + private int size; + + /** + * <code>true</code> if the clusterings have been built, <code>false</code> otherwise. + */ + private boolean built; + + /** + * <code>true</code> if the clusterings contain some <code>null</code> elements. + */ + private boolean containsNull; + + /** + * <code>true</code> if the clusterings contain some <code>unset</code> elements. + */ + private boolean containsUnset; + + /** + * <code>true</code> if some empty collection has been added. 
+ */ + private boolean hasMissingElements; + + public ConcreteMultiCBuilder(ClusteringComparator comparator) + { + this.comparator = comparator; + } + + public MultiCBuilder addElementToAll(ByteBuffer value) + { + checkUpdateable(); + + if (isEmpty()) + elementsList.add(new ArrayList<ByteBuffer>()); + + for (int i = 0, m = elementsList.size(); i < m; i++) + { + if (value == null) + containsNull = true; + if (value == ByteBufferUtil.UNSET_BYTE_BUFFER) + containsUnset = true; + + elementsList.get(i).add(value); + } + size++; + return this; + } + + public MultiCBuilder addEachElementToAll(List<ByteBuffer> values) + { + checkUpdateable(); + + if (isEmpty()) + elementsList.add(new ArrayList<ByteBuffer>()); + + if (values.isEmpty()) + { + hasMissingElements = true; + } + else + { + for (int i = 0, m = elementsList.size(); i < m; i++) + { + List<ByteBuffer> oldComposite = elementsList.remove(0); + + for (int j = 0, n = values.size(); j < n; j++) + { + List<ByteBuffer> newComposite = new ArrayList<>(oldComposite); + elementsList.add(newComposite); + + ByteBuffer value = values.get(j); + + if (value == null) + containsNull = true; + if (value == ByteBufferUtil.UNSET_BYTE_BUFFER) + containsUnset = true; + + newComposite.add(values.get(j)); + } + } + } + size++; + return this; + } + + public MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values) + { + checkUpdateable(); + + if (isEmpty()) + elementsList.add(new ArrayList<ByteBuffer>()); + + if (values.isEmpty()) + { + hasMissingElements = true; + } + else + { + for (int i = 0, m = elementsList.size(); i < m; i++) + { + List<ByteBuffer> oldComposite = elementsList.remove(0); + + for (int j = 0, n = values.size(); j < n; j++) + { + List<ByteBuffer> newComposite = new ArrayList<>(oldComposite); + elementsList.add(newComposite); + + List<ByteBuffer> value = values.get(j); + + if (value.isEmpty()) + hasMissingElements = true; + + if (value.contains(null)) + containsNull = true; + if 
(value.contains(ByteBufferUtil.UNSET_BYTE_BUFFER)) + containsUnset = true; + + newComposite.addAll(value); + } + } + size += values.get(0).size(); + } + return this; + } + + public int remainingCount() + { + return comparator.size() - size; + } + + /** + * Checks if this builder is empty. + * + * @return <code>true</code> if this builder is empty, <code>false</code> otherwise. + */ + public boolean isEmpty() + { + return elementsList.isEmpty(); + } + + public boolean containsNull() + { + return containsNull; + } + + public boolean containsUnset() + { + return containsUnset; + } + + public boolean hasMissingElements() + { + return hasMissingElements; + } + + public NavigableSet<Clustering> build() + { + built = true; + + if (hasMissingElements) + return FBUtilities.emptySortedSet(comparator); + + CBuilder builder = CBuilder.create(comparator); + + if (elementsList.isEmpty()) + return FBUtilities.singleton(builder.build(), builder.comparator()); + + // Use a TreeSet to sort and eliminate duplicates + NavigableSet<Clustering> set = new TreeSet<>(builder.comparator()); + + for (int i = 0, m = elementsList.size(); i < m; i++) + { + List<ByteBuffer> elements = elementsList.get(i); + set.add(builder.buildWith(elements)); + } + return set; + } + + public SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive) + { + built = true; + + if (hasMissingElements) + return FBUtilities.emptySortedSet(comparator); + + CBuilder builder = CBuilder.create(comparator); + + if (elementsList.isEmpty()) + return FBUtilities.singleton(builder.buildBound(isStart, isInclusive), comparator); + + // Use a TreeSet to sort and eliminate duplicates + SortedSet<Slice.Bound> set = new TreeSet<>(comparator); + + for (int i = 0, m = elementsList.size(); i < m; i++) + { + List<ByteBuffer> elements = elementsList.get(i); + set.add(builder.buildBoundWith(elements, isStart, isInclusive)); + } + return set; + } + + private void checkUpdateable() + { + if (!hasRemaining() || built) + throw 
new IllegalStateException("this builder cannot be updated anymore"); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 9dd1686..355d259 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -19,7 +19,6 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; import org.apache.commons.lang3.StringUtils; @@ -27,13 +26,15 @@ import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,37 +52,27 @@ public class Mutation implements IMutation // when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test private final String keyspaceName; - private final ByteBuffer key; + private final DecoratedKey key; // map of column family id to mutations for that column family. 
- private final Map<UUID, ColumnFamily> modifications; - - public Mutation(String keyspaceName, ByteBuffer key) - { - this(keyspaceName, key, new HashMap<UUID, ColumnFamily>()); - } + private final Map<UUID, PartitionUpdate> modifications; - public Mutation(String keyspaceName, ByteBuffer key, ColumnFamily cf) + public Mutation(String keyspaceName, DecoratedKey key) { - this(keyspaceName, key, Collections.singletonMap(cf.id(), cf)); + this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>()); } - public Mutation(String keyspaceName, Row row) + public Mutation(PartitionUpdate update) { - this(keyspaceName, row.key.getKey(), row.cf); + this(update.metadata().ksName, update.partitionKey(), Collections.singletonMap(update.metadata().cfId, update)); } - protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications) + protected Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications) { this.keyspaceName = keyspaceName; this.key = key; this.modifications = modifications; } - public Mutation(ByteBuffer key, ColumnFamily cf) - { - this(cf.metadata().ksName, key, cf); - } - public Mutation copy() { Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications)); @@ -98,53 +89,34 @@ public class Mutation implements IMutation return modifications.keySet(); } - public ByteBuffer key() + public DecoratedKey key() { return key; } - public Collection<ColumnFamily> getColumnFamilies() + public Collection<PartitionUpdate> getPartitionUpdates() { return modifications.values(); } - public ColumnFamily getColumnFamily(UUID cfId) + public PartitionUpdate getPartitionUpdate(UUID cfId) { return modifications.get(cfId); } - /* - * Specify a column family name and the corresponding column - * family object. - * param @ cf - column family name - * param @ columnFamily - the column family. 
- */ - public void add(ColumnFamily columnFamily) + public Mutation add(PartitionUpdate update) { - assert columnFamily != null; - ColumnFamily prev = modifications.put(columnFamily.id(), columnFamily); + assert update != null; + PartitionUpdate prev = modifications.put(update.metadata().cfId, update); if (prev != null) // developer error - throw new IllegalArgumentException("Table " + columnFamily + " already has modifications in this mutation: " + prev); - } - - /** - * @return the ColumnFamily in this Mutation corresponding to @param cfName, creating an empty one if necessary. - */ - public ColumnFamily addOrGet(String cfName) - { - return addOrGet(Schema.instance.getCFMetaData(keyspaceName, cfName)); + throw new IllegalArgumentException("Table " + update.metadata().cfName + " already has modifications in this mutation: " + prev); + return this; } - public ColumnFamily addOrGet(CFMetaData cfm) + public PartitionUpdate get(CFMetaData cfm) { - ColumnFamily cf = modifications.get(cfm.cfId); - if (cf == null) - { - cf = ArrayBackedSortedColumns.factory.create(cfm); - modifications.put(cfm.cfId, cf); - } - return cf; + return modifications.get(cfm.cfId); } public boolean isEmpty() @@ -152,56 +124,56 @@ public class Mutation implements IMutation return modifications.isEmpty(); } - public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive) - { - addOrGet(cfName).addColumn(name, value, timestamp, timeToLive); - } - - public void addCounter(String cfName, CellName name, long value) - { - addOrGet(cfName).addCounter(name, value); - } - - public void add(String cfName, CellName name, ByteBuffer value, long timestamp) - { - add(cfName, name, value, timestamp, 0); - } - - public void delete(String cfName, long timestamp) + /** + * Creates a new mutation that merges all the provided mutations. + * + * @param mutations the mutations to merge together. All mutation must be + * on the same keyspace and partition key. 
There should also be at least one + * mutation. + * @return a mutation that contains all the modifications contained in {@code mutations}. + * + * @throws IllegalArgumentException if not all the mutations are on the same + * keyspace and key. + */ + public static Mutation merge(List<Mutation> mutations) { - int localDeleteTime = (int) (System.currentTimeMillis() / 1000); - addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime)); - } + assert !mutations.isEmpty(); - public void delete(String cfName, CellName name, long timestamp) - { - int localDeleteTime = (int) (System.currentTimeMillis() / 1000); - addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp); - } + if (mutations.size() == 1) + return mutations.get(0); - public void deleteRange(String cfName, Composite start, Composite end, long timestamp) - { - int localDeleteTime = (int) (System.currentTimeMillis() / 1000); - addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime)); - } + Set<UUID> updatedTables = new HashSet<>(); + String ks = null; + DecoratedKey key = null; + for (Mutation mutation : mutations) + { + updatedTables.addAll(mutation.modifications.keySet()); + if (ks != null && !ks.equals(mutation.keyspaceName)) + throw new IllegalArgumentException(); + if (key != null && !key.equals(mutation.key)) + throw new IllegalArgumentException(); + ks = mutation.keyspaceName; + key = mutation.key; + } - public void addAll(IMutation m) - { - if (!(m instanceof Mutation)) - throw new IllegalArgumentException(); + List<PartitionUpdate> updates = new ArrayList<>(mutations.size()); + Map<UUID, PartitionUpdate> modifications = new HashMap<>(updatedTables.size()); + for (UUID table : updatedTables) + { + for (Mutation mutation : mutations) + { + PartitionUpdate upd = mutation.modifications.get(table); + if (upd != null) + updates.add(upd); + } - Mutation mutation = (Mutation)m; - if (!keyspaceName.equals(mutation.keyspaceName) || !key.equals(mutation.key)) - throw new 
IllegalArgumentException(); + if (updates.isEmpty()) + continue; - for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet()) - { - // It's slighty faster to assume the key wasn't present and fix if - // not in the case where it wasn't there indeed. - ColumnFamily cf = modifications.put(entry.getKey(), entry.getValue()); - if (cf != null) - entry.getValue().addAll(cf); + modifications.put(table, updates.size() == 1 ? updates.get(0) : PartitionUpdate.merge(updates)); + updates.clear(); } + return new Mutation(ks, key, modifications); } /* @@ -243,7 +215,7 @@ public class Mutation implements IMutation { StringBuilder buff = new StringBuilder("Mutation("); buff.append("keyspace='").append(keyspaceName).append('\''); - buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\''); + buff.append(", key='").append(ByteBufferUtil.bytesToHex(key.getKey())).append('\''); buff.append(", modifications=["); if (shallow) { @@ -256,14 +228,16 @@ public class Mutation implements IMutation buff.append(StringUtils.join(cfnames, ", ")); } else - buff.append(StringUtils.join(modifications.values(), ", ")); + { + buff.append("\n ").append(StringUtils.join(modifications.values(), "\n ")).append("\n"); + } return buff.append("])").toString(); } public Mutation without(UUID cfId) { Mutation mutation = new Mutation(keyspaceName, key); - for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet()) + for (Map.Entry<UUID, PartitionUpdate> entry : modifications.entrySet()) if (!entry.getKey().equals(cfId)) mutation.add(entry.getValue()); return mutation; @@ -276,58 +250,52 @@ public class Mutation implements IMutation if (version < MessagingService.VERSION_20) out.writeUTF(mutation.getKeyspaceName()); - ByteBufferUtil.writeWithShortLength(mutation.key(), out); + if (version < MessagingService.VERSION_30) + ByteBufferUtil.writeWithShortLength(mutation.key().getKey(), out); /* serialize the modifications in the mutation */ int size = 
mutation.modifications.size(); out.writeInt(size); assert size > 0; - for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet()) - ColumnFamily.serializer.serialize(entry.getValue(), out, version); + for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet()) + PartitionUpdate.serializer.serialize(entry.getValue(), out, version); } - public Mutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException + public Mutation deserialize(DataInput in, int version, SerializationHelper.Flag flag) throws IOException { String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that if (version < MessagingService.VERSION_20) keyspaceName = in.readUTF(); - ByteBuffer key = ByteBufferUtil.readWithShortLength(in); + DecoratedKey key = null; + if (version < MessagingService.VERSION_30) + key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)); + int size = in.readInt(); assert size > 0; - Map<UUID, ColumnFamily> modifications; if (size == 1) + return new Mutation(PartitionUpdate.serializer.deserialize(in, version, flag, key)); + + Map<UUID, PartitionUpdate> modifications = new HashMap<>(size); + PartitionUpdate update = null; + for (int i = 0; i < size; ++i) { - ColumnFamily cf = deserializeOneCf(in, version, flag); - modifications = Collections.singletonMap(cf.id(), cf); - keyspaceName = cf.metadata().ksName; - } - else - { - modifications = new HashMap<UUID, ColumnFamily>(size); - for (int i = 0; i < size; ++i) - { - ColumnFamily cf = deserializeOneCf(in, version, flag); - modifications.put(cf.id(), cf); - keyspaceName = cf.metadata().ksName; - } + update = PartitionUpdate.serializer.deserialize(in, version, flag, key); + modifications.put(update.metadata().cfId, update); } - return new Mutation(keyspaceName, key, modifications); - } + if (keyspaceName == null) + keyspaceName = update.metadata().ksName; + if (key == null) + key = 
update.partitionKey(); - private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException - { - ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version); - // We don't allow Mutation with null column family, so we should never get null back. - assert cf != null; - return cf; + return new Mutation(keyspaceName, key, modifications); } public Mutation deserialize(DataInput in, int version) throws IOException { - return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE); + return deserialize(in, version, SerializationHelper.Flag.FROM_REMOTE); } public long serializedSize(Mutation mutation, int version) @@ -338,12 +306,15 @@ public class Mutation implements IMutation if (version < MessagingService.VERSION_20) size += sizes.sizeof(mutation.getKeyspaceName()); - int keySize = mutation.key().remaining(); - size += sizes.sizeof((short) keySize) + keySize; + if (version < MessagingService.VERSION_30) + { + int keySize = mutation.key().getKey().remaining(); + size += sizes.sizeof((short) keySize) + keySize; + } size += sizes.sizeof(mutation.modifications.size()); - for (Map.Entry<UUID,ColumnFamily> entry : mutation.modifications.entrySet()) - size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version); + for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet()) + size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version, sizes); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/NativeCell.java b/src/java/org/apache/cassandra/db/NativeCell.java deleted file mode 100644 index dac5674..0000000 --- a/src/java/org/apache/cassandra/db/NativeCell.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.security.MessageDigest; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.utils.memory.AbstractAllocator; -import org.apache.cassandra.utils.memory.MemtableAllocator; -import org.apache.cassandra.utils.memory.NativeAllocator; - -public class NativeCell extends AbstractNativeCell -{ - private static final long SIZE = ObjectSizes.measure(new NativeCell()); - - NativeCell() - {} - - public NativeCell(NativeAllocator allocator, OpOrder.Group writeOp, Cell copyOf) - { - super(allocator, writeOp, copyOf); - } - - @Override - public CellName name() - { - return this; - } - - @Override - public long timestamp() - { - return getLong(TIMESTAMP_OFFSET); - } - - @Override - public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator) - { - return new BufferCell(copy(metadata, allocator), allocator.clone(value()), timestamp()); - } - - @Override - public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, 
OpOrder.Group opGroup) - { - return allocator.clone(this, metadata, opGroup); - } - - @Override - public void updateDigest(MessageDigest digest) - { - updateWithName(digest); // name - updateWithValue(digest); // value - - FBUtilities.updateWithLong(digest, timestamp()); - FBUtilities.updateWithByte(digest, serializationFlags()); - } - - @Override - public long unsharedHeapSizeExcludingData() - { - return SIZE; - } - - @Override - public long unsharedHeapSize() - { - return SIZE; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeCounterCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/NativeCounterCell.java b/src/java/org/apache/cassandra/db/NativeCounterCell.java deleted file mode 100644 index c16cc44..0000000 --- a/src/java/org/apache/cassandra/db/NativeCounterCell.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-/**
- * {@link CounterCell} implementation built on {@link NativeCell} and created through a
- * {@link NativeAllocator}. On top of what {@code NativeCell} stores, it keeps the
- * {@code timestampOfLastDelete} as a long in the last 8 bytes of the cell
- * (written by {@link #construct} and read by {@link #timestampOfLastDelete}).
- */
-public class NativeCounterCell extends NativeCell implements CounterCell
-{
-    // Measured once on an empty instance; returned by both unsharedHeapSize*() methods.
-    private static final long SIZE = ObjectSizes.measure(new NativeCounterCell());
-
-    // Within this class, only used to create the instance measured for SIZE.
-    private NativeCounterCell()
-    {}
-
-    /**
-     * Creates a native copy of {@code copyOf}; all the work is delegated to the
-     * {@code NativeCell} constructor.
-     */
-    public NativeCounterCell(NativeAllocator allocator, OpOrder.Group writeOp, CounterCell copyOf)
-    {
-        super(allocator, writeOp, copyOf);
-    }
-
-    /**
-     * Lets {@code NativeCell} copy its part of {@code from}, then stores the source's
-     * {@code timestampOfLastDelete} in the trailing 8 bytes.
-     * {@code from} must be a {@link CounterCell} (it is cast unconditionally).
-     */
-    @Override
-    protected void construct(Cell from)
-    {
-        super.construct(from);
-        setLong(internalSize() - 8, ((CounterCell) from).timestampOfLastDelete());
-    }
-
-    // NOTE(review): presumably the number of trailing bytes reserved for subclass data;
-    // it matches the 8-byte long written in construct() -- confirm against NativeCell.
-    @Override
-    protected int postfixSize()
-    {
-        return 8;
-    }
-
-    // Size computed by NativeCell plus the 8 bytes of timestampOfLastDelete.
-    @Override
-    protected int sizeOf(Cell cell)
-    {
-        return 8 + super.sizeOf(cell);
-    }
-
-    /** Reads back the long stored in the last 8 bytes of the cell. */
-    @Override
-    public long timestampOfLastDelete()
-    {
-        return getLong(internalSize() - 8);
-    }
-
-    /** Delegates to {@code contextManager.total} on this cell's value. */
-    @Override
-    public long total()
-    {
-        return contextManager.total(value());
-    }
-
-    /** Delegates to {@code contextManager.hasLegacyShards} on this cell's value. */
-    @Override
-    public boolean hasLegacyShards()
-    {
-        return contextManager.hasLegacyShards(value());
-    }
-
-    /** Not supported by this implementation; always throws. */
-    @Override
-    public Cell markLocalToBeCleared()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    /** Delegates to {@code diffCounter}. */
-    @Override
-    public Cell diff(Cell cell)
-    {
-        return diffCounter(cell);
-    }
-
-    /** Delegates to {@code reconcileCounter}. */
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        return reconcileCounter(cell);
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.COUNTER_MASK;
-    }
-
-    @Override
-    public int cellDataSize()
-    {
-        // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
-        return super.cellDataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete());
-    }
-
-    /** Base serialized size plus the size of {@code timestampOfLastDelete} as computed by {@code typeSizes}. */
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete());
-    }
-
-    /** Validates the name, then the value through {@code contextManager.validateContext}. */
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-        // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
-        // which is not the internal representation of counters
-        contextManager.validateContext(value());
-    }
-
-    /*
-     * We have to special case digest creation for counter column because
-     * we don't want to include the information about which shard of the
-     * context is a delta or not, since this information differs from node to
-     * node.
-     */
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        updateWithName(digest);
-
-        // We don't take the deltas into account in a digest
-        contextManager.updateDigest(digest, value());
-
-        FBUtilities.updateWithLong(digest, timestamp());
-        FBUtilities.updateWithByte(digest, serializationFlags());
-        FBUtilities.updateWithLong(digest, timestampOfLastDelete());
-    }
-
-    /** Formats as {@code ClassName(name:false:context@timestamp!timestampOfLastDelete)}. */
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s(%s:false:%s@%d!%d)",
-                             getClass().getSimpleName(),
-                             comparator.getString(name()),
-                             contextManager.toString(value()),
-                             timestamp(),
-                             timestampOfLastDelete());
-    }
-
-    /** Copies this cell into a {@code BufferCounterCell}, cloning the value with {@code allocator}. */
-    @Override
-    public CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        return new BufferCounterCell(copy(metadata, allocator), allocator.clone(value()), timestamp(), timestampOfLastDelete());
-    }
-
-    /** Delegates the copy to {@code allocator.clone}. */
-    @Override
-    public CounterCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return SIZE;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return SIZE;
-    }
-
-    /**
-     * Equal when {@code NativeCell} considers the cells equal and both carry the same
-     * {@code timestampOfLastDelete}; {@code cell} is cast to {@link CounterCell} once
-     * the base comparison has succeeded.
-     */
-    @Override
-    public boolean equals(Cell cell)
-    {
-        return super.equals(cell) && timestampOfLastDelete() == ((CounterCell) cell).timestampOfLastDelete();
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeDeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeDeletedCell.java b/src/java/org/apache/cassandra/db/NativeDeletedCell.java
deleted file mode 100644
index 6bdef43..0000000
--- a/src/java/org/apache/cassandra/db/NativeDeletedCell.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemoryUtil;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-/**
- * {@link DeletedCell} (tombstone) implementation built on {@link NativeCell} and created
- * through a {@link NativeAllocator}. The local deletion time is not stored separately:
- * it is read from the int found at the start of the cell value
- * (see {@link #getLocalDeletionTime} and {@link #validateFields}).
- */
-public class NativeDeletedCell extends NativeCell implements DeletedCell
-{
-    // Measured once on an empty instance; returned by both unsharedHeapSize*() methods.
-    private static final long SIZE = ObjectSizes.measure(new NativeDeletedCell());
-
-    // Within this class, only used to create the instance measured for SIZE.
-    private NativeDeletedCell()
-    {}
-
-    /** Creates a native copy of {@code copyOf}; all the work is delegated to {@code NativeCell}. */
-    public NativeDeletedCell(NativeAllocator allocator, OpOrder.Group writeOp, DeletedCell copyOf)
-    {
-        super(allocator, writeOp, copyOf);
-    }
-
-    /**
-     * Against another {@link DeletedCell} this defers to {@code NativeCell.reconcile};
-     * for any other kind of cell the decision is handed to that cell's own
-     * {@code reconcile}, called with this tombstone as argument.
-     */
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        if (cell instanceof DeletedCell)
-            return super.reconcile(cell);
-        return cell.reconcile(this);
-    }
-
-    /** Always {@code false} for this cell type. */
-    @Override
-    public boolean isLive()
-    {
-        return false;
-    }
-
-    /** Always {@code false} for this cell type, whatever {@code now} is. */
-    @Override
-    public boolean isLive(long now)
-    {
-        return false;
-    }
-
-    /**
-     * Reads the int stored at the start of the value and reverses its bytes when
-     * {@code MemoryUtil.INVERTED_ORDER} is set.
-     */
-    @Override
-    public int getLocalDeletionTime()
-    {
-        int v = getInt(valueStartOffset());
-        return MemoryUtil.INVERTED_ORDER ? Integer.reverseBytes(v) : v;
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.DELETION_MASK;
-    }
-
-    /**
-     * Validates the name, then requires the value to be exactly 4 bytes long and the
-     * local deletion time it encodes to be non-negative.
-     *
-     * @throws MarshalException if either check fails
-     */
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-
-        if ((int) (internalSize() - valueStartOffset()) != 4)
-            throw new MarshalException("A tombstone value should be 4 bytes long");
-        if (getLocalDeletionTime() < 0)
-            throw new MarshalException("The local deletion time should not be negative");
-    }
-
-    /** Digests the name, the timestamp and the serialization flags (the value is not included here). */
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        updateWithName(digest);
-        FBUtilities.updateWithLong(digest, timestamp());
-        FBUtilities.updateWithByte(digest, serializationFlags());
-    }
-
-    /** Copies this cell into a {@code BufferDeletedCell}, cloning the value with {@code allocator}. */
-    @Override
-    public DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        return new BufferDeletedCell(copy(metadata, allocator), allocator.clone(value()), timestamp());
-    }
-
-    /** Delegates the copy to {@code allocator.clone}. */
-    @Override
-    public DeletedCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return SIZE;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return SIZE;
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeExpiringCell.java b/src/java/org/apache/cassandra/db/NativeExpiringCell.java
deleted file mode 100644
index 6369536..0000000
--- a/src/java/org/apache/cassandra/db/NativeExpiringCell.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-/**
- * {@link ExpiringCell} implementation built on {@link NativeCell} and created through a
- * {@link NativeAllocator}. It keeps two ints in the last 8 bytes of the cell: the local
- * deletion time at {@code internalSize() - 8} and the time-to-live at
- * {@code internalSize() - 4}.
- */
-public class NativeExpiringCell extends NativeCell implements ExpiringCell
-{
-    // Measured once on an empty instance; returned by both unsharedHeapSize*() methods.
-    private static final long SIZE = ObjectSizes.measure(new NativeExpiringCell());
-
-    // Within this class, only used to create the instance measured for SIZE.
-    private NativeExpiringCell()
-    {}
-
-    /** Creates a native copy of {@code copyOf}; all the work is delegated to {@code NativeCell}. */
-    public NativeExpiringCell(NativeAllocator allocator, OpOrder.Group writeOp, ExpiringCell copyOf)
-    {
-        super(allocator, writeOp, copyOf);
-    }
-
-    // Size computed by NativeCell plus the 8 trailing bytes (two ints) added by this class.
-    @Override
-    protected int sizeOf(Cell cell)
-    {
-        return super.sizeOf(cell) + 8;
-    }
-
-    /**
-     * Writes the TTL and the local deletion time of {@code from} into the two trailing
-     * ints, then lets {@code NativeCell} copy the rest. Note the trailing ints are
-     * written before {@code super.construct} runs.
-     * {@code from} must be an {@link ExpiringCell} (it is cast unconditionally).
-     */
-    @Override
-    protected void construct(Cell from)
-    {
-        ExpiringCell expiring = (ExpiringCell) from;
-
-        setInt(internalSize() - 4, expiring.getTimeToLive());
-        setInt(internalSize() - 8, expiring.getLocalDeletionTime());
-        super.construct(from);
-    }
-
-    // NOTE(review): presumably the number of trailing bytes reserved for subclass data;
-    // it matches the two ints written in construct() -- confirm against NativeCell.
-    @Override
-    protected int postfixSize()
-    {
-        return 8;
-    }
-
-    /** Reads the int stored in the last 4 bytes of the cell. */
-    @Override
-    public int getTimeToLive()
-    {
-        return getInt(internalSize() - 4);
-    }
-
-    /** Reads the int stored 8 bytes before the end of the cell. */
-    @Override
-    public int getLocalDeletionTime()
-    {
-        return getInt(internalSize() - 8);
-    }
-
-    /** Same as {@link #isLive(long)} evaluated at {@code System.currentTimeMillis()}. */
-    @Override
-    public boolean isLive()
-    {
-        return isLive(System.currentTimeMillis());
-    }
-
-    /**
-     * Live while {@code now}, divided by 1000 and narrowed to an int, is strictly before
-     * the local deletion time. {@link #isLive()} passes milliseconds, so the local
-     * deletion time is compared as seconds.
-     */
-    @Override
-    public boolean isLive(long now)
-    {
-        return (int) (now / 1000) < getLocalDeletionTime();
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.EXPIRATION_MASK;
-    }
-
-    /** Base data size plus the native sizes of the local deletion time and of the TTL. */
-    @Override
-    public int cellDataSize()
-    {
-        return super.cellDataSize() + TypeSizes.NATIVE.sizeof(getLocalDeletionTime()) + TypeSizes.NATIVE.sizeof(getTimeToLive());
-    }
-
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        /*
-         * An expired column adds to a Cell :
-         * 4 bytes for the localExpirationTime
-         * + 4 bytes for the timeToLive
-         */
-        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(getLocalDeletionTime()) + typeSizes.sizeof(getTimeToLive());
-    }
-
-    /**
-     * Runs the base validation, then requires a strictly positive TTL and a
-     * non-negative local deletion time.
-     *
-     * @throws MarshalException if any check fails
-     */
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        super.validateFields(metadata);
-
-        if (getTimeToLive() <= 0)
-            throw new MarshalException("A column TTL should be > 0");
-        if (getLocalDeletionTime() < 0)
-            throw new MarshalException("The local expiration time should not be negative");
-    }
-
-    /** Base digest followed by the TTL (the local deletion time is not added here). */
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        super.updateDigest(digest);
-        FBUtilities.updateWithInt(digest, getTimeToLive());
-    }
-
-    /**
-     * Picks a winner between this cell and {@code cell}, in order: the higher timestamp;
-     * on a timestamp tie, {@code cell} if it is a {@link DeletedCell}; then the greater
-     * value according to {@code compareTo}; and finally, when {@code cell} is also
-     * expiring, the one with the later local deletion time. Ties resolve to {@code this}.
-     */
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        long ts1 = timestamp(), ts2 = cell.timestamp();
-        if (ts1 != ts2)
-            return ts1 < ts2 ? cell : this;
-        // we should prefer tombstones
-        if (cell instanceof DeletedCell)
-            return cell;
-        int c = value().compareTo(cell.value());
-        if (c != 0)
-            return c < 0 ? cell : this;
-        // If we have same timestamp and value, prefer the longest ttl
-        if (cell instanceof ExpiringCell)
-        {
-            int let1 = getLocalDeletionTime(), let2 = cell.getLocalDeletionTime();
-            if (let1 < let2)
-                return cell;
-        }
-        return this;
-    }
-
-    /**
-     * Equal when {@code NativeCell} considers the cells equal and both the local deletion
-     * time and the TTL match; {@code cell} is cast to {@link ExpiringCell} once the base
-     * comparison has succeeded.
-     */
-    public boolean equals(Cell cell)
-    {
-        if (!super.equals(cell))
-            return false;
-        ExpiringCell that = (ExpiringCell) cell;
-        return getLocalDeletionTime() == that.getLocalDeletionTime() && getTimeToLive() == that.getTimeToLive();
-    }
-
-    /** Formats as {@code ClassName(<base string>!ttl)}. */
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s(%s!%d)", getClass().getSimpleName(), super.getString(comparator), getTimeToLive());
-    }
-
-    /** Copies this cell into a {@code BufferExpiringCell}, copying the name and cloning the value with {@code allocator}. */
-    @Override
-    public ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        return new BufferExpiringCell(name().copy(metadata, allocator), allocator.clone(value()), timestamp(), getTimeToLive(), getLocalDeletionTime());
-    }
-
-    /** Delegates the copy to {@code allocator.clone}. */
-    @Override
-    public ExpiringCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return SIZE;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return SIZE;
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
deleted file mode 100644
index f5eddb9..0000000
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.ISSTableSerializer;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.MarshalException;
-
-/**
- * Common interface of the items handled by {@link Serializer}: the serializer below
- * treats an atom as either a {@code Cell} or a {@code RangeTombstone}.
- */
-public interface OnDiskAtom
-{
-    public Composite name();
-
-    /**
-     * For a standard column, this is the same as timestamp().
-     * For a super column, this is the min/max column timestamp of the sub columns.
-     */
-    public long timestamp();
-    public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
-
-    public void validateFields(CFMetaData metadata) throws MarshalException;
-    public void updateDigest(MessageDigest digest);
-
-    /**
-     * SSTable serializer for atoms; dispatches to the cell or the range tombstone
-     * serializer obtained from the {@link CellNameType} given at construction.
-     */
-    public static class Serializer implements ISSTableSerializer<OnDiskAtom>
-    {
-        private final CellNameType type;
-
-        public Serializer(CellNameType type)
-        {
-            this.type = type;
-        }
-
-        /**
-         * Writes {@code atom} with the column serializer when it is a {@code Cell};
-         * anything else is asserted to be a {@code RangeTombstone} and written with the
-         * range tombstone serializer.
-         */
-        public void serializeForSSTable(OnDiskAtom atom, DataOutputPlus out) throws IOException
-        {
-            if (atom instanceof Cell)
-            {
-                type.columnSerializer().serialize((Cell)atom, out);
-            }
-            else
-            {
-                assert atom instanceof RangeTombstone;
-                type.rangeTombstoneSerializer().serializeForSSTable((RangeTombstone)atom, out);
-            }
-        }
-
-        /**
-         * Shortcut for the four-argument overload using {@code ColumnSerializer.Flag.LOCAL}
-         * and {@code Integer.MIN_VALUE} as {@code expireBefore}.
-         */
-        public OnDiskAtom deserializeFromSSTable(DataInput in, Version version) throws IOException
-        {
-            return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
-        }
-
-        /**
-         * Reads the next atom: first its name, then one unsigned byte of flags that selects
-         * between a range tombstone body ({@code RANGE_TOMBSTONE_MASK} set) and a column body.
-         *
-         * @return the atom read, or {@code null} when the name read is empty
-         *         (end-of-row marker, see the inline comment)
-         */
-        public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version) throws IOException
-        {
-            Composite name = type.serializer().deserialize(in);
-            if (name.isEmpty())
-            {
-                // SSTableWriter.END_OF_ROW
-                return null;
-            }
-
-            int b = in.readUnsignedByte();
-            if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
-                return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
-            else
-                return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
-        }
-
-        /**
-         * Serialized size of {@code atom}, using the same Cell / RangeTombstone dispatch
-         * as {@link #serializeForSSTable}.
-         */
-        public long serializedSizeForSSTable(OnDiskAtom atom)
-        {
-            if (atom instanceof Cell)
-            {
-                return type.columnSerializer().serializedSize((Cell)atom, TypeSizes.NATIVE);
-            }
-            else
-            {
-                assert atom instanceof RangeTombstone;
-                return type.rangeTombstoneSerializer().serializedSizeForSSTable((RangeTombstone)atom);
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PagedRangeCommand.java
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java deleted file mode 100644 index 40ef88e..0000000 --- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-
-/**
- * Range command that, in addition to what {@link AbstractRangeCommand} holds, carries the
- * {@code start} and {@code stop} composites of a page, a {@code limit} and a
- * {@code countCQL3Rows} flag. The predicate is always a {@link SliceQueryFilter}
- * (enforced by the constructor signature and relied upon by the casts below).
- */
-public class PagedRangeCommand extends AbstractRangeCommand
-{
-    public static final IVersionedSerializer<PagedRangeCommand> serializer = new Serializer();
-
-    public final Composite start;
-    public final Composite stop;
-    public final int limit;
-    private final boolean countCQL3Rows;
-
-    public PagedRangeCommand(String keyspace,
-                             String columnFamily,
-                             long timestamp,
-                             AbstractBounds<RowPosition> keyRange,
-                             SliceQueryFilter predicate,
-                             Composite start,
-                             Composite stop,
-                             List<IndexExpression> rowFilter,
-                             int limit,
-                             boolean countCQL3Rows)
-    {
-        super(keyspace, columnFamily, timestamp, keyRange, predicate, rowFilter);
-        this.start = start;
-        this.stop = stop;
-        this.limit = limit;
-        this.countCQL3Rows = countCQL3Rows;
-    }
-
-    /** Wraps this command in a {@code PAGED_RANGE} message using {@link #serializer}. */
-    public MessageOut<PagedRangeCommand> createMessage()
-    {
-        return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, serializer);
-    }
-
-    /**
-     * Builds the same command restricted to {@code subRange}. The page {@code start} is kept
-     * only when the sub-range shares the left bound of the original key range, and the page
-     * {@code stop} only when it shares the right bound; otherwise the predicate's own
-     * start/finish are used. The predicate is shallow-cloned.
-     */
-    public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
-    {
-        Composite newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
-        Composite newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
-        return new PagedRangeCommand(keyspace,
-                                     columnFamily,
-                                     timestamp,
-                                     subRange,
-                                     ((SliceQueryFilter) predicate).cloneShallow(),
-                                     newStart,
-                                     newStop,
-                                     rowFilter,
-                                     limit,
-                                     countCQL3Rows);
-    }
-
-    /** Builds the same command with {@code newLimit} as limit; the predicate is shallow-cloned. */
-    public AbstractRangeCommand withUpdatedLimit(int newLimit)
-    {
-        return new PagedRangeCommand(keyspace,
-                                     columnFamily,
-                                     timestamp,
-                                     keyRange,
-                                     ((SliceQueryFilter) predicate).cloneShallow(),
-                                     start,
-                                     stop,
-                                     rowFilter,
-                                     newLimit,
-                                     countCQL3Rows);
-    }
-
-    public int limit()
-    {
-        return limit;
-    }
-
-    public boolean countCQL3Rows()
-    {
-        return countCQL3Rows;
-    }
-
-    /**
-     * Runs the command against the local column family store: through {@code cfs.search}
-     * when the index manager reports an index for the row filter, through
-     * {@code cfs.getRangeSlice} otherwise.
-     */
-    public List<Row> executeLocally()
-    {
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
-
-        ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, (SliceQueryFilter)predicate, start, stop, rowFilter, limit, countCQL3Rows(), timestamp);
-        if (cfs.indexManager.hasIndexFor(rowFilter))
-            return cfs.search(exFilter);
-        else
-            return cfs.getRangeSlice(exFilter);
-    }
-
-    // Note: countCQL3Rows is not part of the string representation.
-    @Override
-    public String toString()
-    {
-        return String.format("PagedRange(%s, %s, %d, %s, %s, %s, %s, %s, %d)", keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit);
-    }
-
-    /**
-     * Wire format, in order: keyspace, column family, timestamp, key range, slice filter,
-     * page start, page stop, row filter (count followed by each expression), limit and,
-     * for messaging versions at or above {@code VERSION_21} only, the countCQL3Rows flag.
-     * serialize, deserialize and serializedSize must stay in step with each other.
-     */
-    private static class Serializer implements IVersionedSerializer<PagedRangeCommand>
-    {
-        public void serialize(PagedRangeCommand cmd, DataOutputPlus out, int version) throws IOException
-        {
-            out.writeUTF(cmd.keyspace);
-            out.writeUTF(cmd.columnFamily);
-            out.writeLong(cmd.timestamp);
-
-            MessagingService.validatePartitioner(cmd.keyRange);
-            AbstractBounds.rowPositionSerializer.serialize(cmd.keyRange, out, version);
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
-
-            // SliceQueryFilter (the count is not used)
-            SliceQueryFilter filter = (SliceQueryFilter)cmd.predicate;
-            metadata.comparator.sliceQueryFilterSerializer().serialize(filter, out, version);
-
-            // The start and stop of the page
-            metadata.comparator.serializer().serialize(cmd.start, out);
-            metadata.comparator.serializer().serialize(cmd.stop, out);
-
-            out.writeInt(cmd.rowFilter.size());
-            for (IndexExpression expr : cmd.rowFilter)
-            {
-                expr.writeTo(out);;
-            }
-
-            out.writeInt(cmd.limit);
-            if (version >= MessagingService.VERSION_21)
-                out.writeBoolean(cmd.countCQL3Rows);
-        }
-
-        /**
-         * Reads a command back in the order written by {@link #serialize}.
-         *
-         * @throws UnknownColumnFamilyException if the schema has no metadata for the
-         *         keyspace/table named in the message
-         */
-        public PagedRangeCommand deserialize(DataInput in, int version) throws IOException
-        {
-            String keyspace = in.readUTF();
-            String columnFamily = in.readUTF();
-            long timestamp = in.readLong();
-
-            AbstractBounds<RowPosition> keyRange =
-                AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
-            if (metadata == null)
-            {
-                String message = String.format("Got paged range command for nonexistent table %s.%s. If the table was just " +
-                        "created, this is likely due to the schema not being fully propagated. Please wait for schema " +
-                        "agreement on table creation." , keyspace, columnFamily);
-                throw new UnknownColumnFamilyException(message, null);
-            }
-
-            SliceQueryFilter predicate = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
-
-            Composite start = metadata.comparator.serializer().deserialize(in);
-            Composite stop = metadata.comparator.serializer().deserialize(in);
-
-            int filterCount = in.readInt();
-            List<IndexExpression> rowFilter = new ArrayList<IndexExpression>(filterCount);
-            for (int i = 0; i < filterCount; i++)
-            {
-                rowFilter.add(IndexExpression.readFrom(in));
-            }
-
-            int limit = in.readInt();
-            // Before VERSION_21 the flag is not on the wire and is inferred from the predicate instead.
-            boolean countCQL3Rows = version >= MessagingService.VERSION_21
-                                  ? in.readBoolean()
-                                  : predicate.compositesToGroup >= 0 || predicate.count != 1; // See #6857
-            return new PagedRangeCommand(keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit, countCQL3Rows);
-        }
-
-        /** Computes the size of what {@link #serialize} writes, field by field, in the same order. */
-        public long serializedSize(PagedRangeCommand cmd, int version)
-        {
-            long size = 0;
-
-            size += TypeSizes.NATIVE.sizeof(cmd.keyspace);
-            size += TypeSizes.NATIVE.sizeof(cmd.columnFamily);
-            size += TypeSizes.NATIVE.sizeof(cmd.timestamp);
-
-            size += AbstractBounds.rowPositionSerializer.serializedSize(cmd.keyRange, version);
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
-
-            size += metadata.comparator.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter)cmd.predicate, version);
-
-            size += metadata.comparator.serializer().serializedSize(cmd.start, TypeSizes.NATIVE);
-            size += metadata.comparator.serializer().serializedSize(cmd.stop, TypeSizes.NATIVE);
-
-            size += TypeSizes.NATIVE.sizeof(cmd.rowFilter.size());
-            for (IndexExpression expr : cmd.rowFilter)
-            {
-                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column);
-                size += TypeSizes.NATIVE.sizeof(expr.operator.ordinal());
-                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.value);
-            }
-
-            size += TypeSizes.NATIVE.sizeof(cmd.limit);
-            if (version >= MessagingService.VERSION_21)
-                size += TypeSizes.NATIVE.sizeof(cmd.countCQL3Rows);
-            return size;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
new file mode 100644
index 0000000..a1b1d00
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.security.MessageDigest;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.ColumnDefinition;
+
+/**
+ * Columns (or a subset of the columns) that a partition contains.
+ * This mainly groups both static and regular columns for convenience.
+ */
+public class PartitionColumns implements Iterable<ColumnDefinition>
+{
+    /** Shared instance holding neither static nor regular columns. */
+    public static final PartitionColumns NONE = new PartitionColumns(Columns.NONE, Columns.NONE);
+
+    public final Columns statics;
+    public final Columns regulars;
+
+    public PartitionColumns(Columns statics, Columns regulars)
+    {
+        this.statics = statics;
+        this.regulars = regulars;
+    }
+
+    /**
+     * Creates a {@code PartitionColumns} holding only {@code column}, placed on the static
+     * or the regular side depending on {@code column.isStatic()}.
+     */
+    public static PartitionColumns of(ColumnDefinition column)
+    {
+        return new PartitionColumns(column.isStatic() ? Columns.of(column) : Columns.NONE,
+                                    column.isStatic() ? Columns.NONE : Columns.of(column));
+    }
+
+    /**
+     * Returns a new {@code PartitionColumns} in which {@code column} has been removed from
+     * the side (static or regular) it belongs to; the other side is reused as is.
+     */
+    public PartitionColumns without(ColumnDefinition column)
+    {
+        return new PartitionColumns(column.isStatic() ? statics.without(column) : statics,
+                                    column.isStatic() ? regulars : regulars.without(column));
+    }
+
+    /**
+     * Returns these columns minus the static ones; {@code this} is returned when there
+     * are no static columns to drop.
+     */
+    public PartitionColumns withoutStatics()
+    {
+        return statics.isEmpty() ? this : new PartitionColumns(Columns.NONE, regulars);
+    }
+
+    /** Whether there are neither static nor regular columns. */
+    public boolean isEmpty()
+    {
+        return statics.isEmpty() && regulars.isEmpty();
+    }
+
+    /** Whether {@code column} is present on the side (static or regular) it belongs to. */
+    public boolean contains(ColumnDefinition column)
+    {
+        return column.isStatic() ? statics.contains(column) : regulars.contains(column);
+    }
+
+    /** Whether both the statics and the regulars of {@code columns} are contained in this object's. */
+    public boolean includes(PartitionColumns columns)
+    {
+        return statics.contains(columns.statics) && regulars.contains(columns.regulars);
+    }
+
+    /** Iterates over the static columns first, then over the regular ones. */
+    public Iterator<ColumnDefinition> iterator()
+    {
+        return Iterators.concat(statics.iterator(), regulars.iterator());
+    }
+
+    /** Same as {@link #iterator()} but using each side's {@code selectOrderIterator()}. */
+    public Iterator<ColumnDefinition> selectOrderIterator()
+    {
+        return Iterators.concat(statics.selectOrderIterator(), regulars.selectOrderIterator());
+    }
+
+    /** Renders as {@code [statics | regulars]}. */
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[").append(statics).append(" | ").append(regulars).append("]");
+        return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof PartitionColumns))
+            return false;
+
+        PartitionColumns that = (PartitionColumns)other;
+        return this.statics.equals(that.statics)
+            && this.regulars.equals(that.regulars);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(statics, regulars);
+    }
+
+    /** Feeds the regular columns, then the static columns, into {@code digest}. */
+    public void digest(MessageDigest digest)
+    {
+        regulars.digest(digest);
+        statics.digest(digest);
+    }
+
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * Accumulates column definitions and builds a {@link PartitionColumns} from them.
+     * The backing sets are only allocated once a column of the matching kind is added.
+     */
+    public static class Builder
+    {
+        // Note that we do want to use sorted sets because we want the column definitions to be compared
+        // through compareTo, not equals. The former basically check it's the same column name, while the latter
+        // check it's the same object, including the same type.
+        private SortedSet<ColumnDefinition> regularColumns;
+        private SortedSet<ColumnDefinition> staticColumns;
+
+        /**
+         * Adds {@code c} to the static or to the regular set; a non-static column is
+         * asserted to be regular.
+         */
+        public Builder add(ColumnDefinition c)
+        {
+            if (c.isStatic())
+            {
+                if (staticColumns == null)
+                    staticColumns = new TreeSet<>();
+                staticColumns.add(c);
+            }
+            else
+            {
+                assert c.isRegular();
+                if (regularColumns == null)
+                    regularColumns = new TreeSet<>();
+                regularColumns.add(c);
+            }
+            return this;
+        }
+
+        /** Number of distinct columns (per the sets' ordering) added so far. */
+        public int added()
+        {
+            return (regularColumns == null ? 0 : regularColumns.size())
+                 + (staticColumns == null ? 0 : staticColumns.size());
+        }
+
+        public Builder addAll(Iterable<ColumnDefinition> columns)
+        {
+            for (ColumnDefinition c : columns)
+                add(c);
+            return this;
+        }
+
+        /** Adds every regular and every static column of {@code columns}. */
+        public Builder addAll(PartitionColumns columns)
+        {
+            // Each set is created only if there is something to put in it; when the source
+            // side is empty the matching loop below does not run, so a null set is never touched.
+            if (regularColumns == null && !columns.regulars.isEmpty())
+                regularColumns = new TreeSet<>();
+
+            for (ColumnDefinition c : columns.regulars)
+                regularColumns.add(c);
+
+            if (staticColumns == null && !columns.statics.isEmpty())
+                staticColumns = new TreeSet<>();
+
+            for (ColumnDefinition c : columns.statics)
+                staticColumns.add(c);
+
+            return this;
+        }
+
+        /** Builds the result, using {@code Columns.NONE} for a side that received no column. */
+        public PartitionColumns build()
+        {
+            return new PartitionColumns(staticColumns == null ? Columns.NONE : Columns.from(staticColumns),
+                                        regularColumns == null ? Columns.NONE : Columns.from(regularColumns));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionPosition.java b/src/java/org/apache/cassandra/db/PartitionPosition.java
new file mode 100644
index 0000000..1dc940e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionPosition.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * A position on the ring that is either a decorated partition key ({@link Kind#ROW_KEY})
+ * or the minimum/maximum key bound of a token ({@link Kind#MIN_BOUND} /
+ * {@link Kind#MAX_BOUND}).
+ */
+public interface PartitionPosition extends RingPosition<PartitionPosition>
+{
+    public static enum Kind
+    {
+        // Only add new values to the end of the enum, the ordinal is used
+        // during serialization
+        ROW_KEY, MIN_BOUND, MAX_BOUND;
+
+        private static final Kind[] allKinds = Kind.values();
+
+        /** Maps a serialized ordinal back to its {@code Kind}. */
+        static Kind fromOrdinal(int ordinal)
+        {
+            return allKinds[ordinal];
+        }
+    }
+
+    public static final class ForKey
+    {
+        /**
+         * Returns the position for {@code key} under partitioner {@code p}: the minimum key
+         * bound of the partitioner's minimum token when {@code key} is {@code null} or has
+         * no remaining bytes, the decorated key otherwise.
+         */
+        public static PartitionPosition get(ByteBuffer key, IPartitioner p)
+        {
+            return key == null || key.remaining() == 0 ? p.getMinimumToken().minKeyBound() : p.decorateKey(key);
+        }
+    }
+
+    public static final RowPositionSerializer serializer = new RowPositionSerializer();
+
+    public Kind kind();
+    public boolean isMinimum();
+
+    public static class RowPositionSerializer implements IPartitionerDependentSerializer<PartitionPosition>
+    {
+        /*
+         * We need to be able to serialize both Token.KeyBound and
+         * DecoratedKey. To make this compact, we first write a byte whose
+         * meaning is:
+         * - 0: DecoratedKey
+         * - 1: a 'minimum' Token.KeyBound
+         * - 2: a 'maximum' Token.KeyBound
+         * In the case of the DecoratedKey, we then serialize the key (the
+         * token is recreated on the other side). In the other cases, we then
+         * serialize the token.
+         */
+        public void serialize(PartitionPosition pos, DataOutputPlus out, int version) throws IOException
+        {
+            Kind kind = pos.kind();
+            out.writeByte(kind.ordinal());
+            if (kind == Kind.ROW_KEY)
+                ByteBufferUtil.writeWithShortLength(((DecoratedKey)pos).getKey(), out);
+            else
+                Token.serializer.serialize(pos.getToken(), out, version);
+        }
+
+        /**
+         * Reads back a position written by {@link #serialize}.
+         *
+         * @param in the input to read from
+         * @param p the partitioner used both to re-decorate a serialized key and to
+         *          deserialize a token
+         * @param version the messaging version, passed to the token serializer
+         * @return the deserialized position
+         */
+        public PartitionPosition deserialize(DataInput in, IPartitioner p, int version) throws IOException
+        {
+            Kind kind = Kind.fromOrdinal(in.readByte());
+            if (kind == Kind.ROW_KEY)
+            {
+                ByteBuffer k = ByteBufferUtil.readWithShortLength(in);
+                // Use the partitioner handed in by the caller, as the token branch below
+                // does, rather than the node-global one, so both kinds of position are
+                // rebuilt against the same partitioner.
+                return p.decorateKey(k);
+            }
+            else
+            {
+                Token t = Token.serializer.deserialize(in, p, version);
+                return kind == Kind.MIN_BOUND ? t.minKeyBound() : t.maxKeyBound();
+            }
+        }
+
+        /**
+         * Size of what {@link #serialize} writes: one byte for the kind, then either the
+         * short-length-prefixed key or the serialized token.
+         */
+        public long serializedSize(PartitionPosition pos, int version)
+        {
+            Kind kind = pos.kind();
+            int size = 1; // 1 byte for enum
+            if (kind == Kind.ROW_KEY)
+            {
+                int keySize = ((DecoratedKey)pos).getKey().remaining();
+                size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize;
+            }
+            else
+            {
+                size += Token.serializer.serializedSize(pos.getToken(), version);
+            }
+            return size;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
new file mode 100644
index 0000000..c11a9be
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.cassandra.db; + +import java.io.DataInput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.index.SecondaryIndexSearcher; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.metrics.ColumnFamilyMetrics; +import org.apache.cassandra.service.*; +import org.apache.cassandra.service.pager.*; +import org.apache.cassandra.thrift.ThriftResultsMerger; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; + +/** + * A read command that selects a (part of a) range of partitions. + */ +public class PartitionRangeReadCommand extends ReadCommand +{ + protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); + + private final DataRange dataRange; + + public PartitionRangeReadCommand(boolean isDigest, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange) + { + super(Kind.PARTITION_RANGE, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + this.dataRange = dataRange; + } + + public PartitionRangeReadCommand(CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange) + { + this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange); + } + + /** + * Creates a new read command that query all the data in the table. 
+ * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * + * @return a newly created read command that queries everything in the table. + */ + public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec) + { + return new PartitionRangeReadCommand(metadata, + nowInSec, + ColumnFilter.all(metadata), + RowFilter.NONE, + DataLimits.NONE, + DataRange.allData(StorageService.getPartitioner())); + } + + public DataRange dataRange() + { + return dataRange; + } + + public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) + { + return dataRange.clusteringIndexFilter(key); + } + + public boolean isNamesQuery() + { + return dataRange.isNamesQuery(); + } + + public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range) + { + return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range)); + } + + public PartitionRangeReadCommand copy() + { + return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange()); + } + + public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) + { + return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange()); + } + + public long getTimeout() + { + return DatabaseDescriptor.getRangeRpcTimeout(); + } + + public boolean selects(DecoratedKey partitionKey, Clustering clustering) + { + if (!dataRange().contains(partitionKey)) + return false; + + if (clustering == Clustering.STATIC_CLUSTERING) + return !columnFilter().fetchedColumns().statics.isEmpty(); + + return dataRange().clusteringIndexFilter(partitionKey).selects(clustering); + } + + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + { + return 
StorageProxy.getRangeSlice(this, consistency); + } + + public QueryPager getPager(PagingState pagingState) + { + if (isNamesQuery()) + return new RangeNamesQueryPager(this, pagingState); + else + return new RangeSliceQueryPager(this, pagingState); + } + + protected void recordLatency(ColumnFamilyMetrics metric, long latencyNanos) + { + metric.rangeLatency.addNano(latencyNanos); + } + + protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) + { + ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(dataRange().keyRange())); + Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator())); + + // fetch data from current memtable, historical memtables, and SSTables in the correct order. + final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size()); + + try + { + for (Memtable memtable : view.memtables) + { + @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method + UnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift()); + iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); + } + + for (SSTableReader sstable : view.sstables) + { + @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method + UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift()); + iterators.add(isForThrift() ? 
ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter); + } + + return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs); + } + catch (RuntimeException | Error e) + { + try + { + FBUtilities.closeAll(iterators); + } + catch (Exception suppressed) + { + e.addSuppressed(suppressed); + } + + throw e; + } + } + + private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs) + { + return new WrappingUnfilteredPartitionIterator(iter) + { + @Override + public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + { + // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done + // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage. + DecoratedKey dk = iter.partitionKey(); + + // Check if this partition is in the rowCache and if it is, if it covers our filter + CachedPartition cached = cfs.getRawCachedPartition(dk); + ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk); + + if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec())) + { + // We won't use 'iter' so close it now. + iter.close(); + + return filter.getUnfilteredRowIterator(columnFilter(), cached); + } + + return iter; + } + }; + } + + protected void appendCQLWhereClause(StringBuilder sb) + { + if (dataRange.isUnrestricted() && rowFilter().isEmpty()) + return; + + sb.append(" WHERE "); + // We put the row filter first because the data range can end by "ORDER BY" + if (!rowFilter().isEmpty()) + { + sb.append(rowFilter()); + if (!dataRange.isUnrestricted()) + sb.append(" AND "); + } + if (!dataRange.isUnrestricted()) + sb.append(dataRange.toCQLString(metadata())); + } + + /** + * Allow to post-process the result of the query after it has been reconciled on the coordinator + * but before it is passed to the CQL layer to return the ResultSet. 
+ * + * See CASSANDRA-8717 for why this exists. + */ + public PartitionIterator postReconciliationProcessing(PartitionIterator result) + { + ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName); + SecondaryIndexSearcher searcher = getIndexSearcher(cfs); + return searcher == null ? result : searcher.postReconciliationProcessing(rowFilter(), result); + } + + @Override + public String toString() + { + return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)", + metadata().ksName, + metadata().cfName, + columnFilter(), + rowFilter(), + limits(), + dataRange().toString(metadata())); + } + + protected void serializeSelection(DataOutputPlus out, int version) throws IOException + { + DataRange.serializer.serialize(dataRange(), out, version, metadata()); + } + + protected long selectionSerializedSize(int version) + { + return DataRange.serializer.serializedSize(dataRange(), version, metadata()); + } + + private static class Deserializer extends SelectionDeserializer + { + public ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) + throws IOException + { + DataRange range = DataRange.serializer.deserialize(in, version, metadata); + return new PartitionRangeReadCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range); + } + }; +}
