http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/LivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index 89971d1..8f7b1c2 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -17,128 +17,154 @@
  */
 package org.apache.cassandra.db;
 
+import java.util.Objects;
 import java.security.MessageDigest;
 
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
- * Groups the informations necessary to decide the liveness of a given piece of
- * column data.
+ * Stores the information relating to the liveness of the primary key columns of a row.
  * <p>
- * In practice, a {@code LivenessInfo} groups 3 informations:
- *   1) the data timestamp. It is sometimes allowed for a given piece of data to have
- *      no timestamp (for {@link Row#partitionKeyLivenessInfo} more precisely), but if that
- *      is the case it means the data has no liveness info at all.
- *   2) the data ttl if relevant.
- *   2) the data local deletion time if relevant (that is, if either the data has a ttl or is deleted).
+ * A {@code LivenessInfo} may be empty. If it isn't, it contains at least a timestamp,
+ * which is the timestamp for the row primary key columns. On top of that, the info can be
+ * ttl'ed, in which case the {@code LivenessInfo} also has both a ttl and a local expiration time.
  */
-public interface LivenessInfo extends Aliasable<LivenessInfo>
+public class LivenessInfo
 {
     public static final long NO_TIMESTAMP = Long.MIN_VALUE;
     public static final int NO_TTL = 0;
-    public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
+    public static final int NO_EXPIRATION_TIME = Integer.MAX_VALUE;
 
-    public static final LivenessInfo NONE = new SimpleLivenessInfo(NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME);
+    public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
 
-    /**
-     * The timestamp at which the data was inserted or {@link NO_TIMESTAMP}
-     * if it has no timestamp (which may or may not be allowed).
-     *
-     * @return the liveness info timestamp.
-     */
-    public long timestamp();
+    protected final long timestamp;
 
-    /**
-     * Whether this liveness info has a timestamp or not.
-     * <p>
-     * Note that if this return {@code false}, then both {@link #hasTTL} and
-     * {@link #hasLocalDeletionTime} must return {@code false} too.
-     *
-     * @return whether this liveness info has a timestamp or not.
-     */
-    public boolean hasTimestamp();
+    protected LivenessInfo(long timestamp)
+    {
+        this.timestamp = timestamp;
+    }
+
+    public static LivenessInfo create(CFMetaData metadata, long timestamp, int nowInSec)
+    {
+        int defaultTTL = metadata.getDefaultTimeToLive();
+        if (defaultTTL != NO_TTL)
+            return expiring(timestamp, defaultTTL, nowInSec);
+
+        return new LivenessInfo(timestamp);
+    }
+
+    public static LivenessInfo expiring(long timestamp, int ttl, int nowInSec)
+    {
+        return new ExpiringLivenessInfo(timestamp, ttl, nowInSec + ttl);
+    }
+
+    public static LivenessInfo create(CFMetaData metadata, long timestamp, int ttl, int nowInSec)
+    {
+        return ttl == NO_TTL
+             ? create(metadata, timestamp, nowInSec)
+             : expiring(timestamp, ttl, nowInSec);
+    }
+
+    // Note that this ctor ignores the default table ttl and takes the expiration time, not the current time.
+    // Use when you know that's what you want.
+    public static LivenessInfo create(long timestamp, int ttl, int localExpirationTime)
+    {
+        return ttl == NO_TTL ? new LivenessInfo(timestamp) : new ExpiringLivenessInfo(timestamp, ttl, localExpirationTime);
+    }
 
     /**
-     * The ttl (if any) on the data or {@link NO_TTL} if the data is not
-     * expiring.
+     * Whether this liveness info is empty (has no timestamp).
      *
-     * Please note that this value is the TTL that was set originally and is thus not
-     * changing. If you want to figure out how much time the data has before it expires,
-     * then you should use {@link #remainingTTL}.
+     * @return whether this liveness info is empty or not.
      */
-    public int ttl();
+    public boolean isEmpty()
+    {
+        return timestamp == NO_TIMESTAMP;
+    }
 
     /**
-     * Whether this liveness info has a TTL or not.
+     * The timestamp for this liveness info.
      *
-     * @return whether this liveness info has a TTL or not.
+     * @return the liveness info timestamp (or {@link #NO_TIMESTAMP} if the info is empty).
      */
-    public boolean hasTTL();
+    public long timestamp()
+    {
+        return timestamp;
+    }
 
     /**
-     * The deletion time (in seconds) on the data if applicable ({@link NO_DELETION}
-     * otherwise).
-     *
-     * There is 3 cases in practice:
-     *   1) the data is neither deleted nor expiring: it then has neither {@code ttl()}
-     *      nor {@code localDeletionTime()}.
-     *   2) the data is expiring/expired: it then has both a {@code ttl()} and a
-     *      {@code localDeletionTime()}. Whether it's still live or is expired depends
-     *      on the {@code localDeletionTime()}.
-     *   3) the data is deleted: it has no {@code ttl()} but has a
-     *      {@code localDeletionTime()}.
+     * Whether the info has a ttl.
      */
-    public int localDeletionTime();
+    public boolean isExpiring()
+    {
+        return false;
+    }
 
     /**
-     * Whether this liveness info has a local deletion time or not.
+     * The ttl (if any) on the row primary key columns or {@link #NO_TTL} if it is not
+     * expiring.
      *
-     * @return whether this liveness info has a local deletion time or not.
+     * Please note that this value is the TTL that was set originally and is thus not
+     * changing.
      */
-    public boolean hasLocalDeletionTime();
+    public int ttl()
+    {
+        return NO_TTL;
+    }
 
     /**
-     * The actual remaining time to live (in seconds) for the data this is
-     * the liveness information of.
-     *
-     * {@code #ttl} returns the initial TTL sets on the piece of data while this
-     * method computes how much time the data actually has to live given the
-     * current time.
+     * The expiration time (in seconds) if the info is expiring ({@link #NO_EXPIRATION_TIME} otherwise).
      *
-     * @param nowInSec the current time in seconds.
-     * @return the remaining time to live (in seconds) the data has, or
-     * {@code -1} if the data is either expired or not expiring.
      */
-    public int remainingTTL(int nowInSec);
+    public int localExpirationTime()
+    {
+        return NO_EXPIRATION_TIME;
+    }
 
     /**
-     * Checks whether a given piece of data is live given the current time.
+     * Whether this info is still live.
+     *
+     * A {@code LivenessInfo} is live if it is either not expiring, or if its expiration time is after
+     * {@code nowInSec}.
      *
      * @param nowInSec the current time in seconds.
-     * @return whether the data having this liveness info is live or not.
+     * @return whether this liveness info is live or not.
      */
-    public boolean isLive(int nowInSec);
+    public boolean isLive(int nowInSec)
+    {
+        return !isEmpty();
+    }
 
     /**
      * Adds this liveness information to the provided digest.
      *
     * @param digest the digest to add this liveness information to.
     */
-    public void digest(MessageDigest digest);
+    public void digest(MessageDigest digest)
+    {
+        FBUtilities.updateWithLong(digest, timestamp());
+    }
 
     /**
      * Validate the data contained by this liveness information.
     *
     * @throws MarshalException if some of the data is corrupted.
     */
-    public void validate();
+    public void validate()
+    {
+    }
 
     /**
      * The size of the (useful) data this liveness information contains.
     *
     * @return the size of the data this liveness information contains.
     */
-    public int dataSize();
+    public int dataSize()
+    {
+        return TypeSizes.sizeof(timestamp());
+    }
 
     /**
      * Whether this liveness information supersedes another one (that is
@@ -148,31 +174,10 @@ public interface LivenessInfo extends Aliasable<LivenessInfo>
      *
      * @return whether this {@code LivenessInfo} supersedes {@code other}.
      */
-    public boolean supersedes(LivenessInfo other);
-
-    /**
-     * Returns the result of merging this info to another one (that is, it
-     * return this info if it supersedes the other one, or the other one
-     * otherwise).
-     */
-    public LivenessInfo mergeWith(LivenessInfo other);
-
-    /**
-     * Whether this liveness information can be purged.
-     * <p>
-     * A liveness info can be purged if it is not live and hasn't been so
-     * for longer than gcGrace (or more precisely, it's local deletion time
-     * is smaller than gcBefore, which is itself "now - gcGrace").
-     *
-     * @param maxPurgeableTimestamp the biggest timestamp that can be purged.
-     * A liveness info will not be considered purgeable if its timestamp is
-     * greater than this value, even if it mets the other criteria for purging.
-     * @param gcBefore the local deletion time before which deleted/expired
-     * liveness info can be purged.
-     *
-     * @return whether this liveness information can be purged.
-     */
-    public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore);
+    public boolean supersedes(LivenessInfo other)
+    {
+        return timestamp > other.timestamp;
+    }
 
     /**
      * Returns a copy of this liveness info updated with the provided timestamp.
@@ -182,5 +187,108 @@ public interface LivenessInfo extends Aliasable<LivenessInfo>
      * as timestamp. If it has no timestamp however, this liveness info is returned
      * unchanged.
      */
-    public LivenessInfo withUpdatedTimestamp(long newTimestamp);
+    public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+    {
+        return new LivenessInfo(newTimestamp);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("[ts=%d]", timestamp);
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if(!(other instanceof LivenessInfo))
+            return false;
+
+        LivenessInfo that = (LivenessInfo)other;
+        return this.timestamp() == that.timestamp()
+            && this.ttl() == that.ttl()
+            && this.localExpirationTime() == that.localExpirationTime();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(timestamp(), ttl(), localExpirationTime());
+    }
+
+    private static class ExpiringLivenessInfo extends LivenessInfo
+    {
+        private final int ttl;
+        private final int localExpirationTime;
+
+        private ExpiringLivenessInfo(long timestamp, int ttl, int localExpirationTime)
+        {
+            super(timestamp);
+            assert ttl != NO_TTL && localExpirationTime != NO_EXPIRATION_TIME;
+            this.ttl = ttl;
+            this.localExpirationTime = localExpirationTime;
+        }
+
+        @Override
+        public int ttl()
+        {
+            return ttl;
+        }
+
+        @Override
+        public int localExpirationTime()
+        {
+            return localExpirationTime;
+        }
+
+        @Override
+        public boolean isExpiring()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isLive(int nowInSec)
+        {
+            return nowInSec < localExpirationTime;
+        }
+
+        @Override
+        public void digest(MessageDigest digest)
+        {
+            super.digest(digest);
+            FBUtilities.updateWithInt(digest, localExpirationTime);
+            FBUtilities.updateWithInt(digest, ttl);
+        }
+
+        @Override
+        public void validate()
+        {
+            if (ttl < 0)
+                throw new MarshalException("A TTL should not be negative");
+            if (localExpirationTime < 0)
+                throw new MarshalException("A local expiration time should not be negative");
+        }
+
+        @Override
+        public int dataSize()
+        {
+            return super.dataSize()
+                 + TypeSizes.sizeof(ttl)
+                 + TypeSizes.sizeof(localExpirationTime);
+        }
+
+        @Override
+        public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+        {
+            return new ExpiringLivenessInfo(newTimestamp, ttl, localExpirationTime);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("[ts=%d ttl=%d, let=%d]", timestamp, ttl, localExpirationTime);
+        }
+    }
 }
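[Editor's note: to make the new concrete-class API above tangible, here is a small usage sketch. It is not part of the commit; the one-hour TTL and the assumed in-scope CFMetaData instance "metadata" are placeholders for illustration.]

    int nowInSec = FBUtilities.nowInSeconds();
    long timestamp = FBUtilities.timestampMicros();

    // A plain, non-expiring liveness info (assuming the table has no default TTL).
    LivenessInfo plain = LivenessInfo.create(metadata, timestamp, nowInSec);
    assert plain.isLive(nowInSec) && !plain.isExpiring();

    // An expiring one: live for the next hour, dead (but not empty) afterwards.
    LivenessInfo expiring = LivenessInfo.expiring(timestamp, 3600, nowInSec);
    assert expiring.isLive(nowInSec);
    assert !expiring.isLive(nowInSec + 3601);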
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/LivenessInfoArray.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LivenessInfoArray.java b/src/java/org/apache/cassandra/db/LivenessInfoArray.java
deleted file mode 100644
index 24026d8..0000000
--- a/src/java/org/apache/cassandra/db/LivenessInfoArray.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Arrays;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Utility class to store an array of liveness info efficiently.
- */
-public class LivenessInfoArray
-{
-    private long[] timestamps;
-    private int[] delTimesAndTTLs;
-
-    public LivenessInfoArray(int initialCapacity)
-    {
-        this.timestamps = new long[initialCapacity];
-        this.delTimesAndTTLs = new int[initialCapacity * 2];
-        clear();
-    }
-
-    public void clear(int i)
-    {
-        timestamps[i] = LivenessInfo.NO_TIMESTAMP;
-        delTimesAndTTLs[2 * i] = LivenessInfo.NO_DELETION_TIME;
-        delTimesAndTTLs[2 * i + 1] = LivenessInfo.NO_TTL;
-    }
-
-    public void set(int i, LivenessInfo info)
-    {
-        set(i, info.timestamp(), info.ttl(), info.localDeletionTime());
-    }
-
-    public void set(int i, long timestamp, int ttl, int localDeletionTime)
-    {
-        this.timestamps[i] = timestamp;
-        this.delTimesAndTTLs[2 * i] = localDeletionTime;
-        this.delTimesAndTTLs[2 * i + 1] = ttl;
-    }
-
-    public long timestamp(int i)
-    {
-        return timestamps[i];
-    }
-
-    public int localDeletionTime(int i)
-    {
-        return delTimesAndTTLs[2 * i];
-    }
-
-    public int ttl(int i)
-    {
-        return delTimesAndTTLs[2 * i + 1];
-    }
-
-    public boolean isLive(int i, int nowInSec)
-    {
-        // See AbstractLivenessInfo.isLive().
-        return localDeletionTime(i) == LivenessInfo.NO_DELETION_TIME
-            || (ttl(i) != LivenessInfo.NO_TTL && nowInSec < localDeletionTime(i));
-    }
-
-    public int size()
-    {
-        return timestamps.length;
-    }
-
-    public void resize(int newSize)
-    {
-        int prevSize = size();
-
-        timestamps = Arrays.copyOf(timestamps, newSize);
-        delTimesAndTTLs = Arrays.copyOf(delTimesAndTTLs, newSize * 2);
-
-        clear(prevSize, newSize);
-    }
-
-    public void swap(int i, int j)
-    {
-        long ts = timestamps[j];
-        int ldt = delTimesAndTTLs[2 * j];
-        int ttl = delTimesAndTTLs[2 * j + 1];
-
-        move(i, j);
-
-        timestamps[i] = ts;
-        delTimesAndTTLs[2 * i] = ldt;
-        delTimesAndTTLs[2 * i + 1] = ttl;
-    }
-
-    public void move(int i, int j)
-    {
-        timestamps[j] = timestamps[i];
-        delTimesAndTTLs[2 * j] = delTimesAndTTLs[2 * i];
-        delTimesAndTTLs[2 * j + 1] = delTimesAndTTLs[2 * i + 1];
-    }
-
-    public void clear()
-    {
-        clear(0, size());
-    }
-
-    private void clear(int from, int to)
-    {
-        Arrays.fill(timestamps, from, to, LivenessInfo.NO_TIMESTAMP);
-        for (int i = from; i < to; i++)
-        {
-            delTimesAndTTLs[2 * i] = LivenessInfo.NO_DELETION_TIME;
-            delTimesAndTTLs[2 * i + 1] = LivenessInfo.NO_TTL;
-        }
-    }
-
-    public int dataSize()
-    {
-        return 16 * size();
-    }
-
-    public long unsharedHeapSize()
-    {
-        return ObjectSizes.sizeOfArray(timestamps)
-             + ObjectSizes.sizeOfArray(delTimesAndTTLs);
-    }
-
-    public static Cursor newCursor()
-    {
-        return new Cursor();
-    }
-
-    public static class Cursor extends AbstractLivenessInfo
-    {
-        private LivenessInfoArray array;
-        private int i;
-
-        public Cursor setTo(LivenessInfoArray array, int i)
-        {
-            this.array = array;
-            this.i = i;
-            return this;
-        }
-
-        public long timestamp()
-        {
-            return array.timestamps[i];
-        }
-
-        public int localDeletionTime()
-        {
-            return array.delTimesAndTTLs[2 * i];
-        }
-
-        public int ttl()
-        {
-            return array.delTimesAndTTLs[2 * i + 1];
-        }
-    }
-}
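[Editor's note: the deleted class above packed each entry's (deletion time, ttl) pair into a single int[] at slots 2*i and 2*i+1, beside a parallel long[] of timestamps, and exposed entries through a reusable Cursor. With immutable LivenessInfo objects replacing that pattern, a stored triple maps onto the new factories roughly as follows; this helper is hypothetical and not part of the commit.]

    // Hypothetical migration helper: rebuild an immutable LivenessInfo from the three
    // primitives the deleted array stored per entry.
    static LivenessInfo fromStoredEntry(long timestamp, int ttl, int localExpirationTime)
    {
        // The three-argument create() deliberately bypasses the table's default TTL
        // and interprets its last argument as the expiration time itself.
        return LivenessInfo.create(timestamp, ttl, localExpirationTime);
    }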
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
new file mode 100644
index 0000000..6b19283
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Iterator;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * A mutable implementation of {@code DeletionInfo}.
+ */
+public class MutableDeletionInfo implements DeletionInfo
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new MutableDeletionInfo(0, 0));
+
+    /**
+     * This represents a deletion of the entire partition. We can't represent this within the RangeTombstoneList, so it's
+     * kept separately. This also slightly optimizes the common case of a full partition deletion.
+     */
+    private DeletionTime partitionDeletion;
+
+    /**
+     * A list of range tombstones within the partition. This is left as null if there are no range tombstones
+     * (to save an allocation, since it's a common case).
+     */
+    private RangeTombstoneList ranges;
+
+    /**
+     * Creates a DeletionInfo with only a top-level (row) tombstone.
+     * @param markedForDeleteAt the time after which the entire row should be considered deleted
+     * @param localDeletionTime what time the deletion write was applied locally (for purposes of
+     *                          purging the tombstone after gc_grace_seconds).
+     */
+    public MutableDeletionInfo(long markedForDeleteAt, int localDeletionTime)
+    {
+        // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
+        // (see CASSANDRA-3872)
+        this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
+    }
+
+    public MutableDeletionInfo(DeletionTime partitionDeletion)
+    {
+        this(partitionDeletion, null);
+    }
+
+    public MutableDeletionInfo(DeletionTime partitionDeletion, RangeTombstoneList ranges)
+    {
+        this.partitionDeletion = partitionDeletion;
+        this.ranges = ranges;
+    }
+
+    /**
+     * Returns a new DeletionInfo that has no top-level tombstone or any range tombstones.
+     */
+    public static MutableDeletionInfo live()
+    {
+        return new MutableDeletionInfo(DeletionTime.LIVE);
+    }
+
+    public MutableDeletionInfo mutableCopy()
+    {
+        return new MutableDeletionInfo(partitionDeletion, ranges == null ? null : ranges.copy());
+    }
+
+    public MutableDeletionInfo copy(AbstractAllocator allocator)
+    {
+        RangeTombstoneList rangesCopy = null;
+        if (ranges != null)
+            rangesCopy = ranges.copy(allocator);
+
+        return new MutableDeletionInfo(partitionDeletion, rangesCopy);
+    }
+
+    /**
+     * Returns whether this DeletionInfo is live, that is deletes no columns.
+     */
+    public boolean isLive()
+    {
+        return partitionDeletion.isLive() && (ranges == null || ranges.isEmpty());
+    }
+
+    /**
+     * Potentially replaces the top-level tombstone with another, keeping whichever has the higher markedForDeleteAt
+     * timestamp.
+     * @param newInfo the deletion time to add to this deletion info.
+     */
+    public void add(DeletionTime newInfo)
+    {
+        if (newInfo.supersedes(partitionDeletion))
+            partitionDeletion = newInfo;
+    }
+
+    public void add(RangeTombstone tombstone, ClusteringComparator comparator)
+    {
+        if (ranges == null)
+            ranges = new RangeTombstoneList(comparator, 1);
+
+        ranges.add(tombstone);
+    }
+
+    /**
+     * Combines another DeletionInfo with this one and returns the result. Whichever top-level tombstone
+     * has the higher markedForDeleteAt timestamp will be kept, along with its localDeletionTime. The
+     * range tombstones will be combined.
+     *
+     * @return this object.
+     */
+    public DeletionInfo add(DeletionInfo newInfo)
+    {
+        add(newInfo.getPartitionDeletion());
+
+        // We know MutableDeletionInfo is the only implementation and we're not mutating it; this is just to get access to the
+        // RangeTombstoneList directly.
+        assert newInfo instanceof MutableDeletionInfo;
+        RangeTombstoneList newRanges = ((MutableDeletionInfo)newInfo).ranges;
+
+        if (ranges == null)
+            ranges = newRanges == null ? null : newRanges.copy();
+        else if (newRanges != null)
+            ranges.addAll(newRanges);
+
+        return this;
+    }
+
+    public DeletionTime getPartitionDeletion()
+    {
+        return partitionDeletion;
+    }
+
+    // Use sparingly, not the most efficient thing
+    public Iterator<RangeTombstone> rangeIterator(boolean reversed)
+    {
+        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(reversed);
+    }
+
+    public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean reversed)
+    {
+        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : ranges.iterator(slice, reversed);
+    }
+
+    public RangeTombstone rangeCovering(Clustering name)
+    {
+        return ranges == null ? null : ranges.search(name);
+    }
+
+    public int dataSize()
+    {
+        int size = TypeSizes.sizeof(partitionDeletion.markedForDeleteAt());
+        return size + (ranges == null ? 0 : ranges.dataSize());
+    }
+
+    public boolean hasRanges()
+    {
+        return ranges != null && !ranges.isEmpty();
+    }
+
+    public int rangeCount()
+    {
+        return hasRanges() ? ranges.size() : 0;
+    }
+
+    public long maxTimestamp()
+    {
+        return ranges == null ? partitionDeletion.markedForDeleteAt() : Math.max(partitionDeletion.markedForDeleteAt(), ranges.maxMarkedAt());
+    }
+
+    /**
+     * Whether this deletion info may modify the provided one if added to it.
+     */
+    public boolean mayModify(DeletionInfo delInfo)
+    {
+        return partitionDeletion.compareTo(delInfo.getPartitionDeletion()) > 0 || hasRanges();
+    }
+
+    @Override
+    public String toString()
+    {
+        if (ranges == null || ranges.isEmpty())
+            return String.format("{%s}", partitionDeletion);
+        else
+            return String.format("{%s, ranges=%s}", partitionDeletion, rangesAsString());
+    }
+
+    private String rangesAsString()
+    {
+        assert !ranges.isEmpty();
+        StringBuilder sb = new StringBuilder();
+        ClusteringComparator cc = ranges.comparator();
+        Iterator<RangeTombstone> iter = rangeIterator(false);
+        while (iter.hasNext())
+        {
+            RangeTombstone i = iter.next();
+            sb.append(i.deletedSlice().toString(cc));
+            sb.append('@');
+            sb.append(i.deletionTime());
+        }
+        return sb.toString();
+    }
+
+    // Updates all the timestamp of the deletion contained in this DeletionInfo to be {@code timestamp}.
+    public DeletionInfo updateAllTimestamp(long timestamp)
+    {
+        if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE)
+            partitionDeletion = new DeletionTime(timestamp, partitionDeletion.localDeletionTime());
+
+        if (ranges != null)
+            ranges.updateAllTimestamp(timestamp);
+        return this;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if(!(o instanceof MutableDeletionInfo))
+            return false;
+        MutableDeletionInfo that = (MutableDeletionInfo)o;
+        return partitionDeletion.equals(that.partitionDeletion) && Objects.equal(ranges, that.ranges);
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        return Objects.hashCode(partitionDeletion, ranges);
+    }
+
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
+    }
+
+    public void collectStats(RowStats.Collector collector)
+    {
+        collector.update(partitionDeletion);
+        if (ranges != null)
+            ranges.collectStats(collector);
+    }
+
+    public static Builder builder(DeletionTime partitionLevelDeletion, ClusteringComparator comparator, boolean reversed)
+    {
+        return new Builder(partitionLevelDeletion, comparator, reversed);
+    }
+
+    /**
+     * Builds DeletionInfo object from (in order) range tombstone markers.
+     */
+    public static class Builder
+    {
+        private final MutableDeletionInfo deletion;
+        private final ClusteringComparator comparator;
+
+        private final boolean reversed;
+
+        private RangeTombstoneMarker openMarker;
+
+        private Builder(DeletionTime partitionLevelDeletion, ClusteringComparator comparator, boolean reversed)
+        {
+            this.deletion = new MutableDeletionInfo(partitionLevelDeletion);
+            this.comparator = comparator;
+            this.reversed = reversed;
+        }
+
+        public void add(RangeTombstoneMarker marker)
+        {
+            // We need to start with the close case in case that's a boundary
+
+            if (marker.isClose(reversed))
+            {
+                DeletionTime openDeletion = openMarker.openDeletionTime(reversed);
+                assert marker.closeDeletionTime(reversed).equals(openDeletion);
+
+                Slice.Bound open = openMarker.openBound(reversed);
+                Slice.Bound close = marker.closeBound(reversed);
+
+                Slice slice = reversed ? Slice.make(close, open) : Slice.make(open, close);
+                deletion.add(new RangeTombstone(slice, openDeletion), comparator);
+            }
+
+            if (marker.isOpen(reversed))
+            {
+                openMarker = marker;
+            }
+        }
+
+        public MutableDeletionInfo build()
+        {
+            return deletion;
+        }
+    }
+}
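[Editor's note: a small usage sketch for the new class above, illustrative only; "slice", "comparator" and the timestamps are placeholders, not values from the commit.]

    // Start from a live (empty) deletion info.
    MutableDeletionInfo deletion = MutableDeletionInfo.live();
    assert deletion.isLive() && !deletion.hasRanges();

    long timestamp = FBUtilities.timestampMicros();
    int nowInSec = FBUtilities.nowInSeconds();

    // Record a full-partition deletion; add() keeps whichever DeletionTime supersedes.
    deletion.add(new DeletionTime(timestamp, nowInSec));

    // Record a range tombstone over a slice of rows.
    deletion.add(new RangeTombstone(slice, new DeletionTime(timestamp, nowInSec)), comparator);
    assert deletion.hasRanges() && deletion.rangeCount() == 1;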
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
index ef05760..5f1da8a 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -66,6 +66,11 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
         return statics.isEmpty() && regulars.isEmpty();
     }
 
+    public Columns columns(boolean isStatic)
+    {
+        return isStatic ? statics : regulars;
+    }
+
     public boolean contains(ColumnDefinition column)
     {
         return column.isStatic() ? statics.contains(column) : regulars.contains(column);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 65e38d0..4a3704f 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.metrics.TableMetrics;
@@ -278,11 +278,11 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, metadata);
             return new PartitionRangeReadCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
         }
-    };
+    }
 }
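[Editor's note: the PartitionColumns.columns(boolean) accessor added above is a one-liner, but worth spelling out; this illustration assumes a PartitionColumns instance "pc" and is not from the commit.]

    Columns regulars = pc.columns(false);   // same object as pc.regulars
    Columns statics  = pc.columns(true);    // same object as pc.statics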
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index df60933..8865b0f 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -17,20 +17,15 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ISSTableSerializer;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * A range tombstone is a tombstone that covers a slice/range of rows.
@@ -48,7 +43,7 @@ public class RangeTombstone
     public RangeTombstone(Slice slice, DeletionTime deletion)
     {
         this.slice = slice;
-        this.deletion = deletion.takeAlias();
+        this.deletion = deletion;
     }
 
     /**
@@ -73,7 +68,7 @@ public class RangeTombstone
 
     public String toString(ClusteringComparator comparator)
     {
-        return slice.toString(comparator) + "@" + deletion;
+        return slice.toString(comparator) + '@' + deletion;
    }
 
     @Override
@@ -108,9 +103,30 @@ public class RangeTombstone
     {
         public static final Serializer serializer = new Serializer();
 
+        /** The smallest start bound, i.e. the one that starts before any row. */
+        public static final Bound BOTTOM = new Bound(Kind.INCL_START_BOUND, EMPTY_VALUES_ARRAY);
+        /** The biggest end bound, i.e. the one that ends after any row. */
+        public static final Bound TOP = new Bound(Kind.INCL_END_BOUND, EMPTY_VALUES_ARRAY);
+
         public Bound(Kind kind, ByteBuffer[] values)
         {
             super(kind, values);
+            assert values.length > 0 || !kind.isBoundary();
+        }
+
+        public boolean isBoundary()
+        {
+            return kind.isBoundary();
+        }
+
+        public boolean isOpen(boolean reversed)
+        {
+            return kind.isOpen(reversed);
+        }
+
+        public boolean isClose(boolean reversed)
+        {
+            return kind.isClose(reversed);
         }
 
         public static RangeTombstone.Bound inclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
@@ -143,6 +159,19 @@ public class RangeTombstone
             return new Bound(reversed ? Kind.INCL_END_EXCL_START_BOUNDARY : Kind.EXCL_END_INCL_START_BOUNDARY, boundValues);
         }
 
+        public static RangeTombstone.Bound fromSliceBound(Slice.Bound sliceBound)
+        {
+            return new RangeTombstone.Bound(sliceBound.kind(), sliceBound.getRawValues());
+        }
+
+        public RangeTombstone.Bound copy(AbstractAllocator allocator)
+        {
+            ByteBuffer[] newValues = new ByteBuffer[size()];
+            for (int i = 0; i < size(); i++)
+                newValues[i] = allocator.clone(get(i));
+            return new Bound(kind(), newValues);
+        }
+
         @Override
         public Bound withNewKind(Kind kind)
         {
@@ -165,13 +194,15 @@ public class RangeTombstone
                      + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types);
             }
 
-            public Kind deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException
+            public RangeTombstone.Bound deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
             {
                 Kind kind = Kind.values()[in.readByte()];
-                writer.writeBoundKind(kind);
                 int size = in.readUnsignedShort();
-                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
-                return kind;
+                if (size == 0)
+                    return kind.isStart() ? BOTTOM : TOP;
+
+                ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types);
+                return new RangeTombstone.Bound(kind, values);
             }
         }
     }
 }
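[Editor's note: a short illustration of the RangeTombstone.Bound additions above, using only factories and predicates visible in the hunks; not part of the commit.]

    // reversed=false means forward iteration order.
    ByteBuffer[] vals = new ByteBuffer[]{ ByteBufferUtil.bytes(42) };
    RangeTombstone.Bound start = RangeTombstone.Bound.inclusiveOpen(false, vals);
    assert start.isOpen(false) && !start.isBoundary();

    // The new static BOTTOM and TOP bounds sort before and after any row,
    // and the deserializer returns them for zero-length values.
    assert RangeTombstone.Bound.BOTTOM.isOpen(false);
    assert RangeTombstone.Bound.TOP.isClose(false);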
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index 64f0978..96bcdb1 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -244,7 +244,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
     {
         int idx = searchInternal(clustering, 0, size);
         // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
-        return idx >= 0 && (cell.isCounterCell() || markedAts[idx] >= cell.livenessInfo().timestamp());
+        return idx >= 0 && (cell.isCounterCell() || markedAts[idx] >= cell.timestamp());
     }
 
     /**
@@ -254,7 +254,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
     public DeletionTime searchDeletionTime(Clustering name)
     {
         int idx = searchInternal(name, 0, size);
-        return idx < 0 ? null : new SimpleDeletionTime(markedAts[idx], delTimes[idx]);
+        return idx < 0 ? null : new DeletionTime(markedAts[idx], delTimes[idx]);
     }
 
     public RangeTombstone search(Clustering name)
@@ -300,6 +300,23 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         return dataSize;
     }
 
+    public long maxMarkedAt()
+    {
+        long max = Long.MIN_VALUE;
+        for (int i = 0; i < size; i++)
+            max = Math.max(max, markedAts[i]);
+        return max;
+    }
+
+    public void collectStats(RowStats.Collector collector)
+    {
+        for (int i = 0; i < size; i++)
+        {
+            collector.updateTimestamp(markedAts[i]);
+            collector.updateLocalDeletionTime(delTimes[i]);
+        }
+    }
+
     public void updateAllTimestamp(long timestamp)
     {
         for (int i = 0; i < size; i++)
@@ -308,22 +325,22 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
 
     private RangeTombstone rangeTombstone(int idx)
     {
-        return new RangeTombstone(Slice.make(starts[idx], ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
+        return new RangeTombstone(Slice.make(starts[idx], ends[idx]), new DeletionTime(markedAts[idx], delTimes[idx]));
     }
 
     private RangeTombstone rangeTombstoneWithNewStart(int idx, Slice.Bound newStart)
     {
-        return new RangeTombstone(Slice.make(newStart, ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
+        return new RangeTombstone(Slice.make(newStart, ends[idx]), new DeletionTime(markedAts[idx], delTimes[idx]));
     }
 
     private RangeTombstone rangeTombstoneWithNewEnd(int idx, Slice.Bound newEnd)
     {
-        return new RangeTombstone(Slice.make(starts[idx], newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
+        return new RangeTombstone(Slice.make(starts[idx], newEnd), new DeletionTime(markedAts[idx], delTimes[idx]));
     }
 
     private RangeTombstone rangeTombstoneWithNewBounds(int idx, Slice.Bound newStart, Slice.Bound newEnd)
     {
-        return new RangeTombstone(Slice.make(newStart, newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
+        return new RangeTombstone(Slice.make(newStart, newEnd), new DeletionTime(markedAts[idx], delTimes[idx]));
     }
 
     public Iterator<RangeTombstone> iterator()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 99547f0..c3f036a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -17,11 +17,12 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
@@ -63,7 +64,7 @@ public abstract class ReadCommand implements ReadQuery
 
     protected static abstract class SelectionDeserializer
     {
-        public abstract ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
+        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
     }
 
     protected enum Kind
@@ -268,22 +269,13 @@ public abstract class ReadCommand implements ReadQuery
 
         try
         {
-            resultIterator = UnfilteredPartitionIterators.convertExpiredCellsToTombstones(resultIterator, nowInSec);
-            resultIterator = withMetricsRecording(withoutExpiredTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
-
-            // TODO: we should push the dropping of columns down the layers because
-            //  1) it'll be more efficient
-            //  2) it could help us solve #6276
-            // But there is not reason not to do this as a followup so keeping it here for now (we'll have
-            // to be wary of cached row if we move this down the layers)
-            if (!metadata().getDroppedColumns().isEmpty())
-                resultIterator = UnfilteredPartitionIterators.removeDroppedColumns(resultIterator, metadata().getDroppedColumns());
+            resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
 
             // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
             // no point in checking it again.
             RowFilter updatedFilter = searcher == null
-                                    ? rowFilter()
-                                    : rowFilter().without(searcher.primaryClause(this));
+                                      ? rowFilter()
+                                      : rowFilter().without(searcher.primaryClause(this));
 
             // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
             // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
@@ -333,26 +325,33 @@ public abstract class ReadCommand implements ReadQuery
             {
                 currentKey = iter.partitionKey();
 
-                return new WrappingUnfilteredRowIterator(iter)
+                return new AlteringUnfilteredRowIterator(iter)
                 {
-                    public Unfiltered next()
+                    @Override
+                    protected Row computeNextStatic(Row row)
                     {
-                        Unfiltered unfiltered = super.next();
-                        if (unfiltered.kind() == Unfiltered.Kind.ROW)
-                        {
-                            Row row = (Row) unfiltered;
-                            if (row.hasLiveData(ReadCommand.this.nowInSec()))
-                                ++liveRows;
-                            for (Cell cell : row)
-                                if (!cell.isLive(ReadCommand.this.nowInSec()))
-                                    countTombstone(row.clustering());
-                        }
-                        else
+                        return computeNext(row);
+                    }
+
+                    @Override
+                    protected Row computeNext(Row row)
+                    {
+                        if (row.hasLiveData(ReadCommand.this.nowInSec()))
+                            ++liveRows;
+
+                        for (Cell cell : row.cells())
                         {
-                            countTombstone(unfiltered.clustering());
+                            if (!cell.isLive(ReadCommand.this.nowInSec()))
+                                countTombstone(row.clustering());
                         }
+                        return row;
+                    }
 
-                        return unfiltered;
+                    @Override
+                    protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+                    {
+                        countTombstone(marker.clustering());
+                        return marker;
                     }
 
                     private void countTombstone(ClusteringPrefix clustering)
@@ -407,12 +406,12 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract void appendCQLWhereClause(StringBuilder sb);
 
-    // Skip expired tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
-    // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for expired tombstones (which
+    // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
+    // can save us some bandwidth, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
     // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
-    protected UnfilteredPartitionIterator withoutExpiredTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
+    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
     {
-        return new TombstonePurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()))
+        return new PurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()))
         {
             protected long getMaxPurgeableTimestamp()
             {
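[Editor's note: the purging rule applied above, distilled from the removed isPurgeable javadoc in LivenessInfo.java earlier in this commit, amounts to the following standalone predicate. It is a hypothetical sketch for explanation, not the commit's code.]

    // gcBefore is "now - gc_grace_seconds": a tombstone that has been dead longer
    // than gc_grace can be dropped, unless its timestamp is too recent to purge safely.
    static boolean isPurgeable(long timestamp, int localDeletionTime, long maxPurgeableTimestamp, int gcBefore)
    {
        return localDeletionTime < gcBefore && timestamp < maxPurgeableTimestamp;
    }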
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ReusableClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableClustering.java b/src/java/org/apache/cassandra/db/ReusableClustering.java
deleted file mode 100644
index e2760aa..0000000
--- a/src/java/org/apache/cassandra/db/ReusableClustering.java
+++ /dev/null
@@ -1,82 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-public class ReusableClustering extends Clustering
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new ReusableClustering(0));
-
-    protected final ByteBuffer[] values;
-
-    protected ReusableWriter writer;
-
-    public ReusableClustering(int size)
-    {
-        this.values = new ByteBuffer[size];
-    }
-
-    public int size()
-    {
-        return values.length;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        return values[i];
-    }
-
-    public ByteBuffer[] getRawValues()
-    {
-        return values;
-    }
-
-    public Writer writer()
-    {
-        if (writer == null)
-            writer = new ReusableWriter();
-        return writer;
-    }
-
-    public void reset()
-    {
-        Arrays.fill(values, null);
-        if (writer != null)
-            writer.reset();
-    }
-
-    protected class ReusableWriter implements Writer
-    {
-        int idx;
-
-        public void writeClusteringValue(ByteBuffer value)
-        {
-            values[idx++] = value;
-        }
-
-        private void reset()
-        {
-            idx = 0;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java b/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
deleted file mode 100644
index d2f19f7..0000000
--- a/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-// Note that we abuse a bit ReusableClustering to store Slice.Bound infos, but it's convenient so ...
-public class ReusableClusteringPrefix extends ReusableClustering
-{
-    private Kind kind;
-    private int size;
-
-    public ReusableClusteringPrefix(int size)
-    {
-        super(size);
-    }
-
-    public ClusteringPrefix get()
-    {
-        // We use ReusableClusteringPrefix when writing sstables (in ColumnIndex) and we
-        // don't write static clustering there.
-        assert kind != Kind.STATIC_CLUSTERING;
-        if (kind == Kind.CLUSTERING)
-        {
-            assert values.length == size;
-            return this;
-        }
-
-        return Slice.Bound.create(kind, Arrays.copyOfRange(values, 0, size));
-    }
-
-    public void copy(ClusteringPrefix clustering)
-    {
-        kind = clustering.kind();
-        for (int i = 0; i < clustering.size(); i++)
-            values[i] = clustering.get(i);
-        size = clustering.size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java b/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
deleted file mode 100644
index 43530b0..0000000
--- a/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-public class ReusableLivenessInfo extends AbstractLivenessInfo
-{
-    private long timestamp;
-    private int ttl;
-    private int localDeletionTime;
-
-    public ReusableLivenessInfo()
-    {
-        reset();
-    }
-
-    public LivenessInfo setTo(LivenessInfo info)
-    {
-        return setTo(info.timestamp(), info.ttl(), info.localDeletionTime());
-    }
-
-    public LivenessInfo setTo(long timestamp, int ttl, int localDeletionTime)
-    {
-        this.timestamp = timestamp;
-        this.ttl = ttl;
-        this.localDeletionTime = localDeletionTime;
-        return this;
-    }
-
-    public long timestamp()
-    {
-        return timestamp;
-    }
-
-    public int ttl()
-    {
-        return ttl;
-    }
-
-    public int localDeletionTime()
-    {
-        return localDeletionTime;
-    }
-
-    public void reset()
-    {
-        this.timestamp = LivenessInfo.NO_TIMESTAMP;
-        this.ttl = LivenessInfo.NO_TTL;
-        this.localDeletionTime = LivenessInfo.NO_DELETION_TIME;
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 627321e..7640512 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -43,25 +43,27 @@ public class RowUpdateBuilder
 {
     private final PartitionUpdate update;
 
-    private final LivenessInfo defaultLiveness;
-    private final LivenessInfo deletionLiveness;
+    private final long timestamp;
+    private final int ttl;
+    private final int localDeletionTime;
+    private final DeletionTime deletionTime;
 
     private final Mutation mutation;
 
-    private Row.Writer regularWriter;
-    private Row.Writer staticWriter;
+    private Row.Builder regularBuilder;
+    private Row.Builder staticBuilder;
 
-    private boolean hasSetClustering;
     private boolean useRowMarker = true;
 
     private RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl, int localDeletionTime, Mutation mutation)
     {
         this.update = update;
 
-        this.defaultLiveness = SimpleLivenessInfo.forUpdate(timestamp, ttl, localDeletionTime, update.metadata());
-        this.deletionLiveness = SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime);
-        this.deletionTime = new SimpleDeletionTime(timestamp, localDeletionTime);
+        this.timestamp = timestamp;
+        this.ttl = ttl;
+        this.localDeletionTime = localDeletionTime;
+        this.deletionTime = new DeletionTime(timestamp, localDeletionTime);
 
         // note that the created mutation may get further update later on, so we don't use the ctor that create a singletonMap
         // underneath (this class if for convenience, not performance)
@@ -73,31 +75,45 @@ public class RowUpdateBuilder
         this(update, timestamp, ttl, FBUtilities.nowInSeconds(), mutation);
     }
 
-    private Row.Writer writer()
+    private void startRow(Clustering clustering)
     {
-        assert staticWriter == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
-        if (regularWriter == null)
-        {
-            regularWriter = update.writer();
+        assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
+        assert regularBuilder == null : "Cannot add the clustering twice to the same row";
+
+        regularBuilder = ArrayBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds());
+        regularBuilder.newRow(clustering);
+
+        // If a CQL table, add the "row marker"
+        if (update.metadata().isCQLTable() && useRowMarker)
+            regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(update.metadata(), timestamp, ttl, localDeletionTime));
+    }
 
-            // If a CQL table, add the "row marker"
-            if (update.metadata().isCQLTable() && useRowMarker)
-                regularWriter.writePartitionKeyLivenessInfo(defaultLiveness);
+    private Row.Builder builder()
+    {
+        assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
+        if (regularBuilder == null)
+        {
+            // we don't force people to call clustering() if the table has no clustering, so call it ourselves
+            assert update.metadata().comparator.size() == 0 : "Missing call to clustering()";
+            startRow(Clustering.EMPTY);
         }
-        return regularWriter;
+        return regularBuilder;
     }
 
-    private Row.Writer staticWriter()
+    private Row.Builder staticBuilder()
     {
-        assert regularWriter == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
-        if (staticWriter == null)
-            staticWriter = update.staticWriter();
-        return staticWriter;
+        assert regularBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
+        if (staticBuilder == null)
+        {
+            staticBuilder = ArrayBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds());
+            staticBuilder.newRow(Clustering.STATIC_CLUSTERING);
+        }
+        return staticBuilder;
     }
 
-    private Row.Writer writer(ColumnDefinition c)
+    private Row.Builder builder(ColumnDefinition c)
     {
-        return c.isStatic() ? staticWriter() : writer();
+        return c.isStatic() ? staticBuilder() : builder();
     }
 
     public RowUpdateBuilder(CFMetaData metadata, long timestamp, Object partitionKey)
@@ -145,18 +161,17 @@ public class RowUpdateBuilder
     public RowUpdateBuilder clustering(Object... clusteringValues)
     {
         assert clusteringValues.length == update.metadata().comparator.size()
-             : "Invalid clustering values length. Expected: " + update.metadata().comparator.size() + " got: " + clusteringValues.length;
-        hasSetClustering = true;
-        if (clusteringValues.length > 0)
-            Rows.writeClustering(update.metadata().comparator.make(clusteringValues), writer());
+            : "Invalid clustering values length. Expected: " + update.metadata().comparator.size() + " got: " + clusteringValues.length;
+
+        startRow(clusteringValues.length == 0 ? Clustering.EMPTY : update.metadata().comparator.make(clusteringValues));
         return this;
     }
 
     public Mutation build()
     {
-        Row.Writer writer = regularWriter == null ? staticWriter : regularWriter;
-        if (writer != null)
-            writer.endOfRow();
+        Row.Builder builder = regularBuilder == null ? staticBuilder : regularBuilder;
+        if (builder != null)
+            update.add(builder.build());
         return mutation;
     }
 
@@ -170,14 +185,16 @@ public class RowUpdateBuilder
     {
         assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty());
 
-        Row.Writer writer = clusteringValues.length == update.metadata().comparator.size()
-                          ? update.writer()
-                          : update.staticWriter();
+        boolean isStatic = clusteringValues.length != update.metadata().comparator.size();
+        Row.Builder builder = ArrayBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars);
 
-        if (clusteringValues.length > 0)
-            Rows.writeClustering(update.metadata().comparator.make(clusteringValues), writer);
-        writer.writeRowDeletion(new SimpleDeletionTime(timestamp, FBUtilities.nowInSeconds()));
-        writer.endOfRow();
+        if (isStatic)
+            builder.newRow(Clustering.STATIC_CLUSTERING);
+        else
+            builder.newRow(clusteringValues.length == 0 ? Clustering.EMPTY : update.metadata().comparator.make(clusteringValues));
+        builder.addRowDeletion(new DeletionTime(timestamp, FBUtilities.nowInSeconds()));
+
+        update.add(builder.build());
     }
 
     public static Mutation deleteRow(CFMetaData metadata, long timestamp, Mutation mutation, Object... clusteringValues)
@@ -219,22 +236,21 @@ public class RowUpdateBuilder
     {
         ColumnDefinition c = getDefinition(columnName);
         assert c != null : "Cannot find column " + columnName;
-        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type.isCollection() && c.type.isMultiCell();
-        writer(c).writeComplexDeletion(c, new SimpleDeletionTime(defaultLiveness.timestamp() - 1, deletionTime.localDeletionTime()));
+        builder(c).addComplexDeletion(c, new DeletionTime(timestamp - 1, localDeletionTime));
         return this;
     }
 
     public RowUpdateBuilder addRangeTombstone(RangeTombstone rt)
     {
-        update.addRangeTombstone(rt);
+        update.add(rt);
         return this;
     }
 
     public RowUpdateBuilder addRangeTombstone(Slice slice)
     {
-        update.addRangeTombstone(slice, deletionTime);
-        return this;
+        return addRangeTombstone(new RangeTombstone(slice, deletionTime));
     }
 
     public RowUpdateBuilder addRangeTombstone(Object start, Object end)
@@ -251,13 +267,17 @@ public class RowUpdateBuilder
         return add(c, value);
     }
 
+    private Cell makeCell(ColumnDefinition c, ByteBuffer value, CellPath path)
+    {
+        return value == null
+             ? BufferCell.tombstone(c, timestamp, localDeletionTime)
+             : (ttl == LivenessInfo.NO_TTL ? BufferCell.live(update.metadata(), c, timestamp, value, path) : BufferCell.expiring(c, timestamp, ttl, localDeletionTime, value, path));
+    }
+
     public RowUpdateBuilder add(ColumnDefinition columnDefinition, Object value)
     {
-        assert columnDefinition.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + columnDefinition + " since no clustering hasn't been provided";
-        if (value == null)
-            writer(columnDefinition).writeCell(columnDefinition, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, deletionLiveness, null);
-        else
-            writer(columnDefinition).writeCell(columnDefinition, false, bb(value, columnDefinition.type), defaultLiveness, null);
+        assert columnDefinition.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + columnDefinition + " since no clustering has been provided";
+        builder(columnDefinition).addCell(makeCell(columnDefinition, bb(value, columnDefinition.type), null));
         return this;
     }
 
@@ -273,8 +293,11 @@ public class RowUpdateBuilder
         return add(columnDefinition, null);
     }
 
-    private ByteBuffer bb(Object value, AbstractType<?> type)
+    private static ByteBuffer bb(Object value, AbstractType<?> type)
     {
+        if (value == null)
+            return null;
+
         if (value instanceof ByteBuffer)
             return (ByteBuffer)value;
 
@@ -306,30 +329,30 @@ public class RowUpdateBuilder
     public RowUpdateBuilder addMapEntry(String columnName, Object key, Object value)
     {
         ColumnDefinition c = getDefinition(columnName);
-        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type instanceof MapType && c.type.isMultiCell();
         MapType mt = (MapType)c.type;
-        writer(c).writeCell(c, false, bb(value, mt.getValuesType()), defaultLiveness, CellPath.create(bb(key, mt.getKeysType())));
+        builder(c).addCell(makeCell(c, bb(value, mt.getValuesType()), CellPath.create(bb(key, mt.getKeysType()))));
         return this;
     }
 
     public RowUpdateBuilder addListEntry(String columnName, Object value)
     {
         ColumnDefinition c = getDefinition(columnName);
-        assert c.isStatic() || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type instanceof ListType && c.type.isMultiCell();
         ListType lt = (ListType)c.type;
-        writer(c).writeCell(c, false, bb(value, lt.getElementsType()), defaultLiveness, CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())));
+        builder(c).addCell(makeCell(c, bb(value, lt.getElementsType()), CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))));
        return this;
     }
 
     public RowUpdateBuilder addSetEntry(String columnName, Object value)
     {
         ColumnDefinition c = getDefinition(columnName);
-        assert c.isStatic() || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type instanceof SetType && c.type.isMultiCell();
         SetType st = (SetType)c.type;
-        writer(c).writeCell(c, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, defaultLiveness, CellPath.create(bb(value, st.getElementsType())));
+        builder(c).addCell(makeCell(c, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.create(bb(value, st.getElementsType()))));
         return this;
     }
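[Editor's note: a minimal end-to-end usage sketch for the rewritten builder above, using only methods visible in this diff; the keyspace, table and schema are hypothetical.]

    // Hypothetical schema: CREATE TABLE ks.tbl (k text, c text, v text, PRIMARY KEY (k, c))
    CFMetaData metadata = Schema.instance.getCFMetaData("ks", "tbl");
    new RowUpdateBuilder(metadata, FBUtilities.timestampMicros(), "k0")
        .clustering("c0")   // starts the row; also writes the CQL "row marker"
        .add("v", "v0")     // cells now go through makeCell() and Row.Builder.addCell()
        .build()            // closes the row into the update and returns the Mutation
        .apply();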
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index c720804..5784260 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;

-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -97,7 +96,7 @@ public class SerializationHeader
         // We always use a dense layout for the static row. Having very many static columns with only a few set at
         // any given time doesn't feel very common at all (and we already optimize the case where no static at all
         // are provided).
-        return isStatic ? false : useSparseColumnLayout;
+        return !isStatic && useSparseColumnLayout;
     }

     public static SerializationHeader forKeyCache(CFMetaData metadata)
@@ -159,13 +158,7 @@ public class SerializationHeader

     private static List<AbstractType<?>> typesOf(List<ColumnDefinition> columns)
     {
-        return ImmutableList.copyOf(Lists.transform(columns, new Function<ColumnDefinition, AbstractType<?>>()
-        {
-            public AbstractType<?> apply(ColumnDefinition column)
-            {
-                return column.type;
-            }
-        }));
+        return ImmutableList.copyOf(Lists.transform(columns, column -> column.type));
     }

     public PartitionColumns columns()
@@ -365,7 +358,7 @@ public class SerializationHeader
             Columns.serializer.serialize(header.columns.regulars, out);
         }

-        public SerializationHeader deserializeForMessaging(DataInput in, CFMetaData metadata, boolean hasStatic) throws IOException
+        public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, boolean hasStatic) throws IOException
         {
             RowStats stats = RowStats.serializer.deserialize(in);
@@ -458,7 +451,7 @@ public class SerializationHeader
             return size;
         }

-        private void readColumnsWithType(DataInput in, Map<ByteBuffer, AbstractType<?>> typeMap) throws IOException
+        private void readColumnsWithType(DataInputPlus in, Map<ByteBuffer, AbstractType<?>> typeMap) throws IOException
         {
             int length = in.readUnsignedShort();
             for (int i = 0; i < length; i++)
@@ -474,7 +467,7 @@ public class SerializationHeader
             ByteBufferUtil.writeWithLength(UTF8Type.instance.decompose(type.toString()), out);
         }

-        private AbstractType<?> readType(DataInput in) throws IOException
+        private AbstractType<?> readType(DataInputPlus in) throws IOException
         {
             ByteBuffer raw = ByteBufferUtil.readWithLength(in);
             return TypeParser.parse(UTF8Type.instance.compose(raw));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SimpleClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleClustering.java b/src/java/org/apache/cassandra/db/SimpleClustering.java
deleted file mode 100644
index 8b1cb7b..0000000
--- a/src/java/org/apache/cassandra/db/SimpleClustering.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-public class SimpleClustering extends Clustering
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleClustering(new ByteBuffer[0]));
-
-    private final ByteBuffer[] values;
-
-    public SimpleClustering(ByteBuffer... values)
-    {
-        this.values = values;
-    }
-
-    public SimpleClustering(ByteBuffer value)
-    {
-        this(new ByteBuffer[]{ value });
-    }
-
-    public int size()
-    {
-        return values.length;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        return values[i];
-    }
-
-    public ByteBuffer[] getRawValues()
-    {
-        return values;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
-    }
-
-    @Override
-    public Clustering takeAlias()
-    {
-        return this;
-    }
-
-    public static Builder builder(int size)
-    {
-        return new Builder(size);
-    }
-
-    public static class Builder implements Writer
-    {
-        private final ByteBuffer[] values;
-        private int idx;
-
-        private Builder(int size)
-        {
-            this.values = new ByteBuffer[size];
-        }
-
-        public void writeClusteringValue(ByteBuffer value)
-        {
-            values[idx++] = value;
-        }
-
-        public SimpleClustering build()
-        {
-            assert idx == values.length;
-            return new SimpleClustering(values);
-        }
-    }
-}
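The SerializationHeader changes above are mechanical (a lambda replaces the anonymous Function, DataInputPlus replaces DataInput), while the deletion of SimpleClustering removes the last way to hand-wrap clustering values in raw ByteBuffers. As the RowUpdateBuilder hunks earlier in this commit show, call sites now go through the table's clustering comparator or the Clustering.EMPTY constant. A hedged before/after sketch, assuming a CFMetaData handle named `metadata` and an Object[] named `values`:

    // Before (class deleted above): Clustering clustering = new SimpleClustering(bb0, bb1);
    // After: the comparator builds the clustering, tying the value count
    // to the table's clustering arity.
    Clustering clustering = values.length == 0
                          ? Clustering.EMPTY
                          : metadata.comparator.make(values);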
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleDeletionTime.java b/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
deleted file mode 100644
index 738c5e6..0000000
--- a/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Simple implementation of DeletionTime.
- */
-public class SimpleDeletionTime extends DeletionTime
-{
-    public final long markedForDeleteAt;
-    public final int localDeletionTime;
-
-    @VisibleForTesting
-    public SimpleDeletionTime(long markedForDeleteAt, int localDeletionTime)
-    {
-        this.markedForDeleteAt = markedForDeleteAt;
-        this.localDeletionTime = localDeletionTime;
-    }
-
-    public long markedForDeleteAt()
-    {
-        return markedForDeleteAt;
-    }
-
-    public int localDeletionTime()
-    {
-        return localDeletionTime;
-    }
-
-    public DeletionTime takeAlias()
-    {
-        return this;
-    }
-}
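SimpleDeletionTime existed only to carry the (markedForDeleteAt, localDeletionTime) pair behind the takeAlias indirection; with that aliasing protocol gone, DeletionTime is constructed directly, as the deleteRow and addComplexDeletion hunks earlier in this commit already show. The replacement is one-for-one:

    // Before (class deleted above):
    //   writer.writeRowDeletion(new SimpleDeletionTime(timestamp, FBUtilities.nowInSeconds()));
    // After:
    builder.addRowDeletion(new DeletionTime(timestamp, FBUtilities.nowInSeconds()));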
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java b/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
deleted file mode 100644
index fea1b86..0000000
--- a/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Objects;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class SimpleLivenessInfo extends AbstractLivenessInfo
-{
-    private final long timestamp;
-    private final int ttl;
-    private final int localDeletionTime;
-
-    // Note that while some code use this ctor, the two following static creation methods
-    // are usually less error prone.
-    SimpleLivenessInfo(long timestamp, int ttl, int localDeletionTime)
-    {
-        this.timestamp = timestamp;
-        this.ttl = ttl;
-        this.localDeletionTime = localDeletionTime;
-    }
-
-    public static SimpleLivenessInfo forUpdate(long timestamp, int ttl, int nowInSec, CFMetaData metadata)
-    {
-        if (ttl == NO_TTL)
-            ttl = metadata.getDefaultTimeToLive();
-
-        return new SimpleLivenessInfo(timestamp, ttl, ttl == NO_TTL ? NO_DELETION_TIME : nowInSec + ttl);
-    }
-
-    public static SimpleLivenessInfo forDeletion(long timestamp, int localDeletionTime)
-    {
-        return new SimpleLivenessInfo(timestamp, NO_TTL, localDeletionTime);
-    }
-
-    public long timestamp()
-    {
-        return timestamp;
-    }
-
-    public int ttl()
-    {
-        return ttl;
-    }
-
-    public int localDeletionTime()
-    {
-        return localDeletionTime;
-    }
-
-    @Override
-    public LivenessInfo takeAlias()
-    {
-        return this;
-    }
-}
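The deleted forUpdate factory above encodes the TTL rules for expiring data: an explicit TTL wins, otherwise the table's default TTL applies, and whenever a TTL is in effect the local deletion time is nowInSec + ttl. A standalone Java restatement of that arithmetic; the constant values match the ones in this codebase, while the method name is illustrative:

    // NO_TTL == 0 and NO_DELETION_TIME == Integer.MAX_VALUE, as in the deleted class.
    static int localDeletionTime(int ttl, int defaultTtl, int nowInSec)
    {
        if (ttl == 0)              // no explicit TTL: fall back to the table default
            ttl = defaultTtl;
        return ttl == 0
             ? Integer.MAX_VALUE   // not expiring at all
             : nowInSec + ttl;     // expires ttl seconds from "now"
    }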
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index cb43cd3..53ead14 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -146,7 +146,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus

         try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
         {
-            final Mutation mutation = new Mutation(UnfilteredRowIterators.toUpdate(iter));
+            final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
             StageManager.getStage(Stage.MUTATION).execute(new Runnable()
             {
                 public void run()
@@ -232,7 +232,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
         // We can remove a row if it has data that is more recent that the next sstable to consider for the data that the query
         // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
         // and 2) the requested columns.
-        if (!row.primaryKeyLivenessInfo().hasTimestamp() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+        if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
             return false;

         for (ColumnDefinition column : requestedColumns)
@@ -242,7 +242,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
                 return false;

             Cell cell = row.getCell(column);
-            if (cell == null || cell.livenessInfo().timestamp() <= sstableTimestamp)
+            if (cell == null || cell.timestamp() <= sstableTimestamp)
                 return false;
         }
         return true;
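The comment in the hunk above carries the reasoning for this short-circuit: a row already read from newer sources makes the next sstable irrelevant only when the row's primary-key timestamp and every requested cell are strictly newer than that sstable's maximum timestamp. A standalone sketch of the predicate with the timestamps reduced to longs; in the real code they come from Row and Cell, and the method name is illustrative:

    // Mirrors the checks above: an empty liveness info or a missing cell is
    // modelled as Long.MIN_VALUE (LivenessInfo.NO_TIMESTAMP) and fails its test.
    static boolean canSkipNextSSTable(long rowTimestamp, long[] requestedCellTimestamps, long sstableTimestamp)
    {
        if (rowTimestamp <= sstableTimestamp)
            return false;                 // the sstable may hold newer row liveness
        for (long cellTimestamp : requestedCellTimestamps)
            if (cellTimestamp <= sstableTimestamp)
                return false;             // the sstable may hold a newer value for this column
        return true;
    }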
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index f9f583f..80711d6 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;

-import java.io.DataInput;
 import java.io.IOException;
 import java.util.*;
@@ -29,6 +28,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.service.*;
@@ -483,7 +483,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter

     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
         throws IOException
         {
             DecoratedKey key = StorageService.getPartitioner().decorateKey(metadata.getKeyValidator().readValue(in));
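Both SerializationHeader and this deserializer swap java.io.DataInput for DataInputPlus in their read paths. A minimal sketch of a reader in the migrated style; the only assumption made here is that DataInputPlus extends java.io.DataInput, which is all the fragment relies on:

    import java.io.IOException;
    import org.apache.cassandra.io.util.DataInputPlus;

    class LengthReader
    {
        // Existing DataInput calls keep compiling; only the declared parameter type changes.
        static int readLength(DataInputPlus in) throws IOException
        {
            return in.readUnsignedShort(); // inherited from java.io.DataInput
        }
    }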
