http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Clustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java index 541556b..7754182 100644 --- a/src/java/org/apache/cassandra/db/Clustering.java +++ b/src/java/org/apache/cassandra/db/Clustering.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; @@ -25,7 +24,9 @@ import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.memory.AbstractAllocator; /** * The clustering column values for a row. @@ -39,7 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; * all of the following ones will be too because that's what thrift allows, but it's never assumed by the * code so we could start generally allowing nulls for clustering columns if we wanted to). */ -public abstract class Clustering extends AbstractClusteringPrefix +public class Clustering extends AbstractClusteringPrefix { public static final Serializer serializer = new Serializer(); @@ -47,7 +48,7 @@ public abstract class Clustering extends AbstractClusteringPrefix * The special cased clustering used by all static rows. It is a special case in the * sense that it's always empty, no matter how many clustering columns the table has. */ - public static final Clustering STATIC_CLUSTERING = new EmptyClustering() + public static final Clustering STATIC_CLUSTERING = new Clustering(EMPTY_VALUES_ARRAY) { @Override public Kind kind() @@ -63,19 +64,35 @@ public abstract class Clustering extends AbstractClusteringPrefix }; /** Empty clustering for tables having no clustering columns. */ - public static final Clustering EMPTY = new EmptyClustering(); + public static final Clustering EMPTY = new Clustering(EMPTY_VALUES_ARRAY) + { + @Override + public String toString(CFMetaData metadata) + { + return "EMPTY"; + } + }; + + public Clustering(ByteBuffer... values) + { + super(Kind.CLUSTERING, values); + } public Kind kind() { return Kind.CLUSTERING; } - public Clustering takeAlias() + public Clustering copy(AbstractAllocator allocator) { - ByteBuffer[] values = new ByteBuffer[size()]; + // Important for STATIC_CLUSTERING (but no point in being wasteful in general). + if (size() == 0) + return this; + + ByteBuffer[] newValues = new ByteBuffer[size()]; for (int i = 0; i < size(); i++) - values[i] = get(i); - return new SimpleClustering(values); + newValues[i] = values[i] == null ? null : allocator.clone(values[i]); + return new Clustering(newValues); } public String toString(CFMetaData metadata) @@ -84,7 +101,7 @@ public abstract class Clustering extends AbstractClusteringPrefix for (int i = 0; i < size(); i++) { ColumnDefinition c = metadata.clusteringColumns().get(i); - sb.append(i == 0 ? "" : ", ").append(c.name).append("=").append(get(i) == null ? "null" : c.type.getString(get(i))); + sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i))); } return sb.toString(); } @@ -100,44 +117,6 @@ public abstract class Clustering extends AbstractClusteringPrefix return sb.toString(); } - private static class EmptyClustering extends Clustering - { - private static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0]; - - public int size() - { - return 0; - } - - public ByteBuffer get(int i) - { - throw new UnsupportedOperationException(); - } - - public ByteBuffer[] getRawValues() - { - return EMPTY_VALUES_ARRAY; - } - - @Override - public Clustering takeAlias() - { - return this; - } - - @Override - public long unsharedHeapSize() - { - return 0; - } - - @Override - public String toString(CFMetaData metadata) - { - return "EMPTY"; - } - } - /** * Serializer for Clustering object. * <p> @@ -148,6 +127,7 @@ public abstract class Clustering extends AbstractClusteringPrefix { public void serialize(Clustering clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException { + assert clustering != STATIC_CLUSTERING : "We should never serialize a static clustering"; ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types); } @@ -156,16 +136,13 @@ public abstract class Clustering extends AbstractClusteringPrefix return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types); } - public void deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException + public Clustering deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException { - ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types, writer); - } + if (types.isEmpty()) + return EMPTY; - public Clustering deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException - { - SimpleClustering.Builder builder = SimpleClustering.builder(types.size()); - deserialize(in, version, types, builder); - return builder.build(); + ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types); + return new Clustering(values); } } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ClusteringComparator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java index 8b01d6f..a5401f0 100644 --- a/src/java/org/apache/cassandra/db/ClusteringComparator.java +++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java @@ -25,7 +25,9 @@ import java.util.Objects; import com.google.common.base.Joiner; +import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; @@ -46,9 +48,11 @@ public class ClusteringComparator implements Comparator<Clusterable> private final Comparator<IndexInfo> indexReverseComparator; private final Comparator<Clusterable> reverseComparator; + private final Comparator<Row> rowComparator = (r1, r2) -> compare(r1.clustering(), r2.clustering()); + public ClusteringComparator(AbstractType<?>... clusteringTypes) { - this(Arrays.<AbstractType<?>>asList(clusteringTypes)); + this(Arrays.asList(clusteringTypes)); } public ClusteringComparator(List<AbstractType<?>> clusteringTypes) @@ -56,27 +60,9 @@ public class ClusteringComparator implements Comparator<Clusterable> this.clusteringTypes = clusteringTypes; this.isByteOrderComparable = isByteOrderComparable(clusteringTypes); - this.indexComparator = new Comparator<IndexInfo>() - { - public int compare(IndexInfo o1, IndexInfo o2) - { - return ClusteringComparator.this.compare(o1.lastName, o2.lastName); - } - }; - this.indexReverseComparator = new Comparator<IndexInfo>() - { - public int compare(IndexInfo o1, IndexInfo o2) - { - return ClusteringComparator.this.compare(o1.firstName, o2.firstName); - } - }; - this.reverseComparator = new Comparator<Clusterable>() - { - public int compare(Clusterable c1, Clusterable c2) - { - return ClusteringComparator.this.compare(c2, c1); - } - }; + this.indexComparator = (o1, o2) -> ClusteringComparator.this.compare(o1.lastName, o2.lastName); + this.indexReverseComparator = (o1, o2) -> ClusteringComparator.this.compare(o1.firstName, o2.firstName); + this.reverseComparator = (c1, c2) -> ClusteringComparator.this.compare(c2, c1); } private static boolean isByteOrderComparable(Iterable<AbstractType<?>> types) @@ -130,11 +116,10 @@ public class ClusteringComparator implements Comparator<Clusterable> throw new IllegalArgumentException(String.format("Invalid number of components, expecting %d but got %d", size(), values.length)); CBuilder builder = CBuilder.create(this); - for (int i = 0; i < values.length; i++) + for (Object val : values) { - Object val = values[i]; if (val instanceof ByteBuffer) - builder.add((ByteBuffer)val); + builder.add((ByteBuffer) val); else builder.add(val); } @@ -179,7 +164,7 @@ public class ClusteringComparator implements Comparator<Clusterable> public int compareComponent(int i, ByteBuffer v1, ByteBuffer v2) { if (v1 == null) - return v1 == null ? 0 : -1; + return v2 == null ? 0 : -1; if (v2 == null) return 1; @@ -233,6 +218,19 @@ public class ClusteringComparator implements Comparator<Clusterable> } } + /** + * A comparator for rows. + * + * A {@code Row} is a {@code Clusterable} so {@code ClusteringComparator} can be used + * to compare rows directly, but when we know we deal with rows (and not {@code Clusterable} in + * general), this is a little faster because by knowing we compare {@code Clustering} objects, + * we know that 1) they all have the same size and 2) they all have the same kind. + */ + public Comparator<Row> rowComparator() + { + return rowComparator; + } + public Comparator<IndexInfo> indexComparator(boolean reversed) { return reversed ? indexReverseComparator : indexComparator; @@ -243,27 +241,6 @@ public class ClusteringComparator implements Comparator<Clusterable> return reverseComparator; } - /** - * Whether the two provided clustering prefix are on the same clustering values. - * - * @param c1 the first prefix. - * @param c2 the second prefix. - * @return whether {@code c1} and {@code c2} have the same clustering values (but not necessarily - * the same "kind") or not. - */ - public boolean isOnSameClustering(ClusteringPrefix c1, ClusteringPrefix c2) - { - if (c1.size() != c2.size()) - return false; - - for (int i = 0; i < c1.size(); i++) - { - if (compareComponent(i, c1.get(i), c2.get(i)) != 0) - return false; - } - return true; - } - @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index 3bc7ff8..7b9d582 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.security.MessageDigest; @@ -27,29 +26,31 @@ import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; /** - * A clustering prefix is basically the unit of what a {@link ClusteringComparator} can compare. + * A clustering prefix is the unit of what a {@link ClusteringComparator} can compare. * <p> - * It holds values for the clustering columns of a table (potentially only a prefix of all of them) and it has + * It holds values for the clustering columns of a table (potentially only a prefix of all of them) and has * a "kind" that allows us to implement slices with inclusive and exclusive bounds. * <p> - * In practice, {@code ClusteringPrefix} is just the common parts to its 2 main subtype: {@link Clustering} and - * {@link Slice.Bound}, where: + * In practice, {@code ClusteringPrefix} is just the common parts to its 3 main subtype: {@link Clustering} and + * {@link Slice.Bound}/{@link RangeTombstone.Bound}, where: * 1) {@code Clustering} represents the clustering values for a row, i.e. the values for it's clustering columns. * 2) {@code Slice.Bound} represents a bound (start or end) of a slice (of rows). + * 3) {@code RangeTombstoneBoundMarker.Bound} represents a range tombstone marker "bound". * See those classes for more details. */ -public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurableMemory, Clusterable +public interface ClusteringPrefix extends IMeasurableMemory, Clusterable { public static final Serializer serializer = new Serializer(); /** * The kind of clustering prefix this actually is. * - * The kind {@code STATIC_CLUSTERING} is only implemented by {@link Clustering.STATIC_CLUSTERING} and {@code CLUSTERING} is + * The kind {@code STATIC_CLUSTERING} is only implemented by {@link Clustering#STATIC_CLUSTERING} and {@code CLUSTERING} is * implemented by the {@link Clustering} class. The rest is used by {@link Slice.Bound} and {@link RangeTombstone.Bound}. */ public enum Kind @@ -166,12 +167,12 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab public boolean isOpen(boolean reversed) { - return reversed ? isEnd() : isStart(); + return isBoundary() || (reversed ? isEnd() : isStart()); } public boolean isClose(boolean reversed) { - return reversed ? isStart() : isEnd(); + return isBoundary() || (reversed ? isStart() : isEnd()); } public Kind closeBoundOfBoundary(boolean reversed) @@ -211,15 +212,29 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab */ public ByteBuffer get(int i); + /** + * Adds the data of this clustering prefix to the provided digest. + * + * @param digest the digest to which to add this prefix. + */ public void digest(MessageDigest digest); - // Used to verify if batches goes over a given size + /** + * The size of the data hold by this prefix. + * + * @return the size of the data hold by this prefix (this is not the size of the object in memory, just + * the size of the data it stores). + */ public int dataSize(); + /** + * Generates a proper string representation of the prefix. + * + * @param metadata the metadata for the table the clustering prefix is of. + * @return a human-readable string representation fo this prefix. + */ public String toString(CFMetaData metadata); - public void writeTo(Writer writer); - /** * The values of this prefix as an array. * <p> @@ -231,21 +246,6 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab */ public ByteBuffer[] getRawValues(); - /** - * Interface for writing a clustering prefix. - * <p> - * Each value for the prefix should simply be written in order. - */ - public interface Writer - { - /** - * Write the next value to the writer. - * - * @param value the value to write. - */ - public void writeClusteringValue(ByteBuffer value); - } - public static class Serializer { public void serialize(ClusteringPrefix clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException @@ -263,7 +263,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab } } - public ClusteringPrefix deserialize(DataInput in, int version, List<AbstractType<?>> types) throws IOException + public ClusteringPrefix deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException { Kind kind = Kind.values()[in.readByte()]; // We shouldn't serialize static clusterings @@ -317,21 +317,20 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab return size; } - void deserializeValuesWithoutSize(DataInput in, int size, int version, List<AbstractType<?>> types, ClusteringPrefix.Writer writer) throws IOException + ByteBuffer[] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException { - if (size == 0) - return; + // Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway). + assert size > 0; + ByteBuffer[] values = new ByteBuffer[size]; int[] header = readHeader(size, in); for (int i = 0; i < size; i++) { - if (isNull(header, i)) - writer.writeClusteringValue(null); - else if (isEmpty(header, i)) - writer.writeClusteringValue(ByteBufferUtil.EMPTY_BYTE_BUFFER); - else - writer.writeClusteringValue(types.get(i).readValue(in)); + values[i] = isNull(header, i) + ? null + : (isEmpty(header, i) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : types.get(i).readValue(in)); } + return values; } private int headerBytesCount(int size) @@ -369,7 +368,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab } } - private int[] readHeader(int size, DataInput in) throws IOException + private int[] readHeader(int size, DataInputPlus in) throws IOException { int nbBytes = headerBytesCount(size); int[] header = new int[nbBytes]; @@ -378,14 +377,14 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab return header; } - private boolean isNull(int[] header, int i) + private static boolean isNull(int[] header, int i) { int b = header[i / 4]; int mask = 1 << ((i % 4) * 2) + 1; return (b & mask) != 0; } - private boolean isEmpty(int[] header, int i) + private static boolean isEmpty(int[] header, int i) { int b = header[i / 4]; int mask = 1 << ((i % 4) * 2); @@ -405,7 +404,7 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab public static class Deserializer { private final ClusteringComparator comparator; - private final DataInput in; + private final DataInputPlus in; private final SerializationHeader serializationHeader; private boolean nextIsRow; @@ -414,14 +413,13 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab private int nextSize; private ClusteringPrefix.Kind nextKind; private int deserializedSize; - private final ByteBuffer[] nextValues; + private ByteBuffer[] nextValues; - public Deserializer(ClusteringComparator comparator, DataInput in, SerializationHeader header) + public Deserializer(ClusteringComparator comparator, DataInputPlus in, SerializationHeader header) { this.comparator = comparator; this.in = in; this.serializationHeader = header; - this.nextValues = new ByteBuffer[comparator.size()]; } public void prepare(int flags) throws IOException @@ -432,6 +430,14 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab this.nextSize = nextIsRow ? comparator.size() : in.readUnsignedShort(); this.nextHeader = serializer.readHeader(nextSize, in); this.deserializedSize = 0; + + // The point of the deserializer is that some of the clustering prefix won't actually be used (because they are not + // within the bounds of the query), and we want to reduce allocation for them. So we only reuse the values array + // between elements if 1) we haven't returned the previous element (if we have, nextValues will be null) and 2) + // nextValues is of the proper size. Note that the 2nd condition may not hold for range tombstone bounds, but all + // rows have a fixed size clustering, so we'll still save in the common case. + if (nextValues == null || nextValues.length != nextSize) + this.nextValues = new ByteBuffer[nextSize]; } public int compareNextTo(Slice.Bound bound) throws IOException @@ -473,9 +479,9 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab return false; int i = deserializedSize++; - nextValues[i] = serializer.isNull(nextHeader, i) + nextValues[i] = Serializer.isNull(nextHeader, i) ? null - : (serializer.isEmpty(nextHeader, i) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : serializationHeader.clusteringTypes().get(i).readValue(in)); + : (Serializer.isEmpty(nextHeader, i) ? ByteBufferUtil.EMPTY_BYTE_BUFFER : serializationHeader.clusteringTypes().get(i).readValue(in)); return true; } @@ -485,29 +491,31 @@ public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, IMeasurab continue; } - public RangeTombstone.Bound.Kind deserializeNextBound(RangeTombstone.Bound.Writer writer) throws IOException + public RangeTombstone.Bound deserializeNextBound() throws IOException { assert !nextIsRow; deserializeAll(); - for (int i = 0; i < nextSize; i++) - writer.writeClusteringValue(nextValues[i]); - writer.writeBoundKind(nextKind); - return nextKind; + RangeTombstone.Bound bound = new RangeTombstone.Bound(nextKind, nextValues); + nextValues = null; + return bound; } - public void deserializeNextClustering(Clustering.Writer writer) throws IOException + public Clustering deserializeNextClustering() throws IOException { - assert nextIsRow && nextSize == nextValues.length; + assert nextIsRow; deserializeAll(); - for (int i = 0; i < nextSize; i++) - writer.writeClusteringValue(nextValues[i]); + Clustering clustering = new Clustering(nextValues); + nextValues = null; + return clustering; } public ClusteringPrefix.Kind skipNext() throws IOException { for (int i = deserializedSize; i < nextSize; i++) - if (!serializer.isNull(nextHeader, i) && !serializer.isEmpty(nextHeader, i)) + { + if (!Serializer.isNull(nextHeader, i) && !Serializer.isEmpty(nextHeader, i)) serializationHeader.clusteringTypes().get(i).skipValue(in); + } return nextKind; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 1a9b92d..52fc48f 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -74,7 +74,7 @@ public class ColumnIndex private int written; private ClusteringPrefix firstClustering; - private final ReusableClusteringPrefix lastClustering; + private ClusteringPrefix lastClustering; private DeletionTime openMarker; @@ -90,7 +90,6 @@ public class ColumnIndex this.result = new ColumnIndex(new ArrayList<IndexHelper.IndexInfo>()); this.initialPosition = writer.getFilePointer(); - this.lastClustering = new ReusableClusteringPrefix(iterator.metadata().clusteringColumns().size()); } private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException @@ -119,7 +118,7 @@ public class ColumnIndex private void addIndexBlock() { IndexHelper.IndexInfo cIndexInfo = new IndexHelper.IndexInfo(firstClustering, - lastClustering.get().takeAlias(), + lastClustering, startPosition, currentPosition() - startPosition, openMarker); @@ -129,28 +128,27 @@ public class ColumnIndex private void add(Unfiltered unfiltered) throws IOException { - lastClustering.copy(unfiltered.clustering()); - boolean isMarker = unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER; - if (firstClustering == null) { // Beginning of an index block. Remember the start and position - firstClustering = lastClustering.get().takeAlias(); + firstClustering = unfiltered.clustering(); startPosition = currentPosition(); } UnfilteredSerializer.serializer.serialize(unfiltered, header, writer.stream, version); + lastClustering = unfiltered.clustering(); ++written; - if (isMarker) + if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { - RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered; + RangeTombstoneMarker marker = (RangeTombstoneMarker)unfiltered; openMarker = marker.isOpen(false) ? marker.openDeletionTime(false) : null; } // if we hit the column index size that we have to index after, go ahead and index it. if (currentPosition() - startPosition >= DatabaseDescriptor.getColumnIndexSize()) addIndexBlock(); + } private ColumnIndex close() throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index 055624b..48a4504 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.IOException; import java.util.*; +import java.util.function.Predicate; import java.nio.ByteBuffer; import java.security.MessageDigest; @@ -329,9 +330,9 @@ public class Columns implements Iterable<ColumnDefinition> } /** - * Whether this object is a subset of the provided other {@code Columns object}. + * Whether this object is a superset of the provided other {@code Columns object}. * - * @param other the othere object to test for inclusion in this object. + * @param other the other object to test for inclusion in this object. * * @return whether all the columns of {@code other} are contained by this object. */ @@ -439,6 +440,34 @@ public class Columns implements Iterable<ColumnDefinition> return new Columns(newColumns); } + /** + * Returns a predicate to test whether columns are included in this {@code Columns} object, + * assuming that tes tested columns are passed to the predicate in sorted order. + * + * @return a predicate to test the inclusion of sorted columns in this object. + */ + public Predicate<ColumnDefinition> inOrderInclusionTester() + { + return new Predicate<ColumnDefinition>() + { + private int i = 0; + + public boolean test(ColumnDefinition column) + { + while (i < columns.length) + { + int cmp = column.compareTo(columns[i]); + if (cmp < 0) + return false; + i++; + if (cmp == 0) + return true; + } + return false; + } + }; + } + public void digest(MessageDigest digest) { for (ColumnDefinition c : this) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/CounterMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java index d1830a0..6818513 100644 --- a/src/java/org/apache/cassandra/db/CounterMutation.java +++ b/src/java/org/apache/cassandra/db/CounterMutation.java @@ -155,7 +155,7 @@ public class CounterMutation implements IMutation /** * Returns a wrapper for the Striped#bulkGet() call (via Keyspace#counterLocksFor()) * Striped#bulkGet() depends on Object#hashCode(), so here we make sure that the cf id and the partition key - * all get to be part of the hashCode() calculation, not just the cell name. + * all get to be part of the hashCode() calculation. */ private Iterable<Object> getCounterLockKeys() { @@ -167,11 +167,11 @@ public class CounterMutation implements IMutation { public Iterable<Object> apply(final Row row) { - return Iterables.concat(Iterables.transform(row, new Function<Cell, Object>() + return Iterables.concat(Iterables.transform(row, new Function<ColumnData, Object>() { - public Object apply(final Cell cell) + public Object apply(final ColumnData data) { - return Objects.hashCode(update.metadata().cfId, key(), row.clustering(), cell.column(), cell.path()); + return Objects.hashCode(update.metadata().cfId, key(), row.clustering(), data.column()); } })); } @@ -238,7 +238,7 @@ public class CounterMutation implements IMutation BTreeSet.Builder<Clustering> names = BTreeSet.builder(cfs.metadata.comparator); for (PartitionUpdate.CounterMark mark : marks) { - names.add(mark.clustering().takeAlias()); + names.add(mark.clustering()); if (mark.path() == null) builder.add(mark.column()); else http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DataRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index 0d7a762..358b0ac 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -16,7 +16,6 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; @@ -26,6 +25,7 @@ import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.dht.*; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; @@ -372,7 +372,7 @@ public class DataRange } } - public DataRange deserialize(DataInput in, int version, CFMetaData metadata) throws IOException + public DataRange deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { AbstractBounds<PartitionPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index a441c48..0b5df06 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -19,255 +19,56 @@ package org.apache.cassandra.db; import java.util.Iterator; -import com.google.common.base.Objects; -import com.google.common.collect.Iterators; - import org.apache.cassandra.cache.IMeasurableMemory; -import org.apache.cassandra.db.rows.Cell; -import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.db.rows.RowStats; import org.apache.cassandra.utils.memory.AbstractAllocator; /** * A combination of a top-level (partition) tombstone and range tombstones describing the deletions * within a partition. + * <p> + * Note that in practice {@link MutableDeletionInfo} is the only concrete implementation of this, however + * different parts of the code will return either {@code DeletionInfo} or {@code MutableDeletionInfo} based + * on whether it can/should be mutated or not. + * <p> + * <b>Warning:</b> do not ever cast a {@code DeletionInfo} into a {@code MutableDeletionInfo} to mutate it!!! + * TODO: it would be safer to have 2 actual implementation of DeletionInfo, one mutable and one that isn't (I'm + * just lazy right this minute). */ -public class DeletionInfo implements IMeasurableMemory +public interface DeletionInfo extends IMeasurableMemory { - private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionInfo(0, 0)); - - /** - * This represents a deletion of the entire partition. We can't represent this within the RangeTombstoneList, so it's - * kept separately. This also slightly optimizes the common case of a full partition deletion. - */ - private DeletionTime partitionDeletion; - - /** - * A list of range tombstones within the partition. This is left as null if there are no range tombstones - * (to save an allocation (since it's a common case). - */ - private RangeTombstoneList ranges; - - /** - * Creates a DeletionInfo with only a top-level (row) tombstone. - * @param markedForDeleteAt the time after which the entire row should be considered deleted - * @param localDeletionTime what time the deletion write was applied locally (for purposes of - * purging the tombstone after gc_grace_seconds). - */ - public DeletionInfo(long markedForDeleteAt, int localDeletionTime) - { - // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE - // (see CASSANDRA-3872) - this(new SimpleDeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime)); - } - - public DeletionInfo(DeletionTime partitionDeletion) - { - this(partitionDeletion, null); - } - - public DeletionInfo(ClusteringComparator comparator, Slice slice, long markedForDeleteAt, int localDeletionTime) - { - this(DeletionTime.LIVE, new RangeTombstoneList(comparator, 1)); - ranges.add(slice.start(), slice.end(), markedForDeleteAt, localDeletionTime); - } - - public DeletionInfo(DeletionTime partitionDeletion, RangeTombstoneList ranges) - { - this.partitionDeletion = partitionDeletion.takeAlias(); - this.ranges = ranges; - } - - /** - * Returns a new DeletionInfo that has no top-level tombstone or any range tombstones. - */ - public static DeletionInfo live() - { - return new DeletionInfo(DeletionTime.LIVE); - } - - public DeletionInfo copy() - { - return new DeletionInfo(partitionDeletion, ranges == null ? null : ranges.copy()); - } - - public DeletionInfo copy(AbstractAllocator allocator) - { - RangeTombstoneList rangesCopy = null; - if (ranges != null) - rangesCopy = ranges.copy(allocator); - - return new DeletionInfo(partitionDeletion, rangesCopy); - } + // Note that while MutableDeletionInfo.live() is mutable, we expose it here as a non-mutable DeletionInfo so sharing is fine. + public static final DeletionInfo LIVE = MutableDeletionInfo.live(); /** * Returns whether this DeletionInfo is live, that is deletes no columns. */ - public boolean isLive() - { - return partitionDeletion.isLive() && (ranges == null || ranges.isEmpty()); - } + public boolean isLive(); - /** - * Return whether a given cell is deleted by this deletion info. - * - * @param clustering the clustering for the cell to check. - * @param cell the cell to check. - * @return true if the cell is deleted, false otherwise - */ - private boolean isDeleted(Clustering clustering, Cell cell) - { - // If we're live, don't consider anything deleted, even if the cell ends up having as timestamp Long.MIN_VALUE - // (which shouldn't happen in practice, but it would invalid to consider it deleted if it does). - if (isLive()) - return false; + public DeletionTime getPartitionDeletion(); - if (cell.livenessInfo().timestamp() <= partitionDeletion.markedForDeleteAt()) - return true; - - // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346. - if (!partitionDeletion.isLive() && cell.isCounterCell()) - return true; - - return ranges != null && ranges.isDeleted(clustering, cell); - } - - /** - * Potentially replaces the top-level tombstone with another, keeping whichever has the higher markedForDeleteAt - * timestamp. - * @param newInfo - */ - public void add(DeletionTime newInfo) - { - if (newInfo.supersedes(partitionDeletion)) - partitionDeletion = newInfo; - } - - public void add(RangeTombstone tombstone, ClusteringComparator comparator) - { - if (ranges == null) - ranges = new RangeTombstoneList(comparator, 1); - - ranges.add(tombstone); - } - - /** - * Combines another DeletionInfo with this one and returns the result. Whichever top-level tombstone - * has the higher markedForDeleteAt timestamp will be kept, along with its localDeletionTime. The - * range tombstones will be combined. - * - * @return this object. - */ - public DeletionInfo add(DeletionInfo newInfo) - { - add(newInfo.partitionDeletion); - - if (ranges == null) - ranges = newInfo.ranges == null ? null : newInfo.ranges.copy(); - else if (newInfo.ranges != null) - ranges.addAll(newInfo.ranges); + // Use sparingly, not the most efficient thing + public Iterator<RangeTombstone> rangeIterator(boolean reversed); - return this; - } + public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean reversed); - public DeletionTime getPartitionDeletion() - { - return partitionDeletion; - } + public RangeTombstone rangeCovering(Clustering name); - // Use sparingly, not the most efficient thing - public Iterator<RangeTombstone> rangeIterator(boolean reversed) - { - return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(reversed); - } + public void collectStats(RowStats.Collector collector); - public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean reversed) - { - return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(slice, reversed); - } + public int dataSize(); - public RangeTombstone rangeCovering(Clustering name) - { - return ranges == null ? null : ranges.search(name); - } + public boolean hasRanges(); - public int dataSize() - { - int size = TypeSizes.sizeof(partitionDeletion.markedForDeleteAt()); - return size + (ranges == null ? 0 : ranges.dataSize()); - } + public int rangeCount(); - public boolean hasRanges() - { - return ranges != null && !ranges.isEmpty(); - } - - public int rangeCount() - { - return hasRanges() ? ranges.size() : 0; - } + public long maxTimestamp(); /** * Whether this deletion info may modify the provided one if added to it. */ - public boolean mayModify(DeletionInfo delInfo) - { - return partitionDeletion.compareTo(delInfo.partitionDeletion) > 0 || hasRanges(); - } - - @Override - public String toString() - { - if (ranges == null || ranges.isEmpty()) - return String.format("{%s}", partitionDeletion); - else - return String.format("{%s, ranges=%s}", partitionDeletion, rangesAsString()); - } - - private String rangesAsString() - { - assert !ranges.isEmpty(); - StringBuilder sb = new StringBuilder(); - ClusteringComparator cc = ranges.comparator(); - Iterator<RangeTombstone> iter = rangeIterator(false); - while (iter.hasNext()) - { - RangeTombstone i = iter.next(); - sb.append(i.deletedSlice().toString(cc)); - sb.append("@"); - sb.append(i.deletionTime()); - } - return sb.toString(); - } - - // Updates all the timestamp of the deletion contained in this DeletionInfo to be {@code timestamp}. - public DeletionInfo updateAllTimestamp(long timestamp) - { - if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE) - partitionDeletion = new SimpleDeletionTime(timestamp, partitionDeletion.localDeletionTime()); - - if (ranges != null) - ranges.updateAllTimestamp(timestamp); - return this; - } - - @Override - public boolean equals(Object o) - { - if(!(o instanceof DeletionInfo)) - return false; - DeletionInfo that = (DeletionInfo)o; - return partitionDeletion.equals(that.partitionDeletion) && Objects.equal(ranges, that.ranges); - } - - @Override - public final int hashCode() - { - return Objects.hashCode(partitionDeletion, ranges); - } + public boolean mayModify(DeletionInfo delInfo); - @Override - public long unsharedHeapSize() - { - return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize()); - } + public MutableDeletionInfo mutableCopy(); + public DeletionInfo copy(AbstractAllocator allocator); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DeletionPurger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionPurger.java b/src/java/org/apache/cassandra/db/DeletionPurger.java new file mode 100644 index 0000000..d368b69 --- /dev/null +++ b/src/java/org/apache/cassandra/db/DeletionPurger.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +public interface DeletionPurger +{ + public static final DeletionPurger PURGE_ALL = (ts, ldt) -> true; + + public boolean shouldPurge(long timestamp, int localDeletionTime); + + public default boolean shouldPurge(DeletionTime dt) + { + return !dt.isLive() && shouldPurge(dt.markedForDeleteAt(), dt.localDeletionTime()); + } + + public default boolean shouldPurge(LivenessInfo liveness, int nowInSec) + { + return !liveness.isLive(nowInSec) && shouldPurge(liveness.timestamp(), liveness.localExpirationTime()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DeletionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index 67842a7..3e9ca80 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -17,13 +17,13 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.security.MessageDigest; import com.google.common.base.Objects; import org.apache.cassandra.cache.IMeasurableMemory; +import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -34,29 +34,44 @@ import org.apache.cassandra.utils.ObjectSizes; /** * Information on deletion of a storage engine object. */ -public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory, Aliasable<DeletionTime> +public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory { - private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleDeletionTime(0, 0)); + private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0)); /** * A special DeletionTime that signifies that there is no top-level (row) tombstone. */ - public static final DeletionTime LIVE = new SimpleDeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE); + public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE); public static final Serializer serializer = new Serializer(); + private final long markedForDeleteAt; + private final int localDeletionTime; + + public DeletionTime(long markedForDeleteAt, int localDeletionTime) + { + this.markedForDeleteAt = markedForDeleteAt; + this.localDeletionTime = localDeletionTime; + } + /** * A timestamp (typically in microseconds since the unix epoch, although this is not enforced) after which * data should be considered deleted. If set to Long.MIN_VALUE, this implies that the data has not been marked * for deletion at all. */ - public abstract long markedForDeleteAt(); + public long markedForDeleteAt() + { + return markedForDeleteAt; + } /** * The local server timestamp, in seconds since the unix epoch, at which this tombstone was created. This is * only used for purposes of purging the tombstone after gc_grace_seconds have elapsed. */ - public abstract int localDeletionTime(); + public int localDeletionTime() + { + return localDeletionTime; + } /** * Returns whether this DeletionTime is live, that is deletes no columns. @@ -112,14 +127,14 @@ public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasura return markedForDeleteAt() > dt.markedForDeleteAt() || (markedForDeleteAt() == dt.markedForDeleteAt() && localDeletionTime() > dt.localDeletionTime()); } - public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore) + public boolean deletes(LivenessInfo info) { - return markedForDeleteAt() < maxPurgeableTimestamp && localDeletionTime() < gcBefore; + return deletes(info.timestamp()); } - public boolean deletes(LivenessInfo info) + public boolean deletes(Cell cell) { - return deletes(info.timestamp()); + return deletes(cell.timestamp()); } public boolean deletes(long timestamp) @@ -151,10 +166,10 @@ public abstract class DeletionTime implements Comparable<DeletionTime>, IMeasura long mfda = in.readLong(); return mfda == Long.MIN_VALUE && ldt == Integer.MAX_VALUE ? LIVE - : new SimpleDeletionTime(mfda, ldt); + : new DeletionTime(mfda, ldt); } - public void skip(DataInput in) throws IOException + public void skip(DataInputPlus in) throws IOException { FileUtils.skipBytesFully(in, 4 + 8); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DeletionTimeArray.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionTimeArray.java b/src/java/org/apache/cassandra/db/DeletionTimeArray.java deleted file mode 100644 index 77eb953..0000000 --- a/src/java/org/apache/cassandra/db/DeletionTimeArray.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.util.Arrays; - -import org.apache.cassandra.utils.ObjectSizes; - -/** - * Utility class to store an array of deletion times a bit efficiently. - */ -public class DeletionTimeArray -{ - private long[] markedForDeleteAts; - private int[] delTimes; - - public DeletionTimeArray(int initialCapacity) - { - this.markedForDeleteAts = new long[initialCapacity]; - this.delTimes = new int[initialCapacity]; - clear(); - } - - public void clear(int i) - { - markedForDeleteAts[i] = Long.MIN_VALUE; - delTimes[i] = Integer.MAX_VALUE; - } - - public void set(int i, DeletionTime dt) - { - this.markedForDeleteAts[i] = dt.markedForDeleteAt(); - this.delTimes[i] = dt.localDeletionTime(); - } - - public int size() - { - return markedForDeleteAts.length; - } - - public void resize(int newSize) - { - int prevSize = size(); - - markedForDeleteAts = Arrays.copyOf(markedForDeleteAts, newSize); - delTimes = Arrays.copyOf(delTimes, newSize); - - Arrays.fill(markedForDeleteAts, prevSize, newSize, Long.MIN_VALUE); - Arrays.fill(delTimes, prevSize, newSize, Integer.MAX_VALUE); - } - - public boolean supersedes(int i, DeletionTime dt) - { - return markedForDeleteAts[i] > dt.markedForDeleteAt(); - } - - public boolean supersedes(int i, int j) - { - return markedForDeleteAts[i] > markedForDeleteAts[j]; - } - - public void swap(int i, int j) - { - long m = markedForDeleteAts[j]; - int l = delTimes[j]; - - move(i, j); - - markedForDeleteAts[i] = m; - delTimes[i] = l; - } - - public void move(int i, int j) - { - markedForDeleteAts[j] = markedForDeleteAts[i]; - delTimes[j] = delTimes[i]; - } - - public boolean isLive(int i) - { - return markedForDeleteAts[i] > Long.MIN_VALUE; - } - - public void clear() - { - Arrays.fill(markedForDeleteAts, Long.MIN_VALUE); - Arrays.fill(delTimes, Integer.MAX_VALUE); - } - - public int dataSize() - { - return 12 * markedForDeleteAts.length; - } - - public long unsharedHeapSize() - { - return ObjectSizes.sizeOfArray(markedForDeleteAts) - + ObjectSizes.sizeOfArray(delTimes); - } - - public void copy(DeletionTimeArray other) - { - assert size() == other.size(); - for (int i = 0; i < size(); i++) - { - markedForDeleteAts[i] = other.markedForDeleteAts[i]; - delTimes[i] = other.delTimes[i]; - } - } - - public static class Cursor extends DeletionTime - { - private DeletionTimeArray array; - private int i; - - public Cursor setTo(DeletionTimeArray array, int i) - { - this.array = array; - this.i = i; - return this; - } - - public long markedForDeleteAt() - { - return array.markedForDeleteAts[i]; - } - - public int localDeletionTime() - { - return array.delTimes[i]; - } - - public DeletionTime takeAlias() - { - return new SimpleDeletionTime(markedForDeleteAt(), localDeletionTime()); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 38113c8..4501f3c 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -132,19 +132,13 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean UUID hintId = UUIDGen.getTimeUUID(); // serialize the hint with id and version as a composite column name - PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints, - StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId)), - PartitionColumns.of(hintColumn), - 1); - - Row.Writer writer = upd.writer(); - Rows.writeClustering(SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version), writer); + DecoratedKey key = StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId)); + Clustering clustering = SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version); ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version)); - writer.writeCell(hintColumn, false, value, SimpleLivenessInfo.forUpdate(now, ttl, FBUtilities.nowInSeconds(), SystemKeyspace.Hints), null); - writer.endOfRow(); + Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value); - return new Mutation(upd); + return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, ArrayBackedRow.singleCellRow(clustering, cell))); } /* @@ -187,13 +181,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp) { DecoratedKey dk = StorageService.getPartitioner().decorateKey(tokenBytes); - - PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints, dk, PartitionColumns.of(hintColumn), 1); - - Row.Writer writer = upd.writer(); - Rows.writeClustering(clustering, writer); - Cells.writeTombstone(writer, hintColumn, timestamp, FBUtilities.nowInSeconds()); - + Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds()); + PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, ArrayBackedRow.singleCellRow(clustering, cell)); new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery } @@ -420,7 +409,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean int version = Int32Type.instance.compose(hint.clustering().get(1)); Cell cell = hint.getCell(hintColumn); - final long timestamp = cell.livenessInfo().timestamp(); + final long timestamp = cell.timestamp(); DataInputPlus in = new NIODataInputStream(cell.value(), true); Mutation mutation; try http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index f063256..8242ab7 100644 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -31,15 +31,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.thrift.ColumnDef; import org.apache.cassandra.utils.*; -import org.apache.hadoop.io.serializer.Serialization; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; @@ -103,7 +102,7 @@ public abstract class LegacyLayout if (metadata.isSuper()) { assert superColumnName != null; - return decodeForSuperColumn(metadata, new SimpleClustering(superColumnName), cellname); + return decodeForSuperColumn(metadata, new Clustering(superColumnName), cellname); } assert superColumnName == null; @@ -152,7 +151,7 @@ public abstract class LegacyLayout { // If it's a compact table, it means the column is in fact a "dynamic" one if (metadata.isCompactTable()) - return new LegacyCellName(new SimpleClustering(column), metadata.compactValueColumn(), null); + return new LegacyCellName(new Clustering(column), metadata.compactValueColumn(), null); throw new UnknownColumnException(metadata, column); } @@ -242,7 +241,7 @@ public abstract class LegacyLayout ? CompositeType.splitName(value) : Collections.singletonList(value); - return new SimpleClustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize])); + return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize])); } public static ByteBuffer encodeClustering(CFMetaData metadata, Clustering clustering) @@ -276,7 +275,7 @@ public abstract class LegacyLayout DeletionInfo delInfo, Iterator<LegacyCell> cells) { - SerializationHelper helper = new SerializationHelper(0, SerializationHelper.Flag.LOCAL); + SerializationHelper helper = new SerializationHelper(metadata, 0, SerializationHelper.Flag.LOCAL); return toUnfilteredRowIterator(metadata, key, LegacyDeletionInfo.from(delInfo), cells, false, helper); } @@ -320,22 +319,16 @@ public abstract class LegacyLayout Iterator<Row> rows = convertToRows(new CellGrouper(metadata, helper), iter, delInfo); Iterator<RangeTombstone> ranges = delInfo.deletionInfo.rangeIterator(reversed); - final Iterator<Unfiltered> atoms = new RowAndTombstoneMergeIterator(metadata.comparator, reversed) - .setTo(rows, ranges); - - return new AbstractUnfilteredRowIterator(metadata, - key, - delInfo.deletionInfo.getPartitionDeletion(), - metadata.partitionColumns(), - staticRow, - reversed, - RowStats.NO_STATS) - { - protected Unfiltered computeNext() - { - return atoms.hasNext() ? atoms.next() : endOfData(); - } - }; + return new RowAndDeletionMergeIterator(metadata, + key, + delInfo.deletionInfo.getPartitionDeletion(), + ColumnFilter.all(metadata), + staticRow, + reversed, + RowStats.NO_STATS, + rows, + ranges, + true); } public static Row extractStaticColumns(CFMetaData metadata, DataInputPlus in, Columns statics) throws IOException @@ -351,7 +344,7 @@ public abstract class LegacyLayout for (ColumnDefinition column : statics) columnsToFetch.add(column.name.bytes); - StaticRow.Builder builder = StaticRow.builder(statics, false, metadata.isCounter()); + Row.Builder builder = ArrayBackedRow.unsortedBuilder(statics, FBUtilities.nowInSeconds()); boolean foundOne = false; LegacyAtom atom; @@ -364,7 +357,7 @@ public abstract class LegacyLayout continue; foundOne = true; - builder.writeCell(cell.name.column, cell.isCounter(), cell.value, livenessInfo(metadata, cell), null); + builder.addCell(new BufferCell(cell.name.column, cell.timestamp, cell.ttl, cell.localDeletionTime, cell.value, null)); } else { @@ -469,7 +462,7 @@ public abstract class LegacyLayout { return new AbstractIterator<LegacyCell>() { - private final Iterator<Cell> cells = row.iterator(); + private final Iterator<Cell> cells = row.cells().iterator(); // we don't have (and shouldn't have) row markers for compact tables. private boolean hasReturnedRowMarker = metadata.isCompactTable(); @@ -480,7 +473,7 @@ public abstract class LegacyLayout hasReturnedRowMarker = true; LegacyCellName cellName = new LegacyCellName(row.clustering(), null, null); LivenessInfo info = row.primaryKeyLivenessInfo(); - return new LegacyCell(LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localDeletionTime(), info.ttl()); + return new LegacyCell(LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), info.ttl()); } if (!cells.hasNext()) @@ -507,8 +500,7 @@ public abstract class LegacyLayout CellPath path = cell.path(); assert path == null || path.size() == 1; LegacyCellName name = new LegacyCellName(clustering, cell.column(), path == null ? null : path.get(0)); - LivenessInfo info = cell.livenessInfo(); - return new LegacyCell(kind, name, cell.value(), info.timestamp(), info.localDeletionTime(), info.ttl()); + return new LegacyCell(kind, name, cell.value(), cell.timestamp(), cell.localDeletionTime(), cell.ttl()); } public static RowIterator toRowIterator(final CFMetaData metadata, @@ -516,17 +508,10 @@ public abstract class LegacyLayout final Iterator<LegacyCell> cells, final int nowInSec) { - SerializationHelper helper = new SerializationHelper(0, SerializationHelper.Flag.LOCAL); + SerializationHelper helper = new SerializationHelper(metadata, 0, SerializationHelper.Flag.LOCAL); return UnfilteredRowIterators.filter(toUnfilteredRowIterator(metadata, key, LegacyDeletionInfo.live(), cells, false, helper), nowInSec); } - private static LivenessInfo livenessInfo(CFMetaData metadata, LegacyCell cell) - { - return cell.isTombstone() - ? SimpleLivenessInfo.forDeletion(cell.timestamp, cell.localDeletionTime) - : SimpleLivenessInfo.forUpdate(cell.timestamp, cell.ttl, cell.localDeletionTime, metadata); - } - public static Comparator<LegacyCell> legacyCellComparator(CFMetaData metadata) { return legacyCellComparator(metadata, false); @@ -662,7 +647,7 @@ public abstract class LegacyLayout ByteBuffer value = ByteBufferUtil.readWithLength(in); if (flag == SerializationHelper.Flag.FROM_REMOTE || (flag == SerializationHelper.Flag.LOCAL && CounterContext.instance().shouldClearLocal(value))) value = CounterContext.instance().clearAllLocal(value); - return new LegacyCell(LegacyCell.Kind.COUNTER, decodeCellName(metadata, cellname, readAllAsDynamic), value, ts, LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL); + return new LegacyCell(LegacyCell.Kind.COUNTER, decodeCellName(metadata, cellname, readAllAsDynamic), value, ts, Cell.NO_DELETION_TIME, Cell.NO_TTL); } else if ((mask & EXPIRATION_MASK) != 0) { @@ -678,10 +663,10 @@ public abstract class LegacyLayout ByteBuffer value = ByteBufferUtil.readWithLength(in); LegacyCellName name = decodeCellName(metadata, cellname, readAllAsDynamic); return (mask & COUNTER_UPDATE_MASK) != 0 - ? new LegacyCell(LegacyCell.Kind.COUNTER, name, CounterContext.instance().createLocal(ByteBufferUtil.toLong(value)), ts, LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL) + ? new LegacyCell(LegacyCell.Kind.COUNTER, name, CounterContext.instance().createLocal(ByteBufferUtil.toLong(value)), ts, Cell.NO_DELETION_TIME, Cell.NO_TTL) : ((mask & DELETION_MASK) == 0 - ? new LegacyCell(LegacyCell.Kind.REGULAR, name, value, ts, LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL) - : new LegacyCell(LegacyCell.Kind.DELETED, name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ts, ByteBufferUtil.toInt(value), LivenessInfo.NO_TTL)); + ? new LegacyCell(LegacyCell.Kind.REGULAR, name, value, ts, Cell.NO_DELETION_TIME, Cell.NO_TTL) + : new LegacyCell(LegacyCell.Kind.DELETED, name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ts, ByteBufferUtil.toInt(value), Cell.NO_TTL)); } } @@ -741,10 +726,9 @@ public abstract class LegacyLayout public static class CellGrouper { public final CFMetaData metadata; - private final ReusableRow row; private final boolean isStatic; private final SerializationHelper helper; - private Row.Writer writer; + private Row.Builder builder; private Clustering clustering; private LegacyRangeTombstone rowDeletion; @@ -760,10 +744,7 @@ public abstract class LegacyLayout this.metadata = metadata; this.isStatic = isStatic; this.helper = helper; - this.row = isStatic ? null : new ReusableRow(metadata.clusteringColumns().size(), metadata.partitionColumns().regulars, false, metadata.isCounter()); - - if (isStatic) - this.writer = StaticRow.builder(metadata.partitionColumns().statics, false, metadata.isCounter()); + this.builder = ArrayBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars); } public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper) @@ -776,9 +757,6 @@ public abstract class LegacyLayout this.clustering = null; this.rowDeletion = null; this.collectionDeletion = null; - - if (!isStatic) - this.writer = row.writer(); } public boolean addAtom(LegacyAtom atom) @@ -797,8 +775,8 @@ public abstract class LegacyLayout } else if (clustering == null) { - clustering = cell.name.clustering.takeAlias(); - clustering.writeTo(writer); + clustering = cell.name.clustering; + builder.newRow(clustering); } else if (!clustering.equals(cell.name.clustering)) { @@ -809,14 +787,12 @@ public abstract class LegacyLayout if (rowDeletion != null && rowDeletion.deletionTime.deletes(cell.timestamp)) return true; - LivenessInfo info = livenessInfo(metadata, cell); - ColumnDefinition column = cell.name.column; if (column == null) { // It's the row marker assert !cell.value.hasRemaining(); - helper.writePartitionKeyLivenessInfo(writer, info.timestamp(), info.ttl(), info.localDeletionTime()); + builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(cell.timestamp, cell.ttl, cell.localDeletionTime)); } else { @@ -833,11 +809,15 @@ public abstract class LegacyLayout // practice and 2) is only used during upgrade, it's probably worth keeping things simple. helper.startOfComplexColumn(column); path = cell.name.collectionElement == null ? null : CellPath.create(cell.name.collectionElement); + if (!helper.includes(path)) + return true; } - helper.writeCell(writer, column, cell.isCounter(), cell.value, info.timestamp(), info.localDeletionTime(), info.ttl(), path); + Cell c = new BufferCell(column, cell.timestamp, cell.ttl, cell.localDeletionTime, cell.value, path); + if (!helper.isDropped(c, column.isComplex())) + builder.addCell(c); if (column.isComplex()) { - helper.endOfComplexColumn(column); + helper.endOfComplexColumn(); } } } @@ -852,9 +832,9 @@ public abstract class LegacyLayout if (clustering != null) return false; - clustering = tombstone.start.getAsClustering(metadata).takeAlias(); - clustering.writeTo(writer); - writer.writeRowDeletion(tombstone.deletionTime); + clustering = tombstone.start.getAsClustering(metadata); + builder.newRow(clustering); + builder.addRowDeletion(tombstone.deletionTime); rowDeletion = tombstone; return true; } @@ -863,15 +843,15 @@ public abstract class LegacyLayout { if (clustering == null) { - clustering = tombstone.start.getAsClustering(metadata).takeAlias(); - clustering.writeTo(writer); + clustering = tombstone.start.getAsClustering(metadata); + builder.newRow(clustering); } else if (!clustering.equals(tombstone.start.getAsClustering(metadata))) { return false; } - writer.writeComplexDeletion(tombstone.start.collectionName, tombstone.deletionTime); + builder.addComplexDeletion(tombstone.start.collectionName, tombstone.deletionTime); if (rowDeletion == null || tombstone.deletionTime.supersedes(rowDeletion.deletionTime)) collectionDeletion = tombstone; return true; @@ -881,8 +861,7 @@ public abstract class LegacyLayout public Row getRow() { - writer.endOfRow(); - return isStatic ? ((StaticRow.Builder)writer).build() : row; + return builder.build(); } } @@ -947,7 +926,7 @@ public abstract class LegacyLayout ByteBuffer[] values = new ByteBuffer[bound.size()]; for (int i = 0; i < bound.size(); i++) values[i] = bound.get(i); - return new SimpleClustering(values); + return new Clustering(values); } @Override @@ -1005,13 +984,13 @@ public abstract class LegacyLayout public static LegacyCell regular(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, ByteBuffer value, long timestamp) throws UnknownColumnException { - return new LegacyCell(Kind.REGULAR, decodeCellName(metadata, superColumnName, name), value, timestamp, LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL); + return new LegacyCell(Kind.REGULAR, decodeCellName(metadata, superColumnName, name), value, timestamp, Cell.NO_DELETION_TIME, Cell.NO_TTL); } public static LegacyCell expiring(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, ByteBuffer value, long timestamp, int ttl, int nowInSec) throws UnknownColumnException { - return new LegacyCell(Kind.EXPIRING, decodeCellName(metadata, superColumnName, name), value, timestamp, nowInSec, ttl); + return new LegacyCell(Kind.EXPIRING, decodeCellName(metadata, superColumnName, name), value, timestamp, nowInSec + ttl, ttl); } public static LegacyCell tombstone(CFMetaData metadata, ByteBuffer superColumnName, ByteBuffer name, long timestamp, int nowInSec) @@ -1030,7 +1009,7 @@ public abstract class LegacyLayout public static LegacyCell counter(LegacyCellName name, ByteBuffer value) { - return new LegacyCell(Kind.COUNTER, name, value, FBUtilities.timestampMicros(), LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL); + return new LegacyCell(Kind.COUNTER, name, value, FBUtilities.timestampMicros(), Cell.NO_DELETION_TIME, Cell.NO_TTL); } public ClusteringPrefix clustering() @@ -1205,7 +1184,7 @@ public abstract class LegacyLayout public static LegacyDeletionInfo live() { - return from(DeletionInfo.live()); + return from(DeletionInfo.LIVE); } public Iterator<LegacyRangeTombstone> inRowRangeTombstones() @@ -1228,7 +1207,7 @@ public abstract class LegacyLayout int rangeCount = in.readInt(); if (rangeCount == 0) - return from(new DeletionInfo(topLevel)); + return from(new MutableDeletionInfo(topLevel)); RangeTombstoneList ranges = new RangeTombstoneList(metadata.comparator, rangeCount); List<LegacyRangeTombstone> inRowTombsones = new ArrayList<>(); @@ -1239,13 +1218,13 @@ public abstract class LegacyLayout int delTime = in.readInt(); long markedAt = in.readLong(); - LegacyRangeTombstone tombstone = new LegacyRangeTombstone(start, end, new SimpleDeletionTime(markedAt, delTime)); + LegacyRangeTombstone tombstone = new LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime)); if (tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata)) inRowTombsones.add(tombstone); else ranges.add(start.bound, end.bound, markedAt, delTime); } - return new LegacyDeletionInfo(new DeletionInfo(topLevel, ranges), inRowTombsones); + return new LegacyDeletionInfo(new MutableDeletionInfo(topLevel, ranges), inRowTombsones); } public long serializedSize(CFMetaData metadata, LegacyDeletionInfo info, TypeSizes typeSizes, int version)
