http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java 
b/src/java/org/apache/cassandra/db/Clustering.java
index 541556b..7754182 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -25,7 +24,9 @@ import java.util.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * The clustering column values for a row.
@@ -39,7 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  * all of the following ones will be too because that's what thrift allows, 
but it's never assumed by the
  * code so we could start generally allowing nulls for clustering columns if 
we wanted to).
  */
-public abstract class Clustering extends AbstractClusteringPrefix
+public class Clustering extends AbstractClusteringPrefix
 {
     public static final Serializer serializer = new Serializer();
 
@@ -47,7 +48,7 @@ public abstract class Clustering extends 
AbstractClusteringPrefix
      * The special cased clustering used by all static rows. It is a special 
case in the
      * sense that it's always empty, no matter how many clustering columns the 
table has.
      */
-    public static final Clustering STATIC_CLUSTERING = new EmptyClustering()
+    public static final Clustering STATIC_CLUSTERING = new 
Clustering(EMPTY_VALUES_ARRAY)
     {
         @Override
         public Kind kind()
@@ -63,19 +64,35 @@ public abstract class Clustering extends 
AbstractClusteringPrefix
     };
 
     /** Empty clustering for tables having no clustering columns. */
-    public static final Clustering EMPTY = new EmptyClustering();
+    public static final Clustering EMPTY = new Clustering(EMPTY_VALUES_ARRAY)
+    {
+        @Override
+        public String toString(CFMetaData metadata)
+        {
+            return "EMPTY";
+        }
+    };
+
+    public Clustering(ByteBuffer... values)
+    {
+        super(Kind.CLUSTERING, values);
+    }
 
     public Kind kind()
     {
         return Kind.CLUSTERING;
     }
 
-    public Clustering takeAlias()
+    public Clustering copy(AbstractAllocator allocator)
     {
-        ByteBuffer[] values = new ByteBuffer[size()];
+        // Important for STATIC_CLUSTERING (but no point in being wasteful in 
general).
+        if (size() == 0)
+            return this;
+
+        ByteBuffer[] newValues = new ByteBuffer[size()];
         for (int i = 0; i < size(); i++)
-            values[i] = get(i);
-        return new SimpleClustering(values);
+            newValues[i] = values[i] == null ? null : 
allocator.clone(values[i]);
+        return new Clustering(newValues);
     }
 
     public String toString(CFMetaData metadata)
@@ -84,7 +101,7 @@ public abstract class Clustering extends 
AbstractClusteringPrefix
         for (int i = 0; i < size(); i++)
         {
             ColumnDefinition c = metadata.clusteringColumns().get(i);
-            sb.append(i == 0 ? "" : ", 
").append(c.name).append("=").append(get(i) == null ? "null" : 
c.type.getString(get(i)));
+            sb.append(i == 0 ? "" : ", 
").append(c.name).append('=').append(get(i) == null ? "null" : 
c.type.getString(get(i)));
         }
         return sb.toString();
     }
@@ -100,44 +117,6 @@ public abstract class Clustering extends 
AbstractClusteringPrefix
         return sb.toString();
     }
 
-    private static class EmptyClustering extends Clustering
-    {
-        private static final ByteBuffer[] EMPTY_VALUES_ARRAY = new 
ByteBuffer[0];
-
-        public int size()
-        {
-            return 0;
-        }
-
-        public ByteBuffer get(int i)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public ByteBuffer[] getRawValues()
-        {
-            return EMPTY_VALUES_ARRAY;
-        }
-
-        @Override
-        public Clustering takeAlias()
-        {
-            return this;
-        }
-
-        @Override
-        public long unsharedHeapSize()
-        {
-            return 0;
-        }
-
-        @Override
-        public String toString(CFMetaData metadata)
-        {
-            return "EMPTY";
-        }
-    }
-
     /**
      * Serializer for Clustering object.
      * <p>
@@ -148,6 +127,7 @@ public abstract class Clustering extends 
AbstractClusteringPrefix
     {
         public void serialize(Clustering clustering, DataOutputPlus out, int 
version, List<AbstractType<?>> types) throws IOException
         {
+            assert clustering != STATIC_CLUSTERING : "We should never 
serialize a static clustering";
             ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, 
out, version, types);
         }
 
@@ -156,16 +136,13 @@ public abstract class Clustering extends 
AbstractClusteringPrefix
             return 
ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, 
version, types);
         }
 
-        public void deserialize(DataInput in, int version, 
List<AbstractType<?>> types, Writer writer) throws IOException
+        public Clustering deserialize(DataInputPlus in, int version, 
List<AbstractType<?>> types) throws IOException
         {
-            ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, 
types.size(), version, types, writer);
-        }
+            if (types.isEmpty())
+                return EMPTY;
 
-        public Clustering deserialize(DataInput in, int version, 
List<AbstractType<?>> types) throws IOException
-        {
-            SimpleClustering.Builder builder = 
SimpleClustering.builder(types.size());
-            deserialize(in, version, types, builder);
-            return builder.build();
+            ByteBuffer[] values = 
ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), 
version, types);
+            return new Clustering(values);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java 
b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index 8b01d6f..a5401f0 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -25,7 +25,9 @@ import java.util.Objects;
 
 import com.google.common.base.Joiner;
 
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
@@ -46,9 +48,11 @@ public class ClusteringComparator implements 
Comparator<Clusterable>
     private final Comparator<IndexInfo> indexReverseComparator;
     private final Comparator<Clusterable> reverseComparator;
 
+    private final Comparator<Row> rowComparator = (r1, r2) -> 
compare(r1.clustering(), r2.clustering());
+
     public ClusteringComparator(AbstractType<?>... clusteringTypes)
     {
-        this(Arrays.<AbstractType<?>>asList(clusteringTypes));
+        this(Arrays.asList(clusteringTypes));
     }
 
     public ClusteringComparator(List<AbstractType<?>> clusteringTypes)
@@ -56,27 +60,9 @@ public class ClusteringComparator implements 
Comparator<Clusterable>
         this.clusteringTypes = clusteringTypes;
         this.isByteOrderComparable = isByteOrderComparable(clusteringTypes);
 
-        this.indexComparator = new Comparator<IndexInfo>()
-        {
-            public int compare(IndexInfo o1, IndexInfo o2)
-            {
-                return ClusteringComparator.this.compare(o1.lastName, 
o2.lastName);
-            }
-        };
-        this.indexReverseComparator = new Comparator<IndexInfo>()
-        {
-            public int compare(IndexInfo o1, IndexInfo o2)
-            {
-                return ClusteringComparator.this.compare(o1.firstName, 
o2.firstName);
-            }
-        };
-        this.reverseComparator = new Comparator<Clusterable>()
-        {
-            public int compare(Clusterable c1, Clusterable c2)
-            {
-                return ClusteringComparator.this.compare(c2, c1);
-            }
-        };
+        this.indexComparator = (o1, o2) -> 
ClusteringComparator.this.compare(o1.lastName, o2.lastName);
+        this.indexReverseComparator = (o1, o2) -> 
ClusteringComparator.this.compare(o1.firstName, o2.firstName);
+        this.reverseComparator = (c1, c2) -> 
ClusteringComparator.this.compare(c2, c1);
     }
 
     private static boolean isByteOrderComparable(Iterable<AbstractType<?>> 
types)
@@ -130,11 +116,10 @@ public class ClusteringComparator implements 
Comparator<Clusterable>
             throw new IllegalArgumentException(String.format("Invalid number 
of components, expecting %d but got %d", size(), values.length));
 
         CBuilder builder = CBuilder.create(this);
-        for (int i = 0; i < values.length; i++)
+        for (Object val : values)
         {
-            Object val = values[i];
             if (val instanceof ByteBuffer)
-                builder.add((ByteBuffer)val);
+                builder.add((ByteBuffer) val);
             else
                 builder.add(val);
         }
@@ -179,7 +164,7 @@ public class ClusteringComparator implements 
Comparator<Clusterable>
     public int compareComponent(int i, ByteBuffer v1, ByteBuffer v2)
     {
         if (v1 == null)
-            return v1 == null ? 0 : -1;
+            return v2 == null ? 0 : -1;
         if (v2 == null)
             return 1;
 
@@ -233,6 +218,19 @@ public class ClusteringComparator implements 
Comparator<Clusterable>
         }
     }
 
+    /**
+     * A comparator for rows.
+     *
+     * A {@code Row} is a {@code Clusterable} so {@code ClusteringComparator} 
can be used
+     * to compare rows directly, but when we know we deal with rows (and not 
{@code Clusterable} in
+     * general), this is a little faster because by knowing we compare {@code 
Clustering} objects,
+     * we know that 1) they all have the same size and 2) they all have the 
same kind.
+     */
+    public Comparator<Row> rowComparator()
+    {
+        return rowComparator;
+    }
+
     public Comparator<IndexInfo> indexComparator(boolean reversed)
     {
         return reversed ? indexReverseComparator : indexComparator;
@@ -243,27 +241,6 @@ public class ClusteringComparator implements 
Comparator<Clusterable>
         return reverseComparator;
     }
 
-    /**
-     * Whether the two provided clustering prefix are on the same clustering 
values.
-     *
-     * @param c1 the first prefix.
-     * @param c2 the second prefix.
-     * @return whether {@code c1} and {@code c2} have the same clustering 
values (but not necessarily
-     * the same "kind") or not.
-     */
-    public boolean isOnSameClustering(ClusteringPrefix c1, ClusteringPrefix c2)
-    {
-        if (c1.size() != c2.size())
-            return false;
-
-        for (int i = 0; i < c1.size(); i++)
-        {
-            if (compareComponent(i, c1.get(i), c2.get(i)) != 0)
-                return false;
-        }
-        return true;
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java 
b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 3bc7ff8..7b9d582 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
@@ -27,29 +26,31 @@ import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
- * A clustering prefix is basically the unit of what a {@link 
ClusteringComparator} can compare.
+ * A clustering prefix is the unit of what a {@link ClusteringComparator} can 
compare.
  * <p>
- * It holds values for the clustering columns of a table (potentially only a 
prefix of all of them) and it has
+ * It holds values for the clustering columns of a table (potentially only a 
prefix of all of them) and has
  * a "kind" that allows us to implement slices with inclusive and exclusive 
bounds.
  * <p>
- * In practice, {@code ClusteringPrefix} is just the common parts to its 2 
main subtype: {@link Clustering} and
- * {@link Slice.Bound}, where:
+ * In practice, {@code ClusteringPrefix} is just the common parts to its 3 
main subtypes: {@link Clustering} and
+ * {@link Slice.Bound}/{@link RangeTombstone.Bound}, where:
 *   1) {@code Clustering} represents the clustering values for a row, i.e. 
the values for its clustering columns.
  *   2) {@code Slice.Bound} represents a bound (start or end) of a slice (of 
rows).
+ *   3) {@code RangeTombstone.Bound} represents a range tombstone 
marker "bound".
  * See those classes for more details.
  */
-public interface ClusteringPrefix extends Aliasable<ClusteringPrefix>, 
IMeasurableMemory, Clusterable
+public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
 {
     public static final Serializer serializer = new Serializer();
 
     /**
      * The kind of clustering prefix this actually is.
      *
-     * The kind {@code STATIC_CLUSTERING} is only implemented by {@link 
Clustering.STATIC_CLUSTERING} and {@code CLUSTERING} is
+     * The kind {@code STATIC_CLUSTERING} is only implemented by {@link 
Clustering#STATIC_CLUSTERING} and {@code CLUSTERING} is
      * implemented by the {@link Clustering} class. The rest is used by {@link 
Slice.Bound} and {@link RangeTombstone.Bound}.
      */
     public enum Kind
@@ -166,12 +167,12 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
 
         public boolean isOpen(boolean reversed)
         {
-            return reversed ? isEnd() : isStart();
+            return isBoundary() || (reversed ? isEnd() : isStart());
         }
 
         public boolean isClose(boolean reversed)
         {
-            return reversed ? isStart() : isEnd();
+            return isBoundary() || (reversed ? isStart() : isEnd());
         }
 
         public Kind closeBoundOfBoundary(boolean reversed)
@@ -211,15 +212,29 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
      */
     public ByteBuffer get(int i);
 
+    /**
+     * Adds the data of this clustering prefix to the provided digest.
+     *
+     * @param digest the digest to which to add this prefix.
+     */
     public void digest(MessageDigest digest);
 
-    // Used to verify if batches goes over a given size
+    /**
+     * The size of the data held by this prefix.
+     *
+     * @return the size of the data held by this prefix (this is not the size 
of the object in memory, just
+     * the size of the data it stores).
+     */
     public int dataSize();
 
+    /**
+     * Generates a proper string representation of the prefix.
+     *
+     * @param metadata the metadata for the table the clustering prefix is of.
+     * @return a human-readable string representation of this prefix.
+     */
     public String toString(CFMetaData metadata);
 
-    public void writeTo(Writer writer);
-
     /**
      * The values of this prefix as an array.
      * <p>
@@ -231,21 +246,6 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
      */
     public ByteBuffer[] getRawValues();
 
-    /**
-     * Interface for writing a clustering prefix.
-     * <p>
-     * Each value for the prefix should simply be written in order.
-     */
-    public interface Writer
-    {
-        /**
-         * Write the next value to the writer.
-         *
-         * @param value the value to write.
-         */
-        public void writeClusteringValue(ByteBuffer value);
-    }
-
     public static class Serializer
     {
         public void serialize(ClusteringPrefix clustering, DataOutputPlus out, 
int version, List<AbstractType<?>> types) throws IOException
@@ -263,7 +263,7 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
             }
         }
 
-        public ClusteringPrefix deserialize(DataInput in, int version, 
List<AbstractType<?>> types) throws IOException
+        public ClusteringPrefix deserialize(DataInputPlus in, int version, 
List<AbstractType<?>> types) throws IOException
         {
             Kind kind = Kind.values()[in.readByte()];
             // We shouldn't serialize static clusterings
@@ -317,21 +317,20 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
             return size;
         }
 
-        void deserializeValuesWithoutSize(DataInput in, int size, int version, 
List<AbstractType<?>> types, ClusteringPrefix.Writer writer) throws IOException
+        ByteBuffer[] deserializeValuesWithoutSize(DataInputPlus in, int size, 
int version, List<AbstractType<?>> types) throws IOException
         {
-            if (size == 0)
-                return;
+            // Callers of this method should handle the case where size = 0 
(in all cases we want to return a special value anyway).
+            assert size > 0;
 
+            ByteBuffer[] values = new ByteBuffer[size];
             int[] header = readHeader(size, in);
             for (int i = 0; i < size; i++)
             {
-                if (isNull(header, i))
-                    writer.writeClusteringValue(null);
-                else if (isEmpty(header, i))
-                    
writer.writeClusteringValue(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-                else
-                    writer.writeClusteringValue(types.get(i).readValue(in));
+                values[i] = isNull(header, i)
+                          ? null
+                          : (isEmpty(header, i) ? 
ByteBufferUtil.EMPTY_BYTE_BUFFER : types.get(i).readValue(in));
             }
+            return values;
         }
 
         private int headerBytesCount(int size)
@@ -369,7 +368,7 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
             }
         }
 
-        private int[] readHeader(int size, DataInput in) throws IOException
+        private int[] readHeader(int size, DataInputPlus in) throws IOException
         {
             int nbBytes = headerBytesCount(size);
             int[] header = new int[nbBytes];
@@ -378,14 +377,14 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
             return header;
         }
 
-        private boolean isNull(int[] header, int i)
+        private static boolean isNull(int[] header, int i)
         {
             int b = header[i / 4];
             int mask = 1 << ((i % 4) * 2) + 1;
             return (b & mask) != 0;
         }
 
-        private boolean isEmpty(int[] header, int i)
+        private static boolean isEmpty(int[] header, int i)
         {
             int b = header[i / 4];
             int mask = 1 << ((i % 4) * 2);
@@ -405,7 +404,7 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
     public static class Deserializer
     {
         private final ClusteringComparator comparator;
-        private final DataInput in;
+        private final DataInputPlus in;
         private final SerializationHeader serializationHeader;
 
         private boolean nextIsRow;
@@ -414,14 +413,13 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
         private int nextSize;
         private ClusteringPrefix.Kind nextKind;
         private int deserializedSize;
-        private final ByteBuffer[] nextValues;
+        private ByteBuffer[] nextValues;
 
-        public Deserializer(ClusteringComparator comparator, DataInput in, 
SerializationHeader header)
+        public Deserializer(ClusteringComparator comparator, DataInputPlus in, 
SerializationHeader header)
         {
             this.comparator = comparator;
             this.in = in;
             this.serializationHeader = header;
-            this.nextValues = new ByteBuffer[comparator.size()];
         }
 
         public void prepare(int flags) throws IOException
@@ -432,6 +430,14 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
             this.nextSize = nextIsRow ? comparator.size() : 
in.readUnsignedShort();
             this.nextHeader = serializer.readHeader(nextSize, in);
             this.deserializedSize = 0;
+
+            // The point of the deserializer is that some of the clustering 
prefix won't actually be used (because they are not
+            // within the bounds of the query), and we want to reduce 
allocation for them. So we only reuse the values array
+            // between elements if 1) we haven't returned the previous element 
(if we have, nextValues will be null) and 2)
+            // nextValues is of the proper size. Note that the 2nd condition 
may not hold for range tombstone bounds, but all
+            // rows have a fixed size clustering, so we'll still save in the 
common case.
+            if (nextValues == null || nextValues.length != nextSize)
+                this.nextValues = new ByteBuffer[nextSize];
         }
 
         public int compareNextTo(Slice.Bound bound) throws IOException
@@ -473,9 +479,9 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
                 return false;
 
             int i = deserializedSize++;
-            nextValues[i] = serializer.isNull(nextHeader, i)
+            nextValues[i] = Serializer.isNull(nextHeader, i)
                           ? null
-                          : (serializer.isEmpty(nextHeader, i) ? 
ByteBufferUtil.EMPTY_BYTE_BUFFER : 
serializationHeader.clusteringTypes().get(i).readValue(in));
+                          : (Serializer.isEmpty(nextHeader, i) ? 
ByteBufferUtil.EMPTY_BYTE_BUFFER : 
serializationHeader.clusteringTypes().get(i).readValue(in));
             return true;
         }
 
@@ -485,29 +491,31 @@ public interface ClusteringPrefix extends 
Aliasable<ClusteringPrefix>, IMeasurab
                 continue;
         }
 
-        public RangeTombstone.Bound.Kind 
deserializeNextBound(RangeTombstone.Bound.Writer writer) throws IOException
+        public RangeTombstone.Bound deserializeNextBound() throws IOException
         {
             assert !nextIsRow;
             deserializeAll();
-            for (int i = 0; i < nextSize; i++)
-                writer.writeClusteringValue(nextValues[i]);
-            writer.writeBoundKind(nextKind);
-            return nextKind;
+            RangeTombstone.Bound bound = new RangeTombstone.Bound(nextKind, 
nextValues);
+            nextValues = null;
+            return bound;
         }
 
-        public void deserializeNextClustering(Clustering.Writer writer) throws 
IOException
+        public Clustering deserializeNextClustering() throws IOException
         {
-            assert nextIsRow && nextSize == nextValues.length;
+            assert nextIsRow;
             deserializeAll();
-            for (int i = 0; i < nextSize; i++)
-                writer.writeClusteringValue(nextValues[i]);
+            Clustering clustering = new Clustering(nextValues);
+            nextValues = null;
+            return clustering;
         }
 
         public ClusteringPrefix.Kind skipNext() throws IOException
         {
             for (int i = deserializedSize; i < nextSize; i++)
-                if (!serializer.isNull(nextHeader, i) && 
!serializer.isEmpty(nextHeader, i))
+            {
+                if (!Serializer.isNull(nextHeader, i) && 
!Serializer.isEmpty(nextHeader, i))
                     serializationHeader.clusteringTypes().get(i).skipValue(in);
+            }
             return nextKind;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java 
b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 1a9b92d..52fc48f 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -74,7 +74,7 @@ public class ColumnIndex
         private int written;
 
         private ClusteringPrefix firstClustering;
-        private final ReusableClusteringPrefix lastClustering;
+        private ClusteringPrefix lastClustering;
 
         private DeletionTime openMarker;
 
@@ -90,7 +90,6 @@ public class ColumnIndex
 
             this.result = new ColumnIndex(new 
ArrayList<IndexHelper.IndexInfo>());
             this.initialPosition = writer.getFilePointer();
-            this.lastClustering = new 
ReusableClusteringPrefix(iterator.metadata().clusteringColumns().size());
         }
 
         private void writePartitionHeader(UnfilteredRowIterator iterator) 
throws IOException
@@ -119,7 +118,7 @@ public class ColumnIndex
         private void addIndexBlock()
         {
             IndexHelper.IndexInfo cIndexInfo = new 
IndexHelper.IndexInfo(firstClustering,
-                                                                         
lastClustering.get().takeAlias(),
+                                                                         
lastClustering,
                                                                          
startPosition,
                                                                          
currentPosition() - startPosition,
                                                                          
openMarker);
@@ -129,28 +128,27 @@ public class ColumnIndex
 
         private void add(Unfiltered unfiltered) throws IOException
         {
-            lastClustering.copy(unfiltered.clustering());
-            boolean isMarker = unfiltered.kind() == 
Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
-
             if (firstClustering == null)
             {
                 // Beginning of an index block. Remember the start and position
-                firstClustering = lastClustering.get().takeAlias();
+                firstClustering = unfiltered.clustering();
                 startPosition = currentPosition();
             }
 
             UnfilteredSerializer.serializer.serialize(unfiltered, header, 
writer.stream, version);
+            lastClustering = unfiltered.clustering();
             ++written;
 
-            if (isMarker)
+            if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
             {
-                RangeTombstoneMarker marker = (RangeTombstoneMarker) 
unfiltered;
+                RangeTombstoneMarker marker = (RangeTombstoneMarker)unfiltered;
                 openMarker = marker.isOpen(false) ? 
marker.openDeletionTime(false) : null;
             }
 
             // if we hit the column index size that we have to index after, go 
ahead and index it.
             if (currentPosition() - startPosition >= 
DatabaseDescriptor.getColumnIndexSize())
                 addIndexBlock();
+
         }
 
         private ColumnIndex close() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java 
b/src/java/org/apache/cassandra/db/Columns.java
index 055624b..48a4504 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.*;
+import java.util.function.Predicate;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 
@@ -329,9 +330,9 @@ public class Columns implements Iterable<ColumnDefinition>
     }
 
     /**
-     * Whether this object is a subset of the provided other {@code Columns 
object}.
+     * Whether this object is a superset of the provided other {@code Columns 
object}.
      *
-     * @param other the othere object to test for inclusion in this object.
+     * @param other the other object to test for inclusion in this object.
      *
      * @return whether all the columns of {@code other} are contained by this 
object.
      */
@@ -439,6 +440,34 @@ public class Columns implements Iterable<ColumnDefinition>
         return new Columns(newColumns);
     }
 
+    /**
+     * Returns a predicate to test whether columns are included in this {@code 
Columns} object,
+     * assuming that the tested columns are passed to the predicate in sorted 
order.
+     *
+     * @return a predicate to test the inclusion of sorted columns in this 
object.
+     */
+    public Predicate<ColumnDefinition> inOrderInclusionTester()
+    {
+        return new Predicate<ColumnDefinition>()
+        {
+            private int i = 0;
+
+            public boolean test(ColumnDefinition column)
+            {
+                while (i < columns.length)
+                {
+                    int cmp = column.compareTo(columns[i]);
+                    if (cmp < 0)
+                        return false;
+                    i++;
+                    if (cmp == 0)
+                        return true;
+                }
+                return false;
+            }
+        };
+    }
+
     public void digest(MessageDigest digest)
     {
         for (ColumnDefinition c : this)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java 
b/src/java/org/apache/cassandra/db/CounterMutation.java
index d1830a0..6818513 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -155,7 +155,7 @@ public class CounterMutation implements IMutation
     /**
      * Returns a wrapper for the Striped#bulkGet() call (via 
Keyspace#counterLocksFor())
      * Striped#bulkGet() depends on Object#hashCode(), so here we make sure 
that the cf id and the partition key
-     * all get to be part of the hashCode() calculation, not just the cell 
name.
+     * all get to be part of the hashCode() calculation.
      */
     private Iterable<Object> getCounterLockKeys()
     {
@@ -167,11 +167,11 @@ public class CounterMutation implements IMutation
                 {
                     public Iterable<Object> apply(final Row row)
                     {
-                        return Iterables.concat(Iterables.transform(row, new 
Function<Cell, Object>()
+                        return Iterables.concat(Iterables.transform(row, new 
Function<ColumnData, Object>()
                         {
-                            public Object apply(final Cell cell)
+                            public Object apply(final ColumnData data)
                             {
-                                return 
Objects.hashCode(update.metadata().cfId, key(), row.clustering(), 
cell.column(), cell.path());
+                                return 
Objects.hashCode(update.metadata().cfId, key(), row.clustering(), 
data.column());
                             }
                         }));
                     }
@@ -238,7 +238,7 @@ public class CounterMutation implements IMutation
         BTreeSet.Builder<Clustering> names = 
BTreeSet.builder(cfs.metadata.comparator);
         for (PartitionUpdate.CounterMark mark : marks)
         {
-            names.add(mark.clustering().takeAlias());
+            names.add(mark.clustering());
             if (mark.path() == null)
                 builder.add(mark.column());
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java 
b/src/java/org/apache/cassandra/db/DataRange.java
index 0d7a762..358b0ac 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -16,7 +16,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -26,6 +25,7 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 
@@ -372,7 +372,7 @@ public class DataRange
             }
         }
 
-        public DataRange deserialize(DataInput in, int version, CFMetaData 
metadata) throws IOException
+        public DataRange deserialize(DataInputPlus in, int version, CFMetaData 
metadata) throws IOException
         {
             AbstractBounds<PartitionPosition> range = 
AbstractBounds.rowPositionSerializer.deserialize(in, 
MessagingService.globalPartitioner(), version);
             ClusteringIndexFilter filter = 
ClusteringIndexFilter.serializer.deserialize(in, version, metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java 
b/src/java/org/apache/cassandra/db/DeletionInfo.java
index a441c48..0b5df06 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -19,255 +19,56 @@ package org.apache.cassandra.db;
 
 import java.util.Iterator;
 
-import com.google.common.base.Objects;
-import com.google.common.collect.Iterators;
-
 import org.apache.cassandra.cache.IMeasurableMemory;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.db.rows.RowStats;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * A combination of a top-level (partition) tombstone and range tombstones 
describing the deletions
  * within a partition.
+ * <p>
+ * Note that in practice {@link MutableDeletionInfo} is the only concrete 
implementation of this, however
+ * different parts of the code will return either {@code DeletionInfo} or 
{@code MutableDeletionInfo} based
+ * on whether it can/should be mutated or not.
+ * <p>
+ * <b>Warning:</b> do not ever cast a {@code DeletionInfo} into a {@code 
MutableDeletionInfo} to mutate it!!!
+ * TODO: it would be safer to have 2 actual implementations of DeletionInfo, 
one mutable and one that isn't (I'm
+ * just lazy right this minute).
  */
-public class DeletionInfo implements IMeasurableMemory
+public interface DeletionInfo extends IMeasurableMemory
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
DeletionInfo(0, 0));
-
-    /**
-     * This represents a deletion of the entire partition. We can't represent 
this within the RangeTombstoneList, so it's
-     * kept separately. This also slightly optimizes the common case of a full 
partition deletion.
-     */
-    private DeletionTime partitionDeletion;
-
-    /**
-     * A list of range tombstones within the partition.  This is left as null 
if there are no range tombstones
-     * (to save an allocation (since it's a common case).
-     */
-    private RangeTombstoneList ranges;
-
-    /**
-     * Creates a DeletionInfo with only a top-level (row) tombstone.
-     * @param markedForDeleteAt the time after which the entire row should be 
considered deleted
-     * @param localDeletionTime what time the deletion write was applied 
locally (for purposes of
-     *                          purging the tombstone after gc_grace_seconds).
-     */
-    public DeletionInfo(long markedForDeleteAt, int localDeletionTime)
-    {
-        // Pre-1.1 node may return MIN_VALUE for non-deleted container, but 
the new default is MAX_VALUE
-        // (see CASSANDRA-3872)
-        this(new SimpleDeletionTime(markedForDeleteAt, localDeletionTime == 
Integer.MIN_VALUE ? Integer.MAX_VALUE : localDeletionTime));
-    }
-
-    public DeletionInfo(DeletionTime partitionDeletion)
-    {
-        this(partitionDeletion, null);
-    }
-
-    public DeletionInfo(ClusteringComparator comparator, Slice slice, long 
markedForDeleteAt, int localDeletionTime)
-    {
-        this(DeletionTime.LIVE, new RangeTombstoneList(comparator, 1));
-        ranges.add(slice.start(), slice.end(), markedForDeleteAt, 
localDeletionTime);
-    }
-
-    public DeletionInfo(DeletionTime partitionDeletion, RangeTombstoneList 
ranges)
-    {
-        this.partitionDeletion = partitionDeletion.takeAlias();
-        this.ranges = ranges;
-    }
-
-    /**
-     * Returns a new DeletionInfo that has no top-level tombstone or any range 
tombstones.
-     */
-    public static DeletionInfo live()
-    {
-        return new DeletionInfo(DeletionTime.LIVE);
-    }
-
-    public DeletionInfo copy()
-    {
-        return new DeletionInfo(partitionDeletion, ranges == null ? null : 
ranges.copy());
-    }
-
-    public DeletionInfo copy(AbstractAllocator allocator)
-    {
-        RangeTombstoneList rangesCopy = null;
-        if (ranges != null)
-             rangesCopy = ranges.copy(allocator);
-
-        return new DeletionInfo(partitionDeletion, rangesCopy);
-    }
+    // Note that while MutableDeletionInfo.live() is mutable, we expose it 
here as a non-mutable DeletionInfo so sharing is fine.
+    public static final DeletionInfo LIVE = MutableDeletionInfo.live();
 
     /**
      * Returns whether this DeletionInfo is live, that is deletes no columns.
      */
-    public boolean isLive()
-    {
-        return partitionDeletion.isLive() && (ranges == null || 
ranges.isEmpty());
-    }
+    public boolean isLive();
 
-    /**
-     * Return whether a given cell is deleted by this deletion info.
-     *
-     * @param clustering the clustering for the cell to check.
-     * @param cell the cell to check.
-     * @return true if the cell is deleted, false otherwise
-     */
-    private boolean isDeleted(Clustering clustering, Cell cell)
-    {
-        // If we're live, don't consider anything deleted, even if the cell 
ends up having as timestamp Long.MIN_VALUE
-        // (which shouldn't happen in practice, but it would invalid to 
consider it deleted if it does).
-        if (isLive())
-            return false;
+    public DeletionTime getPartitionDeletion();
 
-        if (cell.livenessInfo().timestamp() <= 
partitionDeletion.markedForDeleteAt())
-            return true;
-
-        // No matter what the counter cell's timestamp is, a tombstone always 
takes precedence. See CASSANDRA-7346.
-        if (!partitionDeletion.isLive() && cell.isCounterCell())
-            return true;
-
-        return ranges != null && ranges.isDeleted(clustering, cell);
-    }
-
-    /**
-     * Potentially replaces the top-level tombstone with another, keeping 
whichever has the higher markedForDeleteAt
-     * timestamp.
-     * @param newInfo
-     */
-    public void add(DeletionTime newInfo)
-    {
-        if (newInfo.supersedes(partitionDeletion))
-            partitionDeletion = newInfo;
-    }
-
-    public void add(RangeTombstone tombstone, ClusteringComparator comparator)
-    {
-        if (ranges == null)
-            ranges = new RangeTombstoneList(comparator, 1);
-
-        ranges.add(tombstone);
-    }
-
-    /**
-     * Combines another DeletionInfo with this one and returns the result.  
Whichever top-level tombstone
-     * has the higher markedForDeleteAt timestamp will be kept, along with its 
localDeletionTime.  The
-     * range tombstones will be combined.
-     *
-     * @return this object.
-     */
-    public DeletionInfo add(DeletionInfo newInfo)
-    {
-        add(newInfo.partitionDeletion);
-
-        if (ranges == null)
-            ranges = newInfo.ranges == null ? null : newInfo.ranges.copy();
-        else if (newInfo.ranges != null)
-            ranges.addAll(newInfo.ranges);
+    // Use sparingly, not the most efficient thing
+    public Iterator<RangeTombstone> rangeIterator(boolean reversed);
 
-        return this;
-    }
+    public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean 
reversed);
 
-    public DeletionTime getPartitionDeletion()
-    {
-        return partitionDeletion;
-    }
+    public RangeTombstone rangeCovering(Clustering name);
 
-    // Use sparingly, not the most efficient thing
-    public Iterator<RangeTombstone> rangeIterator(boolean reversed)
-    {
-        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : 
ranges.iterator(reversed);
-    }
+    public void collectStats(RowStats.Collector collector);
 
-    public Iterator<RangeTombstone> rangeIterator(Slice slice, boolean 
reversed)
-    {
-        return ranges == null ? Iterators.<RangeTombstone>emptyIterator() : 
ranges.iterator(slice, reversed);
-    }
+    public int dataSize();
 
-    public RangeTombstone rangeCovering(Clustering name)
-    {
-        return ranges == null ? null : ranges.search(name);
-    }
+    public boolean hasRanges();
 
-    public int dataSize()
-    {
-        int size = TypeSizes.sizeof(partitionDeletion.markedForDeleteAt());
-        return size + (ranges == null ? 0 : ranges.dataSize());
-    }
+    public int rangeCount();
 
-    public boolean hasRanges()
-    {
-        return ranges != null && !ranges.isEmpty();
-    }
-
-    public int rangeCount()
-    {
-        return hasRanges() ? ranges.size() : 0;
-    }
+    public long maxTimestamp();
 
     /**
      * Whether this deletion info may modify the provided one if added to it.
      */
-    public boolean mayModify(DeletionInfo delInfo)
-    {
-        return partitionDeletion.compareTo(delInfo.partitionDeletion) > 0 || 
hasRanges();
-    }
-
-    @Override
-    public String toString()
-    {
-        if (ranges == null || ranges.isEmpty())
-            return String.format("{%s}", partitionDeletion);
-        else
-            return String.format("{%s, ranges=%s}", partitionDeletion, 
rangesAsString());
-    }
-
-    private String rangesAsString()
-    {
-        assert !ranges.isEmpty();
-        StringBuilder sb = new StringBuilder();
-        ClusteringComparator cc = ranges.comparator();
-        Iterator<RangeTombstone> iter = rangeIterator(false);
-        while (iter.hasNext())
-        {
-            RangeTombstone i = iter.next();
-            sb.append(i.deletedSlice().toString(cc));
-            sb.append("@");
-            sb.append(i.deletionTime());
-        }
-        return sb.toString();
-    }
-
-    // Updates all the timestamp of the deletion contained in this 
DeletionInfo to be {@code timestamp}.
-    public DeletionInfo updateAllTimestamp(long timestamp)
-    {
-        if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE)
-            partitionDeletion = new SimpleDeletionTime(timestamp, 
partitionDeletion.localDeletionTime());
-
-        if (ranges != null)
-            ranges.updateAllTimestamp(timestamp);
-        return this;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if(!(o instanceof DeletionInfo))
-            return false;
-        DeletionInfo that = (DeletionInfo)o;
-        return partitionDeletion.equals(that.partitionDeletion) && 
Objects.equal(ranges, that.ranges);
-    }
-
-    @Override
-    public final int hashCode()
-    {
-        return Objects.hashCode(partitionDeletion, ranges);
-    }
+    public boolean mayModify(DeletionInfo delInfo);
 
-    @Override
-    public long unsharedHeapSize()
-    {
-        return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == 
null ? 0 : ranges.unsharedHeapSize());
-    }
+    public MutableDeletionInfo mutableCopy();
+    public DeletionInfo copy(AbstractAllocator allocator);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DeletionPurger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionPurger.java 
b/src/java/org/apache/cassandra/db/DeletionPurger.java
new file mode 100644
index 0000000..d368b69
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DeletionPurger.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+public interface DeletionPurger
+{
+    public static final DeletionPurger PURGE_ALL = (ts, ldt) -> true;
+
+    public boolean shouldPurge(long timestamp, int localDeletionTime);
+
+    public default boolean shouldPurge(DeletionTime dt)
+    {
+        return !dt.isLive() && shouldPurge(dt.markedForDeleteAt(), 
dt.localDeletionTime());
+    }
+
+    public default boolean shouldPurge(LivenessInfo liveness, int nowInSec)
+    {
+        return !liveness.isLive(nowInSec) && shouldPurge(liveness.timestamp(), 
liveness.localExpirationTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java 
b/src/java/org/apache/cassandra/db/DeletionTime.java
index 67842a7..3e9ca80 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.security.MessageDigest;
 
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -34,29 +34,44 @@ import org.apache.cassandra.utils.ObjectSizes;
 /**
  * Information on deletion of a storage engine object.
  */
-public abstract class DeletionTime implements Comparable<DeletionTime>, 
IMeasurableMemory, Aliasable<DeletionTime>
+public class DeletionTime implements Comparable<DeletionTime>, 
IMeasurableMemory
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
SimpleDeletionTime(0, 0));
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
DeletionTime(0, 0));
 
     /**
      * A special DeletionTime that signifies that there is no top-level (row) 
tombstone.
      */
-    public static final DeletionTime LIVE = new 
SimpleDeletionTime(Long.MIN_VALUE, Integer.MAX_VALUE);
+    public static final DeletionTime LIVE = new DeletionTime(Long.MIN_VALUE, 
Integer.MAX_VALUE);
 
     public static final Serializer serializer = new Serializer();
 
+    private final long markedForDeleteAt;
+    private final int localDeletionTime;
+
+    public DeletionTime(long markedForDeleteAt, int localDeletionTime)
+    {
+        this.markedForDeleteAt = markedForDeleteAt;
+        this.localDeletionTime = localDeletionTime;
+    }
+
     /**
      * A timestamp (typically in microseconds since the unix epoch, although 
this is not enforced) after which
      * data should be considered deleted. If set to Long.MIN_VALUE, this 
implies that the data has not been marked
      * for deletion at all.
      */
-    public abstract long markedForDeleteAt();
+    public long markedForDeleteAt()
+    {
+        return markedForDeleteAt;
+    }
 
     /**
      * The local server timestamp, in seconds since the unix epoch, at which 
this tombstone was created. This is
      * only used for purposes of purging the tombstone after gc_grace_seconds 
have elapsed.
      */
-    public abstract int localDeletionTime();
+    public int localDeletionTime()
+    {
+        return localDeletionTime;
+    }
 
     /**
      * Returns whether this DeletionTime is live, that is deletes no columns.
@@ -112,14 +127,14 @@ public abstract class DeletionTime implements 
Comparable<DeletionTime>, IMeasura
         return markedForDeleteAt() > dt.markedForDeleteAt() || 
(markedForDeleteAt() == dt.markedForDeleteAt() && localDeletionTime() > 
dt.localDeletionTime());
     }
 
-    public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore)
+    public boolean deletes(LivenessInfo info)
     {
-        return markedForDeleteAt() < maxPurgeableTimestamp && 
localDeletionTime() < gcBefore;
+        return deletes(info.timestamp());
     }
 
-    public boolean deletes(LivenessInfo info)
+    public boolean deletes(Cell cell)
     {
-        return deletes(info.timestamp());
+        return deletes(cell.timestamp());
     }
 
     public boolean deletes(long timestamp)
@@ -151,10 +166,10 @@ public abstract class DeletionTime implements 
Comparable<DeletionTime>, IMeasura
             long mfda = in.readLong();
             return mfda == Long.MIN_VALUE && ldt == Integer.MAX_VALUE
                  ? LIVE
-                 : new SimpleDeletionTime(mfda, ldt);
+                 : new DeletionTime(mfda, ldt);
         }
 
-        public void skip(DataInput in) throws IOException
+        public void skip(DataInputPlus in) throws IOException
         {
             FileUtils.skipBytesFully(in, 4 + 8);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/DeletionTimeArray.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTimeArray.java 
b/src/java/org/apache/cassandra/db/DeletionTimeArray.java
deleted file mode 100644
index 77eb953..0000000
--- a/src/java/org/apache/cassandra/db/DeletionTimeArray.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.Arrays;
-
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Utility class to store an array of deletion times a bit efficiently.
- */
-public class DeletionTimeArray
-{
-    private long[] markedForDeleteAts;
-    private int[] delTimes;
-
-    public DeletionTimeArray(int initialCapacity)
-    {
-        this.markedForDeleteAts = new long[initialCapacity];
-        this.delTimes = new int[initialCapacity];
-        clear();
-    }
-
-    public void clear(int i)
-    {
-        markedForDeleteAts[i] = Long.MIN_VALUE;
-        delTimes[i] = Integer.MAX_VALUE;
-    }
-
-    public void set(int i, DeletionTime dt)
-    {
-        this.markedForDeleteAts[i] = dt.markedForDeleteAt();
-        this.delTimes[i] = dt.localDeletionTime();
-    }
-
-    public int size()
-    {
-        return markedForDeleteAts.length;
-    }
-
-    public void resize(int newSize)
-    {
-        int prevSize = size();
-
-        markedForDeleteAts = Arrays.copyOf(markedForDeleteAts, newSize);
-        delTimes = Arrays.copyOf(delTimes, newSize);
-
-        Arrays.fill(markedForDeleteAts, prevSize, newSize, Long.MIN_VALUE);
-        Arrays.fill(delTimes, prevSize, newSize, Integer.MAX_VALUE);
-    }
-
-    public boolean supersedes(int i, DeletionTime dt)
-    {
-        return markedForDeleteAts[i] > dt.markedForDeleteAt();
-    }
-
-    public boolean supersedes(int i, int j)
-    {
-        return markedForDeleteAts[i] > markedForDeleteAts[j];
-    }
-
-    public void swap(int i, int j)
-    {
-        long m = markedForDeleteAts[j];
-        int l = delTimes[j];
-
-        move(i, j);
-
-        markedForDeleteAts[i] = m;
-        delTimes[i] = l;
-    }
-
-    public void move(int i, int j)
-    {
-        markedForDeleteAts[j] = markedForDeleteAts[i];
-        delTimes[j] = delTimes[i];
-    }
-
-    public boolean isLive(int i)
-    {
-        return markedForDeleteAts[i] > Long.MIN_VALUE;
-    }
-
-    public void clear()
-    {
-        Arrays.fill(markedForDeleteAts, Long.MIN_VALUE);
-        Arrays.fill(delTimes, Integer.MAX_VALUE);
-    }
-
-    public int dataSize()
-    {
-        return 12 * markedForDeleteAts.length;
-    }
-
-    public long unsharedHeapSize()
-    {
-        return ObjectSizes.sizeOfArray(markedForDeleteAts)
-             + ObjectSizes.sizeOfArray(delTimes);
-    }
-
-    public void copy(DeletionTimeArray other)
-    {
-        assert size() == other.size();
-        for (int i = 0; i < size(); i++)
-        {
-            markedForDeleteAts[i] = other.markedForDeleteAts[i];
-            delTimes[i] = other.delTimes[i];
-        }
-    }
-
-    public static class Cursor extends DeletionTime
-    {
-        private DeletionTimeArray array;
-        private int i;
-
-        public Cursor setTo(DeletionTimeArray array, int i)
-        {
-            this.array = array;
-            this.i = i;
-            return this;
-        }
-
-        public long markedForDeleteAt()
-        {
-            return array.markedForDeleteAts[i];
-        }
-
-        public int localDeletionTime()
-        {
-            return array.delTimes[i];
-        }
-
-        public DeletionTime takeAlias()
-        {
-            return new SimpleDeletionTime(markedForDeleteAt(), 
localDeletionTime());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java 
b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 38113c8..4501f3c 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -132,19 +132,13 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
         UUID hintId = UUIDGen.getTimeUUID();
         // serialize the hint with id and version as a composite column name
 
-        PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints,
-                                                  
StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId)),
-                                                  
PartitionColumns.of(hintColumn),
-                                                  1);
-
-        Row.Writer writer = upd.writer();
-        Rows.writeClustering(SystemKeyspace.Hints.comparator.make(hintId, 
MessagingService.current_version), writer);
+        DecoratedKey key = 
StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(targetId));
 
+        Clustering clustering = SystemKeyspace.Hints.comparator.make(hintId, 
MessagingService.current_version);
         ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, 
Mutation.serializer, MessagingService.current_version));
-        writer.writeCell(hintColumn, false, value, 
SimpleLivenessInfo.forUpdate(now, ttl, FBUtilities.nowInSeconds(), 
SystemKeyspace.Hints), null);
-        writer.endOfRow();
+        Cell cell = BufferCell.expiring(hintColumn, now, ttl, 
FBUtilities.nowInSeconds(), value);
 
-        return new Mutation(upd);
+        return new 
Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, 
ArrayBackedRow.singleCellRow(clustering, cell)));
     }
 
     /*
@@ -187,13 +181,8 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
     private static void deleteHint(ByteBuffer tokenBytes, Clustering 
clustering, long timestamp)
     {
         DecoratedKey dk =  
StorageService.getPartitioner().decorateKey(tokenBytes);
-
-        PartitionUpdate upd = new PartitionUpdate(SystemKeyspace.Hints, dk, 
PartitionColumns.of(hintColumn), 1);
-
-        Row.Writer writer = upd.writer();
-        Rows.writeClustering(clustering, writer);
-        Cells.writeTombstone(writer, hintColumn, timestamp, 
FBUtilities.nowInSeconds());
-
+        Cell cell = BufferCell.tombstone(hintColumn, timestamp, 
FBUtilities.nowInSeconds());
+        PartitionUpdate upd = 
PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, dk, 
ArrayBackedRow.singleCellRow(clustering, cell));
         new Mutation(upd).applyUnsafe(); // don't bother with commitlog since 
we're going to flush as soon as we're done with delivery
     }
 
@@ -420,7 +409,7 @@ public class HintedHandOffManager implements 
HintedHandOffManagerMBean
                 int version = 
Int32Type.instance.compose(hint.clustering().get(1));
                 Cell cell = hint.getCell(hintColumn);
 
-                final long timestamp = cell.livenessInfo().timestamp();
+                final long timestamp = cell.timestamp();
                 DataInputPlus in = new NIODataInputStream(cell.value(), true);
                 Mutation mutation;
                 try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java 
b/src/java/org/apache/cassandra/db/LegacyLayout.java
index f063256..8242ab7 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -31,15 +31,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.utils.*;
-import org.apache.hadoop.io.serializer.Serialization;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -103,7 +102,7 @@ public abstract class LegacyLayout
         if (metadata.isSuper())
         {
             assert superColumnName != null;
-            return decodeForSuperColumn(metadata, new 
SimpleClustering(superColumnName), cellname);
+            return decodeForSuperColumn(metadata, new 
Clustering(superColumnName), cellname);
         }
 
         assert superColumnName == null;
@@ -152,7 +151,7 @@ public abstract class LegacyLayout
         {
             // If it's a compact table, it means the column is in fact a 
"dynamic" one
             if (metadata.isCompactTable())
-                return new LegacyCellName(new SimpleClustering(column), 
metadata.compactValueColumn(), null);
+                return new LegacyCellName(new Clustering(column), 
metadata.compactValueColumn(), null);
 
             throw new UnknownColumnException(metadata, column);
         }
@@ -242,7 +241,7 @@ public abstract class LegacyLayout
                                     ? CompositeType.splitName(value)
                                     : Collections.singletonList(value);
 
-        return new SimpleClustering(components.subList(0, Math.min(csize, 
components.size())).toArray(new ByteBuffer[csize]));
+        return new Clustering(components.subList(0, Math.min(csize, 
components.size())).toArray(new ByteBuffer[csize]));
     }
 
     public static ByteBuffer encodeClustering(CFMetaData metadata, Clustering 
clustering)
@@ -276,7 +275,7 @@ public abstract class LegacyLayout
                                                                 DeletionInfo 
delInfo,
                                                                 
Iterator<LegacyCell> cells)
     {
-        SerializationHelper helper = new SerializationHelper(0, 
SerializationHelper.Flag.LOCAL);
+        SerializationHelper helper = new SerializationHelper(metadata, 0, 
SerializationHelper.Flag.LOCAL);
         return toUnfilteredRowIterator(metadata, key, 
LegacyDeletionInfo.from(delInfo), cells, false, helper);
     }
 
@@ -320,22 +319,16 @@ public abstract class LegacyLayout
 
         Iterator<Row> rows = convertToRows(new CellGrouper(metadata, helper), 
iter, delInfo);
         Iterator<RangeTombstone> ranges = 
delInfo.deletionInfo.rangeIterator(reversed);
-        final Iterator<Unfiltered> atoms = new 
RowAndTombstoneMergeIterator(metadata.comparator, reversed)
-                                     .setTo(rows, ranges);
-
-        return new AbstractUnfilteredRowIterator(metadata,
-                                        key,
-                                        
delInfo.deletionInfo.getPartitionDeletion(),
-                                        metadata.partitionColumns(),
-                                        staticRow,
-                                        reversed,
-                                        RowStats.NO_STATS)
-        {
-            protected Unfiltered computeNext()
-            {
-                return atoms.hasNext() ? atoms.next() : endOfData();
-            }
-        };
+        return new RowAndDeletionMergeIterator(metadata,
+                                               key,
+                                               
delInfo.deletionInfo.getPartitionDeletion(),
+                                               ColumnFilter.all(metadata),
+                                               staticRow,
+                                               reversed,
+                                               RowStats.NO_STATS,
+                                               rows,
+                                               ranges,
+                                               true);
     }
 
     public static Row extractStaticColumns(CFMetaData metadata, DataInputPlus 
in, Columns statics) throws IOException
@@ -351,7 +344,7 @@ public abstract class LegacyLayout
         for (ColumnDefinition column : statics)
             columnsToFetch.add(column.name.bytes);
 
-        StaticRow.Builder builder = StaticRow.builder(statics, false, 
metadata.isCounter());
+        Row.Builder builder = ArrayBackedRow.unsortedBuilder(statics, 
FBUtilities.nowInSeconds());
 
         boolean foundOne = false;
         LegacyAtom atom;
@@ -364,7 +357,7 @@ public abstract class LegacyLayout
                     continue;
 
                 foundOne = true;
-                builder.writeCell(cell.name.column, cell.isCounter(), 
cell.value, livenessInfo(metadata, cell), null);
+                builder.addCell(new BufferCell(cell.name.column, 
cell.timestamp, cell.ttl, cell.localDeletionTime, cell.value, null));
             }
             else
             {
@@ -469,7 +462,7 @@ public abstract class LegacyLayout
     {
         return new AbstractIterator<LegacyCell>()
         {
-            private final Iterator<Cell> cells = row.iterator();
+            private final Iterator<Cell> cells = row.cells().iterator();
             // we don't have (and shouldn't have) row markers for compact 
tables.
             private boolean hasReturnedRowMarker = metadata.isCompactTable();
 
@@ -480,7 +473,7 @@ public abstract class LegacyLayout
                     hasReturnedRowMarker = true;
                     LegacyCellName cellName = new 
LegacyCellName(row.clustering(), null, null);
                     LivenessInfo info = row.primaryKeyLivenessInfo();
-                    return new LegacyCell(LegacyCell.Kind.REGULAR, cellName, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localDeletionTime(), 
info.ttl());
+                    return new LegacyCell(LegacyCell.Kind.REGULAR, cellName, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), 
info.ttl());
                 }
 
                 if (!cells.hasNext())
@@ -507,8 +500,7 @@ public abstract class LegacyLayout
         CellPath path = cell.path();
         assert path == null || path.size() == 1;
         LegacyCellName name = new LegacyCellName(clustering, cell.column(), 
path == null ? null : path.get(0));
-        LivenessInfo info = cell.livenessInfo();
-        return new LegacyCell(kind, name, cell.value(), info.timestamp(), 
info.localDeletionTime(), info.ttl());
+        return new LegacyCell(kind, name, cell.value(), cell.timestamp(), 
cell.localDeletionTime(), cell.ttl());
     }
 
     public static RowIterator toRowIterator(final CFMetaData metadata,
@@ -516,17 +508,10 @@ public abstract class LegacyLayout
                                             final Iterator<LegacyCell> cells,
                                             final int nowInSec)
     {
-        SerializationHelper helper = new SerializationHelper(0, 
SerializationHelper.Flag.LOCAL);
+        SerializationHelper helper = new SerializationHelper(metadata, 0, 
SerializationHelper.Flag.LOCAL);
         return UnfilteredRowIterators.filter(toUnfilteredRowIterator(metadata, 
key, LegacyDeletionInfo.live(), cells, false, helper), nowInSec);
     }
 
-    private static LivenessInfo livenessInfo(CFMetaData metadata, LegacyCell 
cell)
-    {
-        return cell.isTombstone()
-             ? SimpleLivenessInfo.forDeletion(cell.timestamp, 
cell.localDeletionTime)
-             : SimpleLivenessInfo.forUpdate(cell.timestamp, cell.ttl, 
cell.localDeletionTime, metadata);
-    }
-
     public static Comparator<LegacyCell> legacyCellComparator(CFMetaData 
metadata)
     {
         return legacyCellComparator(metadata, false);
@@ -662,7 +647,7 @@ public abstract class LegacyLayout
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
             if (flag == SerializationHelper.Flag.FROM_REMOTE || (flag == 
SerializationHelper.Flag.LOCAL && 
CounterContext.instance().shouldClearLocal(value)))
                 value = CounterContext.instance().clearAllLocal(value);
-            return new LegacyCell(LegacyCell.Kind.COUNTER, 
decodeCellName(metadata, cellname, readAllAsDynamic), value, ts, 
LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL);
+            return new LegacyCell(LegacyCell.Kind.COUNTER, 
decodeCellName(metadata, cellname, readAllAsDynamic), value, ts, 
Cell.NO_DELETION_TIME, Cell.NO_TTL);
         }
         else if ((mask & EXPIRATION_MASK) != 0)
         {
@@ -678,10 +663,10 @@ public abstract class LegacyLayout
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
             LegacyCellName name = decodeCellName(metadata, cellname, 
readAllAsDynamic);
             return (mask & COUNTER_UPDATE_MASK) != 0
-                ? new LegacyCell(LegacyCell.Kind.COUNTER, name, 
CounterContext.instance().createLocal(ByteBufferUtil.toLong(value)), ts, 
LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL)
+                ? new LegacyCell(LegacyCell.Kind.COUNTER, name, 
CounterContext.instance().createLocal(ByteBufferUtil.toLong(value)), ts, 
Cell.NO_DELETION_TIME, Cell.NO_TTL)
                 : ((mask & DELETION_MASK) == 0
-                        ? new LegacyCell(LegacyCell.Kind.REGULAR, name, value, 
ts, LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL)
-                        : new LegacyCell(LegacyCell.Kind.DELETED, name, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, ts, ByteBufferUtil.toInt(value), 
LivenessInfo.NO_TTL));
+                        ? new LegacyCell(LegacyCell.Kind.REGULAR, name, value, 
ts, Cell.NO_DELETION_TIME, Cell.NO_TTL)
+                        : new LegacyCell(LegacyCell.Kind.DELETED, name, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, ts, ByteBufferUtil.toInt(value), 
Cell.NO_TTL));
         }
     }
 
@@ -741,10 +726,9 @@ public abstract class LegacyLayout
     public static class CellGrouper
     {
         public final CFMetaData metadata;
-        private final ReusableRow row;
         private final boolean isStatic;
         private final SerializationHelper helper;
-        private Row.Writer writer;
+        private Row.Builder builder;
         private Clustering clustering;
 
         private LegacyRangeTombstone rowDeletion;
@@ -760,10 +744,7 @@ public abstract class LegacyLayout
             this.metadata = metadata;
             this.isStatic = isStatic;
             this.helper = helper;
-            this.row = isStatic ? null : new 
ReusableRow(metadata.clusteringColumns().size(), 
metadata.partitionColumns().regulars, false, metadata.isCounter());
-
-            if (isStatic)
-                this.writer = 
StaticRow.builder(metadata.partitionColumns().statics, false, 
metadata.isCounter());
+            this.builder = ArrayBackedRow.sortedBuilder(isStatic ? 
metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
         }
 
         public static CellGrouper staticGrouper(CFMetaData metadata, 
SerializationHelper helper)
@@ -776,9 +757,6 @@ public abstract class LegacyLayout
             this.clustering = null;
             this.rowDeletion = null;
             this.collectionDeletion = null;
-
-            if (!isStatic)
-                this.writer = row.writer();
         }
 
         public boolean addAtom(LegacyAtom atom)
@@ -797,8 +775,8 @@ public abstract class LegacyLayout
             }
             else if (clustering == null)
             {
-                clustering = cell.name.clustering.takeAlias();
-                clustering.writeTo(writer);
+                clustering = cell.name.clustering;
+                builder.newRow(clustering);
             }
             else if (!clustering.equals(cell.name.clustering))
             {
@@ -809,14 +787,12 @@ public abstract class LegacyLayout
             if (rowDeletion != null && 
rowDeletion.deletionTime.deletes(cell.timestamp))
                 return true;
 
-            LivenessInfo info = livenessInfo(metadata, cell);
-
             ColumnDefinition column = cell.name.column;
             if (column == null)
             {
                 // It's the row marker
                 assert !cell.value.hasRemaining();
-                helper.writePartitionKeyLivenessInfo(writer, info.timestamp(), 
info.ttl(), info.localDeletionTime());
+                
builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(cell.timestamp, cell.ttl, 
cell.localDeletionTime));
             }
             else
             {
@@ -833,11 +809,15 @@ public abstract class LegacyLayout
                         // practice and 2) is only used during upgrade, it's 
probably worth keeping things simple.
                         helper.startOfComplexColumn(column);
                         path = cell.name.collectionElement == null ? null : 
CellPath.create(cell.name.collectionElement);
+                        if (!helper.includes(path))
+                            return true;
                     }
-                    helper.writeCell(writer, column, cell.isCounter(), 
cell.value, info.timestamp(), info.localDeletionTime(), info.ttl(), path);
+                    Cell c = new BufferCell(column, cell.timestamp, cell.ttl, 
cell.localDeletionTime, cell.value, path);
+                    if (!helper.isDropped(c, column.isComplex()))
+                        builder.addCell(c);
                     if (column.isComplex())
                     {
-                        helper.endOfComplexColumn(column);
+                        helper.endOfComplexColumn();
                     }
                 }
             }
@@ -852,9 +832,9 @@ public abstract class LegacyLayout
                 if (clustering != null)
                     return false;
 
-                clustering = 
tombstone.start.getAsClustering(metadata).takeAlias();
-                clustering.writeTo(writer);
-                writer.writeRowDeletion(tombstone.deletionTime);
+                clustering = tombstone.start.getAsClustering(metadata);
+                builder.newRow(clustering);
+                builder.addRowDeletion(tombstone.deletionTime);
                 rowDeletion = tombstone;
                 return true;
             }
@@ -863,15 +843,15 @@ public abstract class LegacyLayout
             {
                 if (clustering == null)
                 {
-                    clustering = 
tombstone.start.getAsClustering(metadata).takeAlias();
-                    clustering.writeTo(writer);
+                    clustering = tombstone.start.getAsClustering(metadata);
+                    builder.newRow(clustering);
                 }
                 else if 
(!clustering.equals(tombstone.start.getAsClustering(metadata)))
                 {
                     return false;
                 }
 
-                writer.writeComplexDeletion(tombstone.start.collectionName, 
tombstone.deletionTime);
+                builder.addComplexDeletion(tombstone.start.collectionName, 
tombstone.deletionTime);
                 if (rowDeletion == null || 
tombstone.deletionTime.supersedes(rowDeletion.deletionTime))
                     collectionDeletion = tombstone;
                 return true;
@@ -881,8 +861,7 @@ public abstract class LegacyLayout
 
         public Row getRow()
         {
-            writer.endOfRow();
-            return isStatic ? ((StaticRow.Builder)writer).build() : row;
+            return builder.build();
         }
     }
 
@@ -947,7 +926,7 @@ public abstract class LegacyLayout
             ByteBuffer[] values = new ByteBuffer[bound.size()];
             for (int i = 0; i < bound.size(); i++)
                 values[i] = bound.get(i);
-            return new SimpleClustering(values);
+            return new Clustering(values);
         }
 
         @Override
@@ -1005,13 +984,13 @@ public abstract class LegacyLayout
         public static LegacyCell regular(CFMetaData metadata, ByteBuffer 
superColumnName, ByteBuffer name, ByteBuffer value, long timestamp)
         throws UnknownColumnException
         {
-            return new LegacyCell(Kind.REGULAR, decodeCellName(metadata, 
superColumnName, name), value, timestamp, LivenessInfo.NO_DELETION_TIME, 
LivenessInfo.NO_TTL);
+            return new LegacyCell(Kind.REGULAR, decodeCellName(metadata, 
superColumnName, name), value, timestamp, Cell.NO_DELETION_TIME, Cell.NO_TTL);
         }
 
         public static LegacyCell expiring(CFMetaData metadata, ByteBuffer 
superColumnName, ByteBuffer name, ByteBuffer value, long timestamp, int ttl, 
int nowInSec)
         throws UnknownColumnException
         {
-            return new LegacyCell(Kind.EXPIRING, decodeCellName(metadata, 
superColumnName, name), value, timestamp, nowInSec, ttl);
+            return new LegacyCell(Kind.EXPIRING, decodeCellName(metadata, 
superColumnName, name), value, timestamp, nowInSec + ttl, ttl);
         }
 
         public static LegacyCell tombstone(CFMetaData metadata, ByteBuffer 
superColumnName, ByteBuffer name, long timestamp, int nowInSec)
@@ -1030,7 +1009,7 @@ public abstract class LegacyLayout
 
         public static LegacyCell counter(LegacyCellName name, ByteBuffer value)
         {
-            return new LegacyCell(Kind.COUNTER, name, value, 
FBUtilities.timestampMicros(), LivenessInfo.NO_DELETION_TIME, 
LivenessInfo.NO_TTL);
+            return new LegacyCell(Kind.COUNTER, name, value, 
FBUtilities.timestampMicros(), Cell.NO_DELETION_TIME, Cell.NO_TTL);
         }
 
         public ClusteringPrefix clustering()
@@ -1205,7 +1184,7 @@ public abstract class LegacyLayout
 
         public static LegacyDeletionInfo live()
         {
-            return from(DeletionInfo.live());
+            return from(DeletionInfo.LIVE);
         }
 
         public Iterator<LegacyRangeTombstone> inRowRangeTombstones()
@@ -1228,7 +1207,7 @@ public abstract class LegacyLayout
 
                 int rangeCount = in.readInt();
                 if (rangeCount == 0)
-                    return from(new DeletionInfo(topLevel));
+                    return from(new MutableDeletionInfo(topLevel));
 
                 RangeTombstoneList ranges = new 
RangeTombstoneList(metadata.comparator, rangeCount);
                 List<LegacyRangeTombstone> inRowTombsones = new ArrayList<>();
@@ -1239,13 +1218,13 @@ public abstract class LegacyLayout
                     int delTime =  in.readInt();
                     long markedAt = in.readLong();
 
-                    LegacyRangeTombstone tombstone = new 
LegacyRangeTombstone(start, end, new SimpleDeletionTime(markedAt, delTime));
+                    LegacyRangeTombstone tombstone = new 
LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime));
                     if (tombstone.isCollectionTombstone() || 
tombstone.isRowDeletion(metadata))
                         inRowTombsones.add(tombstone);
                     else
                         ranges.add(start.bound, end.bound, markedAt, delTime);
                 }
-                return new LegacyDeletionInfo(new DeletionInfo(topLevel, 
ranges), inRowTombsones);
+                return new LegacyDeletionInfo(new 
MutableDeletionInfo(topLevel, ranges), inRowTombsones);
             }
 
             public long serializedSize(CFMetaData metadata, LegacyDeletionInfo 
info, TypeSizes typeSizes, int version)

Reply via email to