http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/LivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index 89971d1..8f7b1c2 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -17,128 +17,154 @@
  */
 package org.apache.cassandra.db;
 
+import java.util.Objects;
 import java.security.MessageDigest;
 
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
- * Groups the informations necessary to decide the liveness of a given piece of
- * column data.
+ * Stores the information relating to the liveness of the primary key columns of a row.
  * <p>
- * In practice, a {@code LivenessInfo} groups 3 informations:
- *   1) the data timestamp. It is sometimes allowed for a given piece of data to have
- *      no timestamp (for {@link Row#partitionKeyLivenessInfo} more precisely), but if that
- *      is the case it means the data has no liveness info at all.
- *   2) the data ttl if relevant.
- *   2) the data local deletion time if relevant (that is, if either the data has a ttl or is deleted).
+ * A {@code LivenessInfo} can first be empty. If it isn't, it contains at least a timestamp,
+ * which is the timestamp for the row primary key columns. On top of that, the info can be
+ * ttl'ed, in which case the {@code LivenessInfo} also has both a ttl and a local expiration time.
  */
-public interface LivenessInfo extends Aliasable<LivenessInfo>
+public class LivenessInfo
 {
     public static final long NO_TIMESTAMP = Long.MIN_VALUE;
     public static final int NO_TTL = 0;
-    public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
+    public static final int NO_EXPIRATION_TIME = Integer.MAX_VALUE;
 
-    public static final LivenessInfo NONE = new SimpleLivenessInfo(NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME);
+    public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
 
-    /**
-     * The timestamp at which the data was inserted or {@link NO_TIMESTAMP}
-     * if it has no timestamp (which may or may not be allowed).
-     *
-     * @return the liveness info timestamp.
-     */
-    public long timestamp();
+    protected final long timestamp;
 
-    /**
-     * Whether this liveness info has a timestamp or not.
-     * <p>
-     * Note that if this return {@code false}, then both {@link #hasTTL} and
-     * {@link #hasLocalDeletionTime} must return {@code false} too.
-     *
-     * @return whether this liveness info has a timestamp or not.
-     */
-    public boolean hasTimestamp();
+    protected LivenessInfo(long timestamp)
+    {
+        this.timestamp = timestamp;
+    }
+
+    public static LivenessInfo create(CFMetaData metadata, long timestamp, int nowInSec)
+    {
+        int defaultTTL = metadata.getDefaultTimeToLive();
+        if (defaultTTL != NO_TTL)
+            return expiring(timestamp, defaultTTL, nowInSec);
+
+        return new LivenessInfo(timestamp);
+    }
+
+    public static LivenessInfo expiring(long timestamp, int ttl, int nowInSec)
+    {
+        return new ExpiringLivenessInfo(timestamp, ttl, nowInSec + ttl);
+    }
+
+    public static LivenessInfo create(CFMetaData metadata, long timestamp, int ttl, int nowInSec)
+    {
+        return ttl == NO_TTL
+             ? create(metadata, timestamp, nowInSec)
+             : expiring(timestamp, ttl, nowInSec);
+    }
+
+    // Note that this ctor ignores the default table ttl and takes the expiration time, not the current time.
+    // Use when you know that's what you want.
+    public static LivenessInfo create(long timestamp, int ttl, int localExpirationTime)
+    {
+        return ttl == NO_TTL ? new LivenessInfo(timestamp) : new ExpiringLivenessInfo(timestamp, ttl, localExpirationTime);
+    }
 
     /**
-     * The ttl (if any) on the data or {@link NO_TTL} if the data is not
-     * expiring.
+     * Whether this liveness info is empty (has no timestamp).
      *
-     * Please note that this value is the TTL that was set originally and is thus not
-     * changing. If you want to figure out how much time the data has before it expires,
-     * then you should use {@link #remainingTTL}.
+     * @return whether this liveness info is empty or not.
      */
-    public int ttl();
+    public boolean isEmpty()
+    {
+        return timestamp == NO_TIMESTAMP;
+    }
 
     /**
-     * Whether this liveness info has a TTL or not.
+     * The timestamp for this liveness info.
      *
-     * @return whether this liveness info has a TTL or not.
+     * @return the liveness info timestamp (or {@link #NO_TIMESTAMP} if the info is empty).
      */
-    public boolean hasTTL();
+    public long timestamp()
+    {
+        return timestamp;
+    }
 
     /**
-     * The deletion time (in seconds) on the data if applicable ({@link NO_DELETION}
-     * otherwise).
-     *
-     * There is 3 cases in practice:
-     *   1) the data is neither deleted nor expiring: it then has neither {@code ttl()}
-     *      nor {@code localDeletionTime()}.
-     *   2) the data is expiring/expired: it then has both a {@code ttl()} and a
-     *      {@code localDeletionTime()}. Whether it's still live or is expired depends
-     *      on the {@code localDeletionTime()}.
-     *   3) the data is deleted: it has no {@code ttl()} but has a
-     *      {@code localDeletionTime()}.
+     * Whether the info has a ttl.
      */
-    public int localDeletionTime();
+    public boolean isExpiring()
+    {
+        return false;
+    }
 
     /**
-     * Whether this liveness info has a local deletion time or not.
+     * The ttl (if any) on the row primary key columns or {@link #NO_TTL} if it is not
+     * expiring.
      *
-     * @return whether this liveness info has a local deletion time or not.
+     * Please note that this value is the TTL that was set originally and is thus not
+     * changing.
      */
-    public boolean hasLocalDeletionTime();
+    public int ttl()
+    {
+        return NO_TTL;
+    }
 
     /**
-     * The actual remaining time to live (in seconds) for the data this is
-     * the liveness information of.
-     *
-     * {@code #ttl} returns the initial TTL sets on the piece of data while this
-     * method computes how much time the data actually has to live given the
-     * current time.
+     * The expiration time (in seconds) if the info is expiring ({@link #NO_EXPIRATION_TIME} otherwise).
      *
-     * @param nowInSec the current time in seconds.
-     * @return the remaining time to live (in seconds) the data has, or
-     * {@code -1} if the data is either expired or not expiring.
      */
-    public int remainingTTL(int nowInSec);
+    public int localExpirationTime()
+    {
+        return NO_EXPIRATION_TIME;
+    }
 
     /**
-     * Checks whether a given piece of data is live given the current time.
+     * Whether that info is still live.
+     *
+     * A {@code LivenessInfo} is live if it is either not expiring, or if its expiration time is after
+     * {@code nowInSec}.
      *
      * @param nowInSec the current time in seconds.
-     * @return whether the data having this liveness info is live or not.
+     * @return whether this liveness info is live or not.
      */
-    public boolean isLive(int nowInSec);
+    public boolean isLive(int nowInSec)
+    {
+        return !isEmpty();
+    }
 
     /**
      * Adds this liveness information to the provided digest.
      *
      * @param digest the digest to add this liveness information to.
      */
-    public void digest(MessageDigest digest);
+    public void digest(MessageDigest digest)
+    {
+        FBUtilities.updateWithLong(digest, timestamp());
+    }
 
     /**
      * Validate the data contained by this liveness information.
      *
      * @throws MarshalException if some of the data is corrupted.
      */
-    public void validate();
+    public void validate()
+    {
+    }
 
     /**
      * The size of the (useful) data this liveness information contains.
      *
      * @return the size of the data this liveness information contains.
      */
-    public int dataSize();
+    public int dataSize()
+    {
+        return TypeSizes.sizeof(timestamp());
+    }
 
     /**
      * Whether this liveness information supersedes another one (that is
@@ -148,31 +174,10 @@ public interface LivenessInfo extends Aliasable<LivenessInfo>
      *
      * @return whether this {@code LivenessInfo} supersedes {@code other}.
      */
-    public boolean supersedes(LivenessInfo other);
-
-    /**
-     * Returns the result of merging this info to another one (that is, it
-     * return this info if it supersedes the other one, or the other one
-     * otherwise).
-     */
-    public LivenessInfo mergeWith(LivenessInfo other);
-
-    /**
-     * Whether this liveness information can be purged.
-     * <p>
-     * A liveness info can be purged if it is not live and hasn't been so
-     * for longer than gcGrace (or more precisely, it's local deletion time
-     * is smaller than gcBefore, which is itself "now - gcGrace").
-     *
-     * @param maxPurgeableTimestamp the biggest timestamp that can be purged.
-     * A liveness info will not be considered purgeable if its timestamp is
-     * greater than this value, even if it mets the other criteria for purging.
-     * @param gcBefore the local deletion time before which deleted/expired
-     * liveness info can be purged.
-     *
-     * @return whether this liveness information can be purged.
-     */
-    public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore);
+    public boolean supersedes(LivenessInfo other)
+    {
+        return timestamp > other.timestamp;
+    }
 
     /**
     * Returns a copy of this liveness info updated with the provided timestamp.
@@ -182,5 +187,108 @@ public interface LivenessInfo extends Aliasable<LivenessInfo>
     * as timestamp. If it has no timestamp however, this liveness info is returned
      * unchanged.
      */
-    public LivenessInfo withUpdatedTimestamp(long newTimestamp);
+    public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+    {
+        return new LivenessInfo(newTimestamp);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("[ts=%d]", timestamp);
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if(!(other instanceof LivenessInfo))
+            return false;
+
+        LivenessInfo that = (LivenessInfo)other;
+        return this.timestamp() == that.timestamp()
+            && this.ttl() == that.ttl()
+            && this.localExpirationTime() == that.localExpirationTime();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(timestamp(), ttl(), localExpirationTime());
+    }
+
+    private static class ExpiringLivenessInfo extends LivenessInfo
+    {
+        private final int ttl;
+        private final int localExpirationTime;
+
+        private ExpiringLivenessInfo(long timestamp, int ttl, int localExpirationTime)
+        {
+            super(timestamp);
+            assert ttl != NO_TTL && localExpirationTime != NO_EXPIRATION_TIME;
+            this.ttl = ttl;
+            this.localExpirationTime = localExpirationTime;
+        }
+
+        @Override
+        public int ttl()
+        {
+            return ttl;
+        }
+
+        @Override
+        public int localExpirationTime()
+        {
+            return localExpirationTime;
+        }
+
+        @Override
+        public boolean isExpiring()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isLive(int nowInSec)
+        {
+            return nowInSec < localExpirationTime;
+        }
+
+        @Override
+        public void digest(MessageDigest digest)
+        {
+            super.digest(digest);
+            FBUtilities.updateWithInt(digest, localExpirationTime);
+            FBUtilities.updateWithInt(digest, ttl);
+        }
+
+        @Override
+        public void validate()
+        {
+            if (ttl < 0)
+                throw new MarshalException("A TTL should not be negative");
+            if (localExpirationTime < 0)
+                throw new MarshalException("A local expiration time should not 
be negative");
+        }
+
+        @Override
+        public int dataSize()
+        {
+            return super.dataSize()
+                 + TypeSizes.sizeof(ttl)
+                 + TypeSizes.sizeof(localExpirationTime);
+
+        }
+
+        @Override
+        public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+        {
+            return new ExpiringLivenessInfo(newTimestamp, ttl, localExpirationTime);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("[ts=%d ttl=%d, let=%d]", timestamp, ttl, 
localExpirationTime);
+        }
+    }
 }
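
For illustration only (not part of the commit), a minimal usage sketch of the reworked LivenessInfo API; `metadata` is assumed to be the CFMetaData of some table:

    // Hypothetical sketch: the three states of a LivenessInfo.
    long timestamp = FBUtilities.timestampMicros();
    int nowInSec = FBUtilities.nowInSeconds();

    LivenessInfo simple = LivenessInfo.create(metadata, timestamp, nowInSec);  // picks up the table's default ttl, if any
    LivenessInfo expiring = LivenessInfo.expiring(timestamp, 3600, nowInSec);  // explicit one-hour ttl

    assert LivenessInfo.EMPTY.isEmpty() && !LivenessInfo.EMPTY.isLive(nowInSec);
    assert simple.isLive(nowInSec);                 // non-expiring info is live as long as it is non-empty
    assert expiring.isExpiring() && expiring.localExpirationTime() == nowInSec + 3600;
    assert expiring.supersedes(LivenessInfo.EMPTY); // the greater timestamp wins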

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/LivenessInfoArray.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LivenessInfoArray.java b/src/java/org/apache/cassandra/db/LivenessInfoArray.java
deleted file mode 100644
index 24026d8..0000000
--- a/src/java/org/apache/cassandra/db/LivenessInfoArray.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Arrays;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Utility class to store an array of liveness info efficiently.
- */
-public class LivenessInfoArray
-{
-    private long[] timestamps;
-    private int[] delTimesAndTTLs;
-
-    public LivenessInfoArray(int initialCapacity)
-    {
-        this.timestamps = new long[initialCapacity];
-        this.delTimesAndTTLs = new int[initialCapacity * 2];
-        clear();
-    }
-
-    public void clear(int i)
-    {
-        timestamps[i] = LivenessInfo.NO_TIMESTAMP;
-        delTimesAndTTLs[2 * i] = LivenessInfo.NO_DELETION_TIME;
-        delTimesAndTTLs[2 * i + 1] = LivenessInfo.NO_TTL;
-    }
-
-    public void set(int i, LivenessInfo info)
-    {
-        set(i, info.timestamp(), info.ttl(), info.localDeletionTime());
-    }
-
-    public void set(int i, long timestamp, int ttl, int localDeletionTime)
-    {
-        this.timestamps[i] = timestamp;
-        this.delTimesAndTTLs[2 * i] = localDeletionTime;
-        this.delTimesAndTTLs[2 * i + 1] = ttl;
-    }
-
-    public long timestamp(int i)
-    {
-        return timestamps[i];
-    }
-
-    public int localDeletionTime(int i)
-    {
-        return delTimesAndTTLs[2 * i];
-    }
-
-    public int ttl(int i)
-    {
-        return delTimesAndTTLs[2 * i + 1];
-    }
-
-    public boolean isLive(int i, int nowInSec)
-    {
-        // See AbstractLivenessInfo.isLive().
-        return localDeletionTime(i) == LivenessInfo.NO_DELETION_TIME
-            || (ttl(i) != LivenessInfo.NO_TTL && nowInSec < 
localDeletionTime(i));
-    }
-
-    public int size()
-    {
-        return timestamps.length;
-    }
-
-    public void resize(int newSize)
-    {
-        int prevSize = size();
-
-        timestamps = Arrays.copyOf(timestamps, newSize);
-        delTimesAndTTLs = Arrays.copyOf(delTimesAndTTLs, newSize * 2);
-
-        clear(prevSize, newSize);
-    }
-
-    public void swap(int i, int j)
-    {
-        long ts = timestamps[j];
-        int ldt = delTimesAndTTLs[2 * j];
-        int ttl = delTimesAndTTLs[2 * j + 1];
-
-        move(i, j);
-
-        timestamps[i] = ts;
-        delTimesAndTTLs[2 * i] = ldt;
-        delTimesAndTTLs[2 * i + 1] = ttl;
-    }
-
-    public void move(int i, int j)
-    {
-        timestamps[j] = timestamps[i];
-        delTimesAndTTLs[2 * j] = delTimesAndTTLs[2 * i];
-        delTimesAndTTLs[2 * j + 1] = delTimesAndTTLs[2 * i + 1];
-    }
-
-    public void clear()
-    {
-        clear(0, size());
-    }
-
-    private void clear(int from, int to)
-    {
-        Arrays.fill(timestamps, from, to, LivenessInfo.NO_TIMESTAMP);
-        for (int i = from; i < to; i++)
-        {
-            delTimesAndTTLs[2 * i] = LivenessInfo.NO_DELETION_TIME;
-            delTimesAndTTLs[2 * i + 1] = LivenessInfo.NO_TTL;
-        }
-    }
-
-    public int dataSize()
-    {
-        return 16 * size();
-    }
-
-    public long unsharedHeapSize()
-    {
-        return ObjectSizes.sizeOfArray(timestamps)
-             + ObjectSizes.sizeOfArray(delTimesAndTTLs);
-    }
-
-    public static Cursor newCursor()
-    {
-        return new Cursor();
-    }
-
-    public static class Cursor extends AbstractLivenessInfo
-    {
-        private LivenessInfoArray array;
-        private int i;
-
-        public Cursor setTo(LivenessInfoArray array, int i)
-        {
-            this.array = array;
-            this.i = i;
-            return this;
-        }
-
-        public long timestamp()
-        {
-            return array.timestamps[i];
-        }
-
-        public int localDeletionTime()
-        {
-            return array.delTimesAndTTLs[2 * i];
-        }
-
-        public int ttl()
-        {
-            return array.delTimesAndTTLs[2 * i + 1];
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
new file mode 100644
index 0000000..6b19283
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Iterator;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * A mutable implementation of {@code DeletionInfo}.
+ */
+public class MutableDeletionInfo implements DeletionInfo
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new MutableDeletionInfo(0, 0));
+
+    /**
+     * This represents a deletion of the entire partition. We can't represent this within the RangeTombstoneList, so it's
+     * kept separately. This also slightly optimizes the common case of a full partition deletion.
+     */
+    private DeletionTime partitionDeletion;
+
+    /**
+     * A list of range tombstones within the partition.  This is left as null if there are no range tombstones
+     * (to save an allocation, since it's a common case).
+     */
+    private RangeTombstoneList ranges;
+
+    /**
+     * Creates a DeletionInfo with only a top-level (row) tombstone.
+     * @param markedForDeleteAt the time after which the entire row should be considered deleted
+     * @param localDeletionTime what time the deletion write was applied locally (for purposes of
+     *                          purging the tombstone after gc_grace_seconds).
+     */
+    public MutableDeletionInfo(long markedForDeleteAt, int localDeletionTime)
+    {
+        // Pre-1.1 node may return MIN_VALUE for non-deleted container, but the new default is MAX_VALUE
+        // (see CASSANDRA-3872)
+        this(new DeletionTime(markedForDeleteAt, localDeletionTime == Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
+    }
+
+    public MutableDeletionInfo(DeletionTime partitionDeletion)
+    {
+        this(partitionDeletion, null);
+    }
+
+    public MutableDeletionInfo(DeletionTime partitionDeletion, RangeTombstoneList ranges)
+    {
+        this.partitionDeletion = partitionDeletion;
+        this.ranges = ranges;
+    }
+
+    /**
+     * Returns a new DeletionInfo that has no top-level tombstone or any range tombstones.
+     */
+    public static MutableDeletionInfo live()
+    {
+        return new MutableDeletionInfo(DeletionTime.LIVE);
+    }
+
+    public MutableDeletionInfo mutableCopy()
+    {
+        return new MutableDeletionInfo(partitionDeletion, ranges == null ? null : ranges.copy());
+    }
+
+    public MutableDeletionInfo copy(AbstractAllocator allocator)
+    {
+        RangeTombstoneList rangesCopy = null;
+        if (ranges != null)
+             rangesCopy = ranges.copy(allocator);
+
+        return new MutableDeletionInfo(partitionDeletion, rangesCopy);
+    }
+
+    /**
+     * Returns whether this DeletionInfo is live, that is deletes no columns.
+     */
+    public boolean isLive()
+    {
+        return partitionDeletion.isLive() && (ranges == null || 
ranges.isEmpty());
+    }
+
+    /**
+     * Potentially replaces the top-level tombstone with another, keeping whichever has the higher markedForDeleteAt
+     * timestamp.
+     * @param newInfo the deletion time to add to this deletion info.
+     */
+    public void add(DeletionTime newInfo)
+    {
+        if (newInfo.supersedes(partitionDeletion))
+            partitionDeletion = newInfo;
+    }
+
+    public void add(RangeTombstone tombstone, ClusteringComparator comparator)
+    {
+        if (ranges == null)
+            ranges = new RangeTombstoneList(comparator, 1);
+
+        ranges.add(tombstone);
+    }
+
+    /**
+     * Combines another DeletionInfo with this one and returns the result.  Whichever top-level tombstone
+     * has the higher markedForDeleteAt timestamp will be kept, along with its localDeletionTime.  The
+     * range tombstones will be combined.
+     *
+     * @return this object.
+     */
+    public DeletionInfo add(DeletionInfo newInfo)
+    {
+        add(newInfo.getPartitionDeletion());
+
+        // We know MutableDeletionInfo is the only implementation and we're not mutating it, it's just to get access to the
+        // RangeTombstoneList directly.
+        assert newInfo instanceof MutableDeletionInfo;
+        RangeTombstoneList newRanges = ((MutableDeletionInfo)newInfo).ranges;
+
+        if (ranges == null)
+            ranges = newRanges == null ? null : newRanges.copy();
+        else if (newRanges != null)
+            ranges.addAll(newRanges);
+
+        return this;
+    }
+
+    public DeletionTime getPartitionDeletion()
+    {
+        return partitionDeletion;
+    }
+
+    // Use sparingly, not the most efficient thing
+    public Iterator<RangeTombstone> rangeIterator(boolean reversed)
+    {
+        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : 
ranges.iterator(reversed);
+    }
+
+    public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean 
reversed)
+    {
+        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : 
ranges.iterator(slice, reversed);
+    }
+
+    public RangeTombstone rangeCovering(Clustering name)
+    {
+        return ranges == null ? null : ranges.search(name);
+    }
+
+    public int dataSize()
+    {
+        int size = TypeSizes.sizeof(partitionDeletion.markedForDeleteAt());
+        return size + (ranges == null ? 0 : ranges.dataSize());
+    }
+
+    public boolean hasRanges()
+    {
+        return ranges != null && !ranges.isEmpty();
+    }
+
+    public int rangeCount()
+    {
+        return hasRanges() ? ranges.size() : 0;
+    }
+
+    public long maxTimestamp()
+    {
+        return ranges == null ? partitionDeletion.markedForDeleteAt() : Math.max(partitionDeletion.markedForDeleteAt(), ranges.maxMarkedAt());
+    }
+
+    /**
+     * Whether this deletion info may modify the provided one if added to it.
+     */
+    public boolean mayModify(DeletionInfo delInfo)
+    {
+        return partitionDeletion.compareTo(delInfo.getPartitionDeletion()) > 0 
|| hasRanges();
+    }
+
+    @Override
+    public String toString()
+    {
+        if (ranges == null || ranges.isEmpty())
+            return String.format("{%s}", partitionDeletion);
+        else
+            return String.format("{%s, ranges=%s}", partitionDeletion, 
rangesAsString());
+    }
+
+    private String rangesAsString()
+    {
+        assert !ranges.isEmpty();
+        StringBuilder sb = new StringBuilder();
+        ClusteringComparator cc = ranges.comparator();
+        Iterator<RangeTombstone> iter = rangeIterator(false);
+        while (iter.hasNext())
+        {
+            RangeTombstone i = iter.next();
+            sb.append(i.deletedSlice().toString(cc));
+            sb.append('@');
+            sb.append(i.deletionTime());
+        }
+        return sb.toString();
+    }
+
+    // Updates all the timestamps of the deletions contained in this DeletionInfo to be {@code timestamp}.
+    public DeletionInfo updateAllTimestamp(long timestamp)
+    {
+        if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE)
+            partitionDeletion = new DeletionTime(timestamp, partitionDeletion.localDeletionTime());
+
+        if (ranges != null)
+            ranges.updateAllTimestamp(timestamp);
+        return this;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if(!(o instanceof MutableDeletionInfo))
+            return false;
+        MutableDeletionInfo that = (MutableDeletionInfo)o;
+        return partitionDeletion.equals(that.partitionDeletion) && 
Objects.equal(ranges, that.ranges);
+    }
+
+    @Override
+    public final int hashCode()
+    {
+        return Objects.hashCode(partitionDeletion, ranges);
+    }
+
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
+    }
+
+    public void collectStats(RowStats.Collector collector)
+    {
+        collector.update(partitionDeletion);
+        if (ranges != null)
+            ranges.collectStats(collector);
+    }
+
+    public static Builder builder(DeletionTime partitionLevelDeletion, ClusteringComparator comparator, boolean reversed)
+    {
+        return new Builder(partitionLevelDeletion, comparator, reversed);
+    }
+
+    /**
+     * Builds DeletionInfo object from (in order) range tombstone markers.
+     */
+    public static class Builder
+    {
+        private final MutableDeletionInfo deletion;
+        private final ClusteringComparator comparator;
+
+        private final boolean reversed;
+
+        private RangeTombstoneMarker openMarker;
+
+        private Builder(DeletionTime partitionLevelDeletion, ClusteringComparator comparator, boolean reversed)
+        {
+            this.deletion = new MutableDeletionInfo(partitionLevelDeletion);
+            this.comparator = comparator;
+            this.reversed = reversed;
+        }
+
+        public void add(RangeTombstoneMarker marker)
+        {
+            // We need to start by the close case in case that's a boundary
+
+            if (marker.isClose(reversed))
+            {
+                DeletionTime openDeletion = openMarker.openDeletionTime(reversed);
+                assert marker.closeDeletionTime(reversed).equals(openDeletion);
+
+                Slice.Bound open = openMarker.openBound(reversed);
+                Slice.Bound close = marker.closeBound(reversed);
+
+                Slice slice = reversed ? Slice.make(close, open) : Slice.make(open, close);
+                deletion.add(new RangeTombstone(slice, openDeletion), comparator);
+            }
+
+            if (marker.isOpen(reversed))
+            {
+                openMarker = marker;
+            }
+        }
+
+        public MutableDeletionInfo build()
+        {
+            return deletion;
+        }
+    }
+}
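
A usage sketch of the merge semantics described above (assumed, not from the patch); `slice`, `comparator` and `nowInSec` are placeholders:

    // Hypothetical sketch: the partition-level tombstone with the higher
    // markedForDeleteAt wins, while range tombstones accumulate separately.
    MutableDeletionInfo info = MutableDeletionInfo.live();
    assert info.isLive() && !info.hasRanges();

    info.add(new DeletionTime(1000L, nowInSec)); // becomes the partition-level deletion
    info.add(new DeletionTime(500L, nowInSec));  // ignored: a lower markedForDeleteAt does not supersede
    assert info.getPartitionDeletion().markedForDeleteAt() == 1000L;

    // The RangeTombstoneList is allocated lazily, on the first range added.
    info.add(new RangeTombstone(slice, new DeletionTime(2000L, nowInSec)), comparator);
    assert info.hasRanges() && info.maxTimestamp() == 2000L;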

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
index ef05760..5f1da8a 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -66,6 +66,11 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
         return statics.isEmpty() && regulars.isEmpty();
     }
 
+    public Columns columns(boolean isStatic)
+    {
+        return isStatic ? statics : regulars;
+    }
+
     public boolean contains(ColumnDefinition column)
     {
        return column.isStatic() ? statics.contains(column) : regulars.contains(column);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 65e38d0..4a3704f 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.metrics.TableMetrics;
@@ -278,11 +278,11 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
         throws IOException
         {
            DataRange range = DataRange.serializer.deserialize(in, version, metadata);
            return new PartitionRangeReadCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
         }
-    };
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index df60933..8865b0f 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -17,20 +17,15 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ISSTableSerializer;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * A range tombstone is a tombstone that covers a slice/range of rows.
@@ -48,7 +43,7 @@ public class RangeTombstone
     public RangeTombstone(Slice slice, DeletionTime deletion)
     {
         this.slice = slice;
-        this.deletion = deletion.takeAlias();
+        this.deletion = deletion;
     }
 
     /**
@@ -73,7 +68,7 @@ public class RangeTombstone
 
     public String toString(ClusteringComparator comparator)
     {
-        return slice.toString(comparator) + "@" + deletion;
+        return slice.toString(comparator) + '@' + deletion;
     }
 
     @Override
@@ -108,9 +103,30 @@ public class RangeTombstone
     {
         public static final Serializer serializer = new Serializer();
 
+        /** The smallest start bound, i.e. the one that starts before any row. */
+        public static final Bound BOTTOM = new Bound(Kind.INCL_START_BOUND, EMPTY_VALUES_ARRAY);
+        /** The biggest end bound, i.e. the one that ends after any row. */
+        public static final Bound TOP = new Bound(Kind.INCL_END_BOUND, EMPTY_VALUES_ARRAY);
+
         public Bound(Kind kind, ByteBuffer[] values)
         {
             super(kind, values);
+            assert values.length > 0 || !kind.isBoundary();
+        }
+
+        public boolean isBoundary()
+        {
+            return kind.isBoundary();
+        }
+
+        public boolean isOpen(boolean reversed)
+        {
+            return kind.isOpen(reversed);
+        }
+
+        public boolean isClose(boolean reversed)
+        {
+            return kind.isClose(reversed);
         }
 
        public static RangeTombstone.Bound inclusiveOpen(boolean reversed, ByteBuffer[] boundValues)
@@ -143,6 +159,19 @@ public class RangeTombstone
            return new Bound(reversed ? Kind.INCL_END_EXCL_START_BOUNDARY : Kind.EXCL_END_INCL_START_BOUNDARY, boundValues);
         }
 
+        public static RangeTombstone.Bound fromSliceBound(Slice.Bound sliceBound)
+        {
+            return new RangeTombstone.Bound(sliceBound.kind(), sliceBound.getRawValues());
+        }
+
+        public RangeTombstone.Bound copy(AbstractAllocator allocator)
+        {
+            ByteBuffer[] newValues = new ByteBuffer[size()];
+            for (int i = 0; i < size(); i++)
+                newValues[i] = allocator.clone(get(i));
+            return new Bound(kind(), newValues);
+        }
+
         @Override
         public Bound withNewKind(Kind kind)
         {
@@ -165,13 +194,15 @@ public class RangeTombstone
                     + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types);
             }
 
-            public Kind deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException
+            public RangeTombstone.Bound deserialize(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
             {
                 Kind kind = Kind.values()[in.readByte()];
-                writer.writeBoundKind(kind);
                 int size = in.readUnsignedShort();
-                ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer);
-                return kind;
+                if (size == 0)
+                    return kind.isStart() ? BOTTOM : TOP;
+
+                ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types);
+                return new RangeTombstone.Bound(kind, values);
             }
         }
     }
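
One detail worth spelling out (illustrative sketch, not part of the patch): an empty-valued bound can only be the global BOTTOM or TOP, which is why the new deserialize() short-circuits when size == 0 instead of allocating a fresh Bound:

    // Hypothetical sketch of the new Bound constants.
    RangeTombstone.Bound start = RangeTombstone.Bound.BOTTOM; // sorts before any row
    RangeTombstone.Bound end = RangeTombstone.Bound.TOP;      // sorts after any row
    assert !start.isBoundary() && !end.isBoundary(); // boundaries must carry at least one value
    assert start.isOpen(false); // an inclusive start bound opens a range in forward order
    assert end.isClose(false);  // an inclusive end bound closes one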

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/RangeTombstoneList.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
index 64f0978..96bcdb1 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java
@@ -244,7 +244,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
     {
         int idx = searchInternal(clustering, 0, size);
        // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
-        return idx >= 0 && (cell.isCounterCell() || markedAts[idx] >= cell.livenessInfo().timestamp());
+        return idx >= 0 && (cell.isCounterCell() || markedAts[idx] >= cell.timestamp());
     }
 
     /**
@@ -254,7 +254,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
     public DeletionTime searchDeletionTime(Clustering name)
     {
         int idx = searchInternal(name, 0, size);
-        return idx < 0 ? null : new SimpleDeletionTime(markedAts[idx], 
delTimes[idx]);
+        return idx < 0 ? null : new DeletionTime(markedAts[idx], 
delTimes[idx]);
     }
 
     public RangeTombstone search(Clustering name)
@@ -300,6 +300,23 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
         return dataSize;
     }
 
+    public long maxMarkedAt()
+    {
+        long max = Long.MIN_VALUE;
+        for (int i = 0; i < size; i++)
+            max = Math.max(max, markedAts[i]);
+        return max;
+    }
+
+    public void collectStats(RowStats.Collector collector)
+    {
+        for (int i = 0; i < size; i++)
+        {
+            collector.updateTimestamp(markedAts[i]);
+            collector.updateLocalDeletionTime(delTimes[i]);
+        }
+    }
+
     public void updateAllTimestamp(long timestamp)
     {
         for (int i = 0; i < size; i++)
@@ -308,22 +325,22 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable
 
     private RangeTombstone rangeTombstone(int idx)
     {
-        return new RangeTombstone(Slice.make(starts[idx], ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
+        return new RangeTombstone(Slice.make(starts[idx], ends[idx]), new DeletionTime(markedAts[idx], delTimes[idx]));
     }
 
    private RangeTombstone rangeTombstoneWithNewStart(int idx, Slice.Bound newStart)
    {
-        return new RangeTombstone(Slice.make(newStart, ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
+        return new RangeTombstone(Slice.make(newStart, ends[idx]), new DeletionTime(markedAts[idx], delTimes[idx]));
     }
 
    private RangeTombstone rangeTombstoneWithNewEnd(int idx, Slice.Bound newEnd)
    {
-        return new RangeTombstone(Slice.make(starts[idx], newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
+        return new RangeTombstone(Slice.make(starts[idx], newEnd), new DeletionTime(markedAts[idx], delTimes[idx]));
     }
 
    private RangeTombstone rangeTombstoneWithNewBounds(int idx, Slice.Bound newStart, Slice.Bound newEnd)
    {
-        return new RangeTombstone(Slice.make(newStart, newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx]));
+        return new RangeTombstone(Slice.make(newStart, newEnd), new DeletionTime(markedAts[idx], delTimes[idx]));
     }
 
     public Iterator<RangeTombstone> iterator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 99547f0..c3f036a 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -17,11 +17,12 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
@@ -63,7 +64,7 @@ public abstract class ReadCommand implements ReadQuery
 
     protected static abstract class SelectionDeserializer
     {
-        public abstract ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
+        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
     }
 
     protected enum Kind
@@ -268,22 +269,13 @@ public abstract class ReadCommand implements ReadQuery
 
         try
         {
-            resultIterator = UnfilteredPartitionIterators.convertExpiredCellsToTombstones(resultIterator, nowInSec);
-            resultIterator = withMetricsRecording(withoutExpiredTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
-
-            // TODO: we should push the dropping of columns down the layers because
-            // 1) it'll be more efficient
-            // 2) it could help us solve #6276
-            // But there is not reason not to do this as a followup so keeping it here for now (we'll have
-            // to be wary of cached row if we move this down the layers)
-            if (!metadata().getDroppedColumns().isEmpty())
-                resultIterator = UnfilteredPartitionIterators.removeDroppedColumns(resultIterator, metadata().getDroppedColumns());
+            resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
 
             // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
             // no point in checking it again.
             RowFilter updatedFilter = searcher == null
-                                       ? rowFilter()
-                                       : rowFilter().without(searcher.primaryClause(this));
+                                    ? rowFilter()
+                                    : rowFilter().without(searcher.primaryClause(this));
 
             // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
             // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
@@ -333,26 +325,33 @@ public abstract class ReadCommand implements ReadQuery
             {
                 currentKey = iter.partitionKey();
 
-                return new WrappingUnfilteredRowIterator(iter)
+                return new AlteringUnfilteredRowIterator(iter)
                 {
-                    public Unfiltered next()
+                    @Override
+                    protected Row computeNextStatic(Row row)
                     {
-                        Unfiltered unfiltered = super.next();
-                        if (unfiltered.kind() == Unfiltered.Kind.ROW)
-                        {
-                            Row row = (Row) unfiltered;
-                            if (row.hasLiveData(ReadCommand.this.nowInSec()))
-                                ++liveRows;
-                            for (Cell cell : row)
-                                if (!cell.isLive(ReadCommand.this.nowInSec()))
-                                    countTombstone(row.clustering());
-                        }
-                        else
+                        return computeNext(row);
+                    }
+
+                    @Override
+                    protected Row computeNext(Row row)
+                    {
+                        if (row.hasLiveData(ReadCommand.this.nowInSec()))
+                            ++liveRows;
+
+                        for (Cell cell : row.cells())
                         {
-                            countTombstone(unfiltered.clustering());
+                            if (!cell.isLive(ReadCommand.this.nowInSec()))
+                                countTombstone(row.clustering());
                         }
+                        return row;
+                    }
 
-                        return unfiltered;
+                    @Override
+                    protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+                    {
+                        countTombstone(marker.clustering());
+                        return marker;
                     }
 
                     private void countTombstone(ClusteringPrefix clustering)
@@ -407,12 +406,12 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract void appendCQLWhereClause(StringBuilder sb);
 
-    // Skip expired tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
-    // can save us some bandwith, and avoid making us throw a TombstoneOverwhelmingException for expired tombstones (which
+    // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
+    // can save us some bandwidth, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
     // are to some extend an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
-    protected UnfilteredPartitionIterator withoutExpiredTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
+    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
     {
-        return new TombstonePurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()))
+        return new PurgingPartitionIterator(iterator, cfs.gcBefore(nowInSec()))
         {
             protected long getMaxPurgeableTimestamp()
             {
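
For reference, the purging rule this relies on, restated as an illustrative helper (not part of the patch); it mirrors the isPurgeable contract removed from LivenessInfo above, with gcBefore being "nowInSec - gc_grace_seconds":

    // Purgeable only if the tombstone expired/was written long enough ago (before
    // gcBefore) and its timestamp does not exceed the biggest purgeable timestamp.
    static boolean isPurgeable(long timestamp, int localDeletionTime, long maxPurgeableTimestamp, int gcBefore)
    {
        return localDeletionTime < gcBefore && timestamp <= maxPurgeableTimestamp;
    }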

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ReusableClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableClustering.java b/src/java/org/apache/cassandra/db/ReusableClustering.java
deleted file mode 100644
index e2760aa..0000000
--- a/src/java/org/apache/cassandra/db/ReusableClustering.java
+++ /dev/null
@@ -1,82 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-public class ReusableClustering extends Clustering
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new ReusableClustering(0));
-
-    protected final ByteBuffer[] values;
-
-    protected ReusableWriter writer;
-
-    public ReusableClustering(int size)
-    {
-        this.values = new ByteBuffer[size];
-    }
-
-    public int size()
-    {
-        return values.length;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        return values[i];
-    }
-
-    public ByteBuffer[] getRawValues()
-    {
-        return values;
-    }
-
-    public Writer writer()
-    {
-        if (writer == null)
-            writer = new ReusableWriter();
-        return writer;
-    }
-
-    public void reset()
-    {
-        Arrays.fill(values, null);
-        if (writer != null)
-            writer.reset();
-    }
-
-    protected class ReusableWriter implements Writer
-    {
-        int idx;
-
-        public void writeClusteringValue(ByteBuffer value)
-        {
-            values[idx++] = value;
-        }
-
-        private void reset()
-        {
-            idx = 0;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java b/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
deleted file mode 100644
index d2f19f7..0000000
--- a/src/java/org/apache/cassandra/db/ReusableClusteringPrefix.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-// Note that we abuse a bit ReusableClustering to store Slice.Bound infos, but it's convenient so ...
-public class ReusableClusteringPrefix extends ReusableClustering
-{
-    private Kind kind;
-    private int size;
-
-    public ReusableClusteringPrefix(int size)
-    {
-        super(size);
-    }
-
-    public ClusteringPrefix get()
-    {
-        // We use ReusableClusteringPrefix when writing sstables (in ColumnIndex) and we
-        // don't write static clustering there.
-        assert kind != Kind.STATIC_CLUSTERING;
-        if (kind == Kind.CLUSTERING)
-        {
-            assert values.length == size;
-            return this;
-        }
-
-        return Slice.Bound.create(kind, Arrays.copyOfRange(values, 0, size));
-    }
-
-    public void copy(ClusteringPrefix clustering)
-    {
-        kind = clustering.kind();
-        for (int i = 0; i < clustering.size(); i++)
-            values[i] = clustering.get(i);
-        size = clustering.size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java b/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
deleted file mode 100644
index 43530b0..0000000
--- a/src/java/org/apache/cassandra/db/ReusableLivenessInfo.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-public class ReusableLivenessInfo extends AbstractLivenessInfo
-{
-    private long timestamp;
-    private int ttl;
-    private int localDeletionTime;
-
-    public ReusableLivenessInfo()
-    {
-        reset();
-    }
-
-    public LivenessInfo setTo(LivenessInfo info)
-    {
-        return setTo(info.timestamp(), info.ttl(), info.localDeletionTime());
-    }
-
-    public LivenessInfo setTo(long timestamp, int ttl, int localDeletionTime)
-    {
-        this.timestamp = timestamp;
-        this.ttl = ttl;
-        this.localDeletionTime = localDeletionTime;
-        return this;
-    }
-
-    public long timestamp()
-    {
-        return timestamp;
-    }
-
-    public int ttl()
-    {
-        return ttl;
-    }
-
-    public int localDeletionTime()
-    {
-        return localDeletionTime;
-    }
-
-    public void reset()
-    {
-        this.timestamp = LivenessInfo.NO_TIMESTAMP;
-        this.ttl = LivenessInfo.NO_TTL;
-        this.localDeletionTime = LivenessInfo.NO_DELETION_TIME;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 627321e..7640512 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -43,25 +43,27 @@ public class RowUpdateBuilder
 {
     private final PartitionUpdate update;
 
-    private final LivenessInfo defaultLiveness;
-    private final LivenessInfo deletionLiveness;
+    private final long timestamp;
+    private final int ttl;
+    private final int localDeletionTime;
+
     private final DeletionTime deletionTime;
 
     private final Mutation mutation;
 
-    private Row.Writer regularWriter;
-    private Row.Writer staticWriter;
+    private Row.Builder regularBuilder;
+    private Row.Builder staticBuilder;
 
-    private boolean hasSetClustering;
     private boolean useRowMarker = true;
 
    private RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl, int localDeletionTime, Mutation mutation)
     {
         this.update = update;
 
-        this.defaultLiveness = SimpleLivenessInfo.forUpdate(timestamp, ttl, localDeletionTime, update.metadata());
-        this.deletionLiveness = SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime);
-        this.deletionTime = new SimpleDeletionTime(timestamp, localDeletionTime);
+        this.timestamp = timestamp;
+        this.ttl = ttl;
+        this.localDeletionTime = localDeletionTime;
+        this.deletionTime = new DeletionTime(timestamp, localDeletionTime);
 
        // note that the created mutation may get further update later on, so we don't use the ctor that create a singletonMap
         // underneath (this class if for convenience, not performance)
@@ -73,31 +75,45 @@ public class RowUpdateBuilder
         this(update, timestamp, ttl, FBUtilities.nowInSeconds(), mutation);
     }
 
-    private Row.Writer writer()
+    private void startRow(Clustering clustering)
     {
-        assert staticWriter == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
-        if (regularWriter == null)
-        {
-            regularWriter = update.writer();
+        assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
+        assert regularBuilder == null : "Cannot add the clustering twice to the same row";
+
+        regularBuilder = ArrayBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds());
+        regularBuilder.newRow(clustering);
+
+        // If a CQL table, add the "row marker"
+        if (update.metadata().isCQLTable() && useRowMarker)
+            regularBuilder.addPrimaryKeyLivenessInfo(LivenessInfo.create(update.metadata(), timestamp, ttl, localDeletionTime));
+    }
 
-            // If a CQL table, add the "row marker"
-            if (update.metadata().isCQLTable() && useRowMarker)
-                regularWriter.writePartitionKeyLivenessInfo(defaultLiveness);
+    private Row.Builder builder()
+    {
+        assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
+        if (regularBuilder == null)
+        {
+            // we don't force people to call clustering() if the table has no clustering, so call it ourselves
+            assert update.metadata().comparator.size() == 0 : "Missing call to clustering()";
+            startRow(Clustering.EMPTY);
         }
-        return regularWriter;
+        return regularBuilder;
     }
 
-    private Row.Writer staticWriter()
+    private Row.Builder staticBuilder()
     {
-        assert regularWriter == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
-        if (staticWriter == null)
-            staticWriter = update.staticWriter();
-        return staticWriter;
+        assert regularBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object";
+        if (staticBuilder == null)
+        {
+            staticBuilder = ArrayBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds());
+            staticBuilder.newRow(Clustering.STATIC_CLUSTERING);
+        }
+        return staticBuilder;
     }
 
-    private Row.Writer writer(ColumnDefinition c)
+    private Row.Builder builder(ColumnDefinition c)
     {
-        return c.isStatic() ? staticWriter() : writer();
+        return c.isStatic() ? staticBuilder() : builder();
     }
 
     public RowUpdateBuilder(CFMetaData metadata, long timestamp, Object partitionKey)
@@ -145,18 +161,17 @@ public class RowUpdateBuilder
     public RowUpdateBuilder clustering(Object... clusteringValues)
     {
         assert clusteringValues.length == update.metadata().comparator.size()
-            : "Invalid clustering values length. Expected: " + 
update.metadata().comparator.size() + " got: " + clusteringValues.length;
-        hasSetClustering = true;
-        if (clusteringValues.length > 0)
-            
Rows.writeClustering(update.metadata().comparator.make(clusteringValues), 
writer());
+             : "Invalid clustering values length. Expected: " + 
update.metadata().comparator.size() + " got: " + clusteringValues.length;
+
+        startRow(clusteringValues.length == 0 ? Clustering.EMPTY : 
update.metadata().comparator.make(clusteringValues));
         return this;
     }
 
     public Mutation build()
     {
-        Row.Writer writer = regularWriter == null ? staticWriter : regularWriter;
-        if (writer != null)
-            writer.endOfRow();
+        Row.Builder builder = regularBuilder == null ? staticBuilder : regularBuilder;
+        if (builder != null)
+            update.add(builder.build());
         return mutation;
     }
 
@@ -170,14 +185,16 @@ public class RowUpdateBuilder
     {
         assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty());
 
-        Row.Writer writer = clusteringValues.length == update.metadata().comparator.size()
-                          ? update.writer()
-                          : update.staticWriter();
+        boolean isStatic = clusteringValues.length != update.metadata().comparator.size();
+        Row.Builder builder = ArrayBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars);
 
-        if (clusteringValues.length > 0)
-            Rows.writeClustering(update.metadata().comparator.make(clusteringValues), writer);
-        writer.writeRowDeletion(new SimpleDeletionTime(timestamp, FBUtilities.nowInSeconds()));
-        writer.endOfRow();
+        if (isStatic)
+            builder.newRow(Clustering.STATIC_CLUSTERING);
+        else
+            builder.newRow(clusteringValues.length == 0 ? Clustering.EMPTY : update.metadata().comparator.make(clusteringValues));
+        builder.addRowDeletion(new DeletionTime(timestamp, FBUtilities.nowInSeconds()));
+
+        update.add(builder.build());
     }
 
     public static Mutation deleteRow(CFMetaData metadata, long timestamp, Mutation mutation, Object... clusteringValues)
@@ -219,22 +236,21 @@ public class RowUpdateBuilder
     {
         ColumnDefinition c = getDefinition(columnName);
         assert c != null : "Cannot find column " + columnName;
-        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type.isCollection() && c.type.isMultiCell();
-        writer(c).writeComplexDeletion(c, new SimpleDeletionTime(defaultLiveness.timestamp() - 1, deletionTime.localDeletionTime()));
+        builder(c).addComplexDeletion(c, new DeletionTime(timestamp - 1, localDeletionTime));
         return this;
     }
 
     public RowUpdateBuilder addRangeTombstone(RangeTombstone rt)
     {
-        update.addRangeTombstone(rt);
+        update.add(rt);
         return this;
     }
 
     public RowUpdateBuilder addRangeTombstone(Slice slice)
     {
-        update.addRangeTombstone(slice, deletionTime);
-        return this;
+        return addRangeTombstone(new RangeTombstone(slice, deletionTime));
     }
 
     public RowUpdateBuilder addRangeTombstone(Object start, Object end)
@@ -251,13 +267,17 @@ public class RowUpdateBuilder
         return add(c, value);
     }
 
+    private Cell makeCell(ColumnDefinition c, ByteBuffer value, CellPath path)
+    {
+        return value == null
+             ? BufferCell.tombstone(c, timestamp, localDeletionTime)
+             : (ttl == LivenessInfo.NO_TTL ? BufferCell.live(update.metadata(), c, timestamp, value, path) : BufferCell.expiring(c, timestamp, ttl, localDeletionTime, value, path));
+    }
+
     public RowUpdateBuilder add(ColumnDefinition columnDefinition, Object value)
     {
-        assert columnDefinition.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + columnDefinition + " since no clustering hasn't been provided";
-        if (value == null)
-            writer(columnDefinition).writeCell(columnDefinition, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, deletionLiveness, null);
-        else
-            writer(columnDefinition).writeCell(columnDefinition, false, bb(value, columnDefinition.type), defaultLiveness, null);
+        assert columnDefinition.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + columnDefinition + " since no clustering hasn't been provided";
+        builder(columnDefinition).addCell(makeCell(columnDefinition, bb(value, columnDefinition.type), null));
         return this;
     }
 
@@ -273,8 +293,11 @@ public class RowUpdateBuilder
         return add(columnDefinition, null);
     }
 
-    private ByteBuffer bb(Object value, AbstractType<?> type)
+    private static ByteBuffer bb(Object value, AbstractType<?> type)
     {
+        if (value == null)
+            return null;
+
         if (value instanceof ByteBuffer)
             return (ByteBuffer)value;
 
@@ -306,30 +329,30 @@ public class RowUpdateBuilder
     public RowUpdateBuilder addMapEntry(String columnName, Object key, Object value)
     {
         ColumnDefinition c = getDefinition(columnName);
-        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type instanceof MapType && c.type.isMultiCell();
         MapType mt = (MapType)c.type;
-        writer(c).writeCell(c, false, bb(value, mt.getValuesType()), defaultLiveness, CellPath.create(bb(key, mt.getKeysType())));
+        builder(c).addCell(makeCell(c, bb(value, mt.getValuesType()), CellPath.create(bb(key, mt.getKeysType()))));
         return this;
     }
 
     public RowUpdateBuilder addListEntry(String columnName, Object value)
     {
         ColumnDefinition c = getDefinition(columnName);
-        assert c.isStatic() || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type instanceof ListType && c.type.isMultiCell();
         ListType lt = (ListType)c.type;
-        writer(c).writeCell(c, false, bb(value, lt.getElementsType()), defaultLiveness, CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())));
+        builder(c).addCell(makeCell(c, bb(value, lt.getElementsType()), CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()))));
         return this;
     }
 
     public RowUpdateBuilder addSetEntry(String columnName, Object value)
     {
         ColumnDefinition c = getDefinition(columnName);
-        assert c.isStatic() || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.isStatic() || regularBuilder != null : "Cannot set non static column " + c + " since no clustering has been provided";
         assert c.type instanceof SetType && c.type.isMultiCell();
         SetType st = (SetType)c.type;
-        writer(c).writeCell(c, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, defaultLiveness, CellPath.create(bb(value, st.getElementsType())));
+        builder(c).addCell(makeCell(c, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.create(bb(value, st.getElementsType()))));
         return this;
     }
 

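The rewritten RowUpdateBuilder funnels every column write through the new makeCell() helper, centralizing a decision the old code repeated at each writeCell() call site: a null value yields a tombstone, no ttl yields a regular live cell, and a ttl yields an expiring cell. Restated on its own (the BufferCell factory calls and fields are the ones from the hunk above; only the layout differs):

    // Condensed restatement of RowUpdateBuilder.makeCell() from the diff above.
    private Cell makeCell(ColumnDefinition c, ByteBuffer value, CellPath path)
    {
        if (value == null)                 // a null value means "delete this column"
            return BufferCell.tombstone(c, timestamp, localDeletionTime);

        return ttl == LivenessInfo.NO_TTL
             ? BufferCell.live(update.metadata(), c, timestamp, value, path)
             : BufferCell.expiring(c, timestamp, ttl, localDeletionTime, value, path);
    }

Note that bb() now maps a null input to a null ByteBuffer, which is what lets the null-value paths (e.g. add(columnDefinition, null) above) fall through to the tombstone branch.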
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index c720804..5784260 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -97,7 +96,7 @@ public class SerializationHeader
         // We always use a dense layout for the static row. Having very many static columns with  only a few set at
         // any given time doesn't feel very common at all (and we already optimize the case where no static at all
         // are provided).
-        return isStatic ? false : useSparseColumnLayout;
+        return !isStatic && useSparseColumnLayout;
     }
 
     public static SerializationHeader forKeyCache(CFMetaData metadata)
@@ -159,13 +158,7 @@ public class SerializationHeader
 
     private static List<AbstractType<?>> typesOf(List<ColumnDefinition> columns)
     {
-        return ImmutableList.copyOf(Lists.transform(columns, new Function<ColumnDefinition, AbstractType<?>>()
-        {
-            public AbstractType<?> apply(ColumnDefinition column)
-            {
-                return column.type;
-            }
-        }));
+        return ImmutableList.copyOf(Lists.transform(columns, column -> column.type));
     }
 
     public PartitionColumns columns()
@@ -365,7 +358,7 @@ public class SerializationHeader
             Columns.serializer.serialize(header.columns.regulars, out);
         }
 
-        public SerializationHeader deserializeForMessaging(DataInput in, CFMetaData metadata, boolean hasStatic) throws IOException
+        public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, boolean hasStatic) throws IOException
         {
             RowStats stats = RowStats.serializer.deserialize(in);
 
@@ -458,7 +451,7 @@ public class SerializationHeader
             return size;
         }
 
-        private void readColumnsWithType(DataInput in, Map<ByteBuffer, AbstractType<?>> typeMap) throws IOException
+        private void readColumnsWithType(DataInputPlus in, Map<ByteBuffer, AbstractType<?>> typeMap) throws IOException
         {
             int length = in.readUnsignedShort();
             for (int i = 0; i < length; i++)
@@ -474,7 +467,7 @@
             ByteBufferUtil.writeWithLength(UTF8Type.instance.decompose(type.toString()), out);
         }
 
-        private AbstractType<?> readType(DataInput in) throws IOException
+        private AbstractType<?> readType(DataInputPlus in) throws IOException
         {
             ByteBuffer raw = ByteBufferUtil.readWithLength(in);
             return TypeParser.parse(UTF8Type.instance.compose(raw));

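The typesOf() change is a straightforward Java 8 cleanup: Guava's anonymous Function becomes a lambda. The ImmutableList.copyOf() wrapper still matters, because Lists.transform() only returns a lazy view; copying pins the result. The same idiom in a self-contained, illustrative form:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.Lists;

    import java.util.Arrays;
    import java.util.List;

    public class TransformSketch
    {
        // Transform lazily, then copy so the result no longer reads through to the input.
        static List<Integer> lengthsOf(List<String> words)
        {
            return ImmutableList.copyOf(Lists.transform(words, word -> word.length()));
        }

        public static void main(String[] args)
        {
            System.out.println(lengthsOf(Arrays.asList("sparse", "layout"))); // [6, 6]
        }
    }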
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SimpleClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleClustering.java b/src/java/org/apache/cassandra/db/SimpleClustering.java
deleted file mode 100644
index 8b1cb7b..0000000
--- a/src/java/org/apache/cassandra/db/SimpleClustering.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-public class SimpleClustering extends Clustering
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleClustering(new ByteBuffer[0]));
-
-    private final ByteBuffer[] values;
-
-    public SimpleClustering(ByteBuffer... values)
-    {
-        this.values = values;
-    }
-
-    public SimpleClustering(ByteBuffer value)
-    {
-        this(new ByteBuffer[]{ value });
-    }
-
-    public int size()
-    {
-        return values.length;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        return values[i];
-    }
-
-    public ByteBuffer[] getRawValues()
-    {
-        return values;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
-    }
-
-    @Override
-    public Clustering takeAlias()
-    {
-        return this;
-    }
-
-    public static Builder builder(int size)
-    {
-        return new Builder(size);
-    }
-
-    public static class Builder implements Writer
-    {
-        private final ByteBuffer[] values;
-        private int idx;
-
-        private Builder(int size)
-        {
-            this.values = new ByteBuffer[size];
-        }
-
-        public void writeClusteringValue(ByteBuffer value)
-        {
-            values[idx++] = value;
-        }
-
-        public SimpleClustering build()
-        {
-            assert idx == values.length;
-            return new SimpleClustering(values);
-        }
-    }
-}

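SimpleClustering and its fixed-size builder go away together with the rest of the Writer API. As the RowUpdateBuilder hunks earlier in this message show, callers now obtain a Clustering from the table's comparator or use the shared sentinels:

    // Pattern taken from the RowUpdateBuilder hunks above; 'metadata' is the
    // table's CFMetaData and 'clusteringValues' the user-supplied components.
    Clustering clustering = clusteringValues.length == 0
                          ? Clustering.EMPTY
                          : metadata.comparator.make(clusteringValues);

    // Static rows use a dedicated sentinel instead:
    builder.newRow(Clustering.STATIC_CLUSTERING);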
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleDeletionTime.java b/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
deleted file mode 100644
index 738c5e6..0000000
--- a/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-
-import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Simple implementation of DeletionTime.
- */
-public class SimpleDeletionTime extends DeletionTime
-{
-    public final long markedForDeleteAt;
-    public final int localDeletionTime;
-
-    @VisibleForTesting
-    public SimpleDeletionTime(long markedForDeleteAt, int localDeletionTime)
-    {
-        this.markedForDeleteAt = markedForDeleteAt;
-        this.localDeletionTime = localDeletionTime;
-    }
-
-    public long markedForDeleteAt()
-    {
-        return markedForDeleteAt;
-    }
-
-    public int localDeletionTime()
-    {
-        return localDeletionTime;
-    }
-
-    public DeletionTime takeAlias()
-    {
-        return this;
-    }
-}

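With the takeAlias() indirection gone, there is nothing left for a "simple" subclass to add: a DeletionTime is now constructed directly wherever a (markedForDeleteAt, localDeletionTime) pair is needed, as the RowUpdateBuilder hunks above already showed:

    // Replacement for 'new SimpleDeletionTime(...)', per the hunks above.
    DeletionTime dt = new DeletionTime(timestamp, FBUtilities.nowInSeconds());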
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java b/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
deleted file mode 100644
index fea1b86..0000000
--- a/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Objects;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class SimpleLivenessInfo extends AbstractLivenessInfo
-{
-    private final long timestamp;
-    private final int ttl;
-    private final int localDeletionTime;
-
-    // Note that while some code use this ctor, the two following static creation methods
-    // are usually less error prone.
-    SimpleLivenessInfo(long timestamp, int ttl, int localDeletionTime)
-    {
-        this.timestamp = timestamp;
-        this.ttl = ttl;
-        this.localDeletionTime = localDeletionTime;
-    }
-
-    public static SimpleLivenessInfo forUpdate(long timestamp, int ttl, int nowInSec, CFMetaData metadata)
-    {
-        if (ttl == NO_TTL)
-            ttl = metadata.getDefaultTimeToLive();
-
-        return new SimpleLivenessInfo(timestamp, ttl, ttl == NO_TTL ? NO_DELETION_TIME : nowInSec + ttl);
-    }
-
-    public static SimpleLivenessInfo forDeletion(long timestamp, int localDeletionTime)
-    {
-        return new SimpleLivenessInfo(timestamp, NO_TTL, localDeletionTime);
-    }
-
-    public long timestamp()
-    {
-        return timestamp;
-    }
-
-    public int ttl()
-    {
-        return ttl;
-    }
-
-    public int localDeletionTime()
-    {
-        return localDeletionTime;
-    }
-
-    @Override
-    public LivenessInfo takeAlias()
-    {
-        return this;
-    }
-}

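The one piece of real logic in the removed forUpdate() -- fall back to the table's default TTL, and only compute a local expiration time when a TTL actually applies -- now lives behind LivenessInfo's factory methods (the RowUpdateBuilder hunk above calls LivenessInfo.create(update.metadata(), timestamp, ttl, localDeletionTime)). The arithmetic, distilled with hypothetical names and the sentinel values used above (NO_TTL == 0, NO_DELETION_TIME == Integer.MAX_VALUE):

    // Hypothetical distillation of the ttl/expiration rule from forUpdate() above.
    static int localExpirationTime(int ttl, int defaultTtl, int nowInSec)
    {
        if (ttl == 0)                    // no explicit ttl: fall back to the table default
            ttl = defaultTtl;
        return ttl == 0
             ? Integer.MAX_VALUE         // still no ttl: the data never expires
             : nowInSec + ttl;           // ttl'ed: expires ttl seconds from now
    }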
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
index cb43cd3..53ead14 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -146,7 +146,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
 
             try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
             {
-                final Mutation mutation = new Mutation(UnfilteredRowIterators.toUpdate(iter));
+                final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
                 StageManager.getStage(Stage.MUTATION).execute(new Runnable()
                 {
                     public void run()
@@ -232,7 +232,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
         // We can remove a row if it has data that is more recent that the next sstable to consider for the data that the query
         // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
         // and 2) the requested columns.
-        if (!row.primaryKeyLivenessInfo().hasTimestamp() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+        if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
             return false;
 
         for (ColumnDefinition column : requestedColumns)
@@ -242,7 +242,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus
                 return false;
 
             Cell cell = row.getCell(column);
-            if (cell == null || cell.livenessInfo().timestamp() <= sstableTimestamp)
+            if (cell == null || cell.timestamp() <= sstableTimestamp)
                 return false;
         }
         return true;

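These two hunks touch the names-query short-circuit: a row already assembled from newer sources is authoritative only if its primary-key liveness and every requested column carry timestamps strictly newer than anything the next sstable could contribute. Only the API changed here (isEmpty() replaces !hasTimestamp(), and the timestamp is read straight off the cell); the shape of the check, condensed from the hunks above (complex-column handling omitted):

    // Condensed restatement of the check from the hunks above.
    boolean rowIsAuthoritative(Row row, long sstableTimestamp, Iterable<ColumnDefinition> requestedColumns)
    {
        LivenessInfo liveness = row.primaryKeyLivenessInfo();
        if (liveness.isEmpty() || liveness.timestamp() <= sstableTimestamp)
            return false;

        for (ColumnDefinition column : requestedColumns)
        {
            Cell cell = row.getCell(column);
            if (cell == null || cell.timestamp() <= sstableTimestamp)
                return false;
        }
        return true;
    }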
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index f9f583f..80711d6 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.util.*;
 
@@ -29,6 +28,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.service.*;
@@ -483,7 +483,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
         throws IOException
         {
             DecoratedKey key = StorageService.getPartitioner().decorateKey(metadata.getKeyValidator().readValue(in));

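The rest of this file is the commit-wide swap of java.io.DataInput for org.apache.cassandra.io.util.DataInputPlus in deserialization signatures. DataInputPlus extends DataInput, so bodies like the key-reading line above compile unchanged; only the signatures move. A pared-down sketch of the pattern (the method name is illustrative; the body is the line from the hunk above):

    // Sketch only: accept the richer DataInputPlus where raw DataInput was used before.
    static DecoratedKey readPartitionKey(DataInputPlus in, CFMetaData metadata) throws IOException
    {
        return StorageService.getPartitioner().decorateKey(metadata.getKeyValidator().readValue(in));
    }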