http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 30923c5..4072f8d 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -17,32 +17,21 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.io.DataInput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.utils.SearchIterator;
 
 /**
- * Serialize/deserialize a single Unfiltered for the intra-node protocol.
+ * Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
  *
- * The encode format for an unfiltered is <flags>(<row>|<marker>) where:
+ * The encoded format for an unfiltered is <flags>(<row>|<marker>) where:
  *
- *   <flags> is a byte whose bits are flags. The rightmost 1st bit is only
- *       set to indicate the end of the partition. The 2nd bit indicates
- *       whether the remainder is a range tombstone marker (otherwise it's a row).
- *       If it's a row then the 3rd bit indicates if it's static, the 4th bit
- *       indicates the presence of a row timestamp, the 5th the presence of a row
- *       ttl, the 6th the presence of row deletion and the 7th indicates the
- *       presence of complex deletion times.
+ *   <flags> is a byte whose bits are flags used by the rest of the serialization. Each
+ *       flag is defined/explained below as the "Unfiltered flags" constants.
  *   <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where
  *       <clustering> is the row clustering as serialized by
  *       {@code Clustering.serializer}. Note that static rows are an exception and
@@ -50,7 +39,7 @@ import org.apache.cassandra.utils.SearchIterator;
  *       whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the
  *       complex ones. There are actually 2 slightly different possible layouts for those
  *       cells: a dense one and a sparse one. Which one is used depends on the serialization
- *       header and more precisely of {@link SerializationHeader.useSparseColumnLayout()}:
+ *       header and more precisely of {@link SerializationHeader#useSparseColumnLayout(boolean)}:
  *         1) in the dense layout, there will be as many <sci> and <ccj> as there are columns
  *            in the serialization header. Each simple column <sci> will simply be a <cell>
  *            (which might have no value, see below), while each <ccj> will be
@@ -84,27 +73,18 @@ import org.apache.cassandra.utils.SearchIterator;
  */
 public class UnfilteredSerializer
 {
-    private static final Logger logger = LoggerFactory.getLogger(UnfilteredSerializer.class);
-
     public static final UnfilteredSerializer serializer = new UnfilteredSerializer();
 
-    // Unfiltered flags
-    private final static int END_OF_PARTITION     = 0x01;
-    private final static int IS_MARKER            = 0x02;
-    // For rows
-    private final static int IS_STATIC            = 0x04;
-    private final static int HAS_TIMESTAMP        = 0x08;
-    private final static int HAS_TTL              = 0x10;
-    private final static int HAS_DELETION         = 0x20;
-    private final static int HAS_COMPLEX_DELETION = 0x40;
-
-    // Cell flags
-    private final static int PRESENCE_MASK     = 0x01;
-    private final static int DELETION_MASK     = 0x02;
-    private final static int EXPIRATION_MASK   = 0x04;
-    private final static int EMPTY_VALUE_MASK  = 0x08;
-    private final static int USE_ROW_TIMESTAMP = 0x10;
-    private final static int USE_ROW_TTL       = 0x20;
+    /*
+     * Unfiltered flags constants.
+     */
+    private final static int END_OF_PARTITION     = 0x01; // Signals the end of the partition. Nothing follows a <flags> field with that flag.
+    private final static int IS_MARKER            = 0x02; // Whether the encoded unfiltered is a marker or a row. All following flags apply only to rows.
+    private final static int IS_STATIC            = 0x04; // Whether the encoded row is static.
+    private final static int HAS_TIMESTAMP        = 0x08; // Whether the encoded row has a timestamp (i.e. if row.partitionKeyLivenessInfo().hasTimestamp() == true).
+    private final static int HAS_TTL              = 0x10; // Whether the encoded row has some expiration info (i.e. if row.partitionKeyLivenessInfo().hasTTL() == true).
+    private final static int HAS_DELETION         = 0x20; // Whether the encoded row has some deletion info.
+    private final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
 
     public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
     throws IOException
@@ -131,9 +111,9 @@ public class UnfilteredSerializer
 
         if (isStatic)
             flags |= IS_STATIC;
-        if (pkLiveness.hasTimestamp())
+        if (!pkLiveness.isEmpty())
             flags |= HAS_TIMESTAMP;
-        if (pkLiveness.hasTTL())
+        if (pkLiveness.isExpiring())
             flags |= HAS_TTL;
         if (!deletion.isLive())
             flags |= HAS_DELETION;
@@ -149,7 +129,7 @@ public class UnfilteredSerializer
         if ((flags & HAS_TTL) != 0)
         {
             out.writeInt(header.encodeTTL(pkLiveness.ttl()));
-            out.writeInt(header.encodeDeletionTime(pkLiveness.localDeletionTime()));
+            out.writeInt(header.encodeDeletionTime(pkLiveness.localExpirationTime()));
         }
         if ((flags & HAS_DELETION) != 0)
             UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out);
@@ -160,52 +140,49 @@ public class UnfilteredSerializer
         SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
 
         for (int i = 0; i < simpleCount; i++)
-            writeSimpleColumn(i, cells.next(columns.getSimple(i)), header, out, pkLiveness, useSparse);
+            writeSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, out, useSparse);
 
         for (int i = simpleCount; i < columns.columnCount(); i++)
-            writeComplexColumn(i, cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, header, out, pkLiveness, useSparse);
+            writeComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, out, useSparse);
 
         if (useSparse)
             out.writeShort(-1);
     }
 
-    private void writeSimpleColumn(int idx, ColumnData data, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse)
+    private void writeSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
     throws IOException
     {
         if (useSparse)
         {
-            if (data == null)
+            if (cell == null)
                 return;
 
             out.writeShort(idx);
         }
-
-        writeCell(data == null ? null : data.cell(), header, out, rowLiveness);
+        Cell.serializer.serialize(cell, out, rowLiveness, header);
     }
 
-    private void writeComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse)
+    private void writeComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse)
     throws IOException
     {
-        Iterator<Cell> cells = data == null ? null : data.cells();
-        DeletionTime deletion = data == null ? DeletionTime.LIVE : data.complexDeletion();
-
         if (useSparse)
         {
-            assert hasComplexDeletion || deletion.isLive();
-            if (cells == null && deletion.isLive())
+            if (data == null)
                 return;
 
             out.writeShort(idx);
         }
 
         if (hasComplexDeletion)
-            UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out);
+            UnfilteredRowIteratorSerializer.writeDelTime(data == null ? DeletionTime.LIVE : data.complexDeletion(), header, out);
 
-        if (cells != null)
-            while (cells.hasNext())
-                writeCell(cells.next(), header, out, rowLiveness);
+        if (data != null)
+        {
+            for (Cell cell : data)
+                Cell.serializer.serialize(cell, out, rowLiveness, header);
+        }
 
-        writeCell(null, header, out, rowLiveness);
+        Cell.serializer.serialize(null, out, rowLiveness, header);
     }
 
     public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version)
@@ -245,12 +222,12 @@ public class UnfilteredSerializer
         if (!isStatic)
             size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
 
-        if (pkLiveness.hasTimestamp())
+        if (!pkLiveness.isEmpty())
             size += TypeSizes.sizeof(header.encodeTimestamp(pkLiveness.timestamp()));
-        if (pkLiveness.hasTTL())
+        if (pkLiveness.isExpiring())
         {
             size += TypeSizes.sizeof(header.encodeTTL(pkLiveness.ttl()));
-            size += TypeSizes.sizeof(header.encodeDeletionTime(pkLiveness.localDeletionTime()));
+            size += TypeSizes.sizeof(header.encodeDeletionTime(pkLiveness.localExpirationTime()));
         }
         if (!deletion.isLive())
             size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header);
@@ -261,10 +238,10 @@ public class UnfilteredSerializer
         SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator();
 
         for (int i = 0; i < simpleCount; i++)
-            size += sizeOfSimpleColumn(i, cells.next(columns.getSimple(i)), header, pkLiveness, useSparse);
+            size += sizeOfSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, useSparse);
 
         for (int i = simpleCount; i < columns.columnCount(); i++)
-            size += sizeOfComplexColumn(i, cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, header, pkLiveness, useSparse);
+            size += sizeOfComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, useSparse);
 
         if (useSparse)
             size += TypeSizes.sizeof((short)-1);
@@ -272,41 +249,40 @@ public class UnfilteredSerializer
         return size;
     }
 
-    private long sizeOfSimpleColumn(int idx, ColumnData data, SerializationHeader header, LivenessInfo rowLiveness, boolean useSparse)
+    private long sizeOfSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
     {
         long size = 0;
         if (useSparse)
         {
-            if (data == null)
+            if (cell == null)
                 return size;
 
             size += TypeSizes.sizeof((short)idx);
         }
-        return size + sizeOfCell(data == null ? null : data.cell(), header, rowLiveness);
+        return size + Cell.serializer.serializedSize(cell, rowLiveness, header);
     }
 
-    private long sizeOfComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, LivenessInfo rowLiveness, boolean useSparse)
+    private long sizeOfComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse)
     {
         long size = 0;
-        Iterator<Cell> cells = data == null ? null : data.cells();
-        DeletionTime deletion = data == null ? DeletionTime.LIVE : data.complexDeletion();
         if (useSparse)
         {
-            assert hasComplexDeletion || deletion.isLive();
-            if (cells == null && deletion.isLive())
+            if (data == null)
                 return size;
 
             size += TypeSizes.sizeof((short)idx);
         }
 
         if (hasComplexDeletion)
-            size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header);
+            size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(data == null ? DeletionTime.LIVE : data.complexDeletion(), header);
 
-        if (cells != null)
-            while (cells.hasNext())
-                size += sizeOfCell(cells.next(), header, rowLiveness);
+        if (data != null)
+        {
+            for (Cell cell : data)
+                size += Cell.serializer.serializedSize(cell, rowLiveness, header);
+        }
 
-        return size + sizeOfCell(null, header, rowLiveness);
+        return size + Cell.serializer.serializedSize(null, rowLiveness, header);
     }
 
     public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version)
@@ -337,155 +313,157 @@ public class UnfilteredSerializer
         return 1;
     }
 
-    public Unfiltered.Kind deserialize(DataInput in,
-                                 SerializationHeader header,
-                                 SerializationHelper helper,
-                                 Row.Writer rowWriter,
-                                 RangeTombstoneMarker.Writer markerWriter)
+    public Unfiltered deserialize(DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder)
     throws IOException
     {
+        // It wouldn't be wrong per-se to use an unsorted builder, but it would be inefficient so make sure we don't do it by mistake
+        assert builder.isSorted();
+
         int flags = in.readUnsignedByte();
         if (isEndOfPartition(flags))
             return null;
 
         if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
         {
-            RangeTombstone.Bound.Kind kind = RangeTombstone.Bound.serializer.deserialize(in, helper.version, header.clusteringTypes(), markerWriter);
-            deserializeMarkerBody(in, header, kind.isBoundary(), markerWriter);
-            return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
+            RangeTombstone.Bound bound = RangeTombstone.Bound.serializer.deserialize(in, helper.version, header.clusteringTypes());
+            return deserializeMarkerBody(in, header, bound);
         }
         else
         {
             assert !isStatic(flags); // deserializeStaticRow should be used for that.
-            Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes(), rowWriter);
-            deserializeRowBody(in, header, helper, flags, rowWriter);
-            return Unfiltered.Kind.ROW;
+            builder.newRow(Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes()));
+            return deserializeRowBody(in, header, helper, flags, builder);
         }
     }
 
-    public Row deserializeStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper)
+    public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper)
     throws IOException
     {
         int flags = in.readUnsignedByte();
-        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags);
-        StaticRow.Builder builder = StaticRow.builder(header.columns().statics, true, header.columns().statics.hasCounters());
-        deserializeRowBody(in, header, helper, flags, builder);
-        return builder.build();
-    }
-
-    public void skipStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper) throws IOException
-    {
-        int flags = in.readUnsignedByte();
-        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : "Flags is " + flags;
-        skipRowBody(in, header, helper, flags);
+        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : flags;
+        Row.Builder builder = ArrayBackedRow.sortedBuilder(helper.fetchedStaticColumns(header));
+        builder.newRow(Clustering.STATIC_CLUSTERING);
+        return deserializeRowBody(in, header, helper, flags, builder);
     }
 
-    public void deserializeMarkerBody(DataInput in,
-                                      SerializationHeader header,
-                                      boolean isBoundary,
-                                      RangeTombstoneMarker.Writer writer)
+    public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound)
     throws IOException
     {
-        if (isBoundary)
-            writer.writeBoundaryDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header), UnfilteredRowIteratorSerializer.readDelTime(in, header));
-        else
-            writer.writeBoundDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header));
-        writer.endOfMarker();
-    }
-
-    public void skipMarkerBody(DataInput in, SerializationHeader header, boolean isBoundary) throws IOException
-    {
-        if (isBoundary)
-        {
-            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
-            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
-        }
+        if (bound.isBoundary())
+            return new RangeTombstoneBoundaryMarker(bound, UnfilteredRowIteratorSerializer.readDelTime(in, header), UnfilteredRowIteratorSerializer.readDelTime(in, header));
         else
-        {
-            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
-        }
+            return new RangeTombstoneBoundMarker(bound, UnfilteredRowIteratorSerializer.readDelTime(in, header));
     }
 
-    public void deserializeRowBody(DataInput in,
-                                   SerializationHeader header,
-                                   SerializationHelper helper,
-                                   int flags,
-                                   Row.Writer writer)
+    public Row deserializeRowBody(DataInputPlus in,
+                                  SerializationHeader header,
+                                  SerializationHelper helper,
+                                  int flags,
+                                  Row.Builder builder)
     throws IOException
     {
-        boolean isStatic = isStatic(flags);
-        boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
-        boolean hasTTL = (flags & HAS_TTL) != 0;
-        boolean hasDeletion = (flags & HAS_DELETION) != 0;
-        boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
-
-        long timestamp = hasTimestamp ? header.decodeTimestamp(in.readLong()) : LivenessInfo.NO_TIMESTAMP;
-        int ttl = hasTTL ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL;
-        int localDeletionTime = hasTTL ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME;
-        DeletionTime deletion = hasDeletion ? UnfilteredRowIteratorSerializer.readDelTime(in, header) : DeletionTime.LIVE;
+        try
+        {
+            boolean isStatic = isStatic(flags);
+            boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
+            boolean hasTTL = (flags & HAS_TTL) != 0;
+            boolean hasDeletion = (flags & HAS_DELETION) != 0;
+            boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0;
+
+            LivenessInfo rowLiveness = LivenessInfo.EMPTY;
+            if (hasTimestamp)
+            {
+                long timestamp = header.decodeTimestamp(in.readLong());
+                int ttl = hasTTL ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL;
+                int localDeletionTime = hasTTL ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_EXPIRATION_TIME;
+                rowLiveness = LivenessInfo.create(timestamp, ttl, localDeletionTime);
+            }
 
-        helper.writePartitionKeyLivenessInfo(writer, timestamp, ttl, localDeletionTime);
-        writer.writeRowDeletion(deletion);
+            builder.addPrimaryKeyLivenessInfo(rowLiveness);
+            builder.addRowDeletion(hasDeletion ? UnfilteredRowIteratorSerializer.readDelTime(in, header) : DeletionTime.LIVE);
 
-        Columns columns = header.columns(isStatic);
-        if (header.useSparseColumnLayout(isStatic))
-        {
-            int count = columns.columnCount();
-            int simpleCount = columns.simpleColumnCount();
-            int i;
-            while ((i = in.readShort()) >= 0)
+            Columns columns = header.columns(isStatic);
+            if (header.useSparseColumnLayout(isStatic))
             {
-                if (i > count)
-                    throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
+                int count = columns.columnCount();
+                int simpleCount = columns.simpleColumnCount();
+                int i;
+                while ((i = in.readShort()) >= 0)
+                {
+                    if (i > count)
+                        throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
+
+                    if (i < simpleCount)
+                        readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
+                    else
+                        readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, builder, rowLiveness);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < columns.simpleColumnCount(); i++)
+                    readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
 
-                if (i < simpleCount)
-                    readSimpleColumn(columns.getSimple(i), in, header, helper, writer);
-                else
-                    readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, writer);
+                for (int i = 0; i < columns.complexColumnCount(); i++)
+                    readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, builder, rowLiveness);
             }
+
+            return builder.build();
         }
-        else
+        catch (RuntimeException | AssertionError e)
         {
-            for (int i = 0; i < columns.simpleColumnCount(); i++)
-                readSimpleColumn(columns.getSimple(i), in, header, helper, writer);
-
-            for (int i = 0; i < columns.complexColumnCount(); i++)
-                readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, writer);
+            // Corrupted data could be such that it triggers an assertion in the row Builder, or breaks one of its assumptions.
+            // Of course, a bug in said builder could also trigger this, but it's impossible a priori to always make the distinction
+            // between a real bug and data corrupted in just the bad way. Besides, re-throwing as an IOException doesn't hide the
+            // exception, it just makes sure we catch it properly and mark the sstable as corrupted.
+            throw new IOException("Error building row with data deserialized from " + in, e);
         }
-
-        writer.endOfRow();
     }
 
-    private void readSimpleColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer)
+    private void readSimpleColumn(ColumnDefinition column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder, LivenessInfo rowLiveness)
     throws IOException
     {
         if (helper.includes(column))
-            readCell(column, in, header, helper, writer);
+        {
+            Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper);
+            if (cell != null && !helper.isDropped(cell, false))
+                builder.addCell(cell);
+        }
         else
-            skipCell(column, in, header);
+        {
+            Cell.serializer.skip(in, column, header);
+        }
     }
 
-    private void readComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Writer writer)
+    private void readComplexColumn(ColumnDefinition column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness)
     throws IOException
     {
         if (helper.includes(column))
         {
             helper.startOfComplexColumn(column);
-
             if (hasComplexDeletion)
-                writer.writeComplexDeletion(column, UnfilteredRowIteratorSerializer.readDelTime(in, header));
+            {
+                DeletionTime complexDeletion = UnfilteredRowIteratorSerializer.readDelTime(in, header);
+                if (!helper.isDroppedComplexDeletion(complexDeletion))
+                    builder.addComplexDeletion(column, complexDeletion);
+            }
 
-            while (readCell(column, in, header, helper, writer));
+            Cell cell;
+            while ((cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper)) != null)
+            {
+                if (helper.includes(cell.path()) && !helper.isDropped(cell, true))
+                    builder.addCell(cell);
+            }
 
-            helper.endOfComplexColumn(column);
+            helper.endOfComplexColumn();
         }
         else
         {
-            skipComplexColumn(column, in, header, helper, hasComplexDeletion);
+            skipComplexColumn(in, column, header, hasComplexDeletion);
         }
     }
 
-    public void skipRowBody(DataInput in, SerializationHeader header, SerializationHelper helper, int flags) throws IOException
+    public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags) throws IOException
     {
         boolean isStatic = isStatic(flags);
         boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0;
@@ -518,28 +496,48 @@ public class UnfilteredSerializer
                     throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
 
                 if (i < simpleCount)
-                    skipCell(columns.getSimple(i), in, header);
+                    Cell.serializer.skip(in, columns.getSimple(i), header);
                 else
-                    skipComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion);
+                    skipComplexColumn(in, columns.getComplex(i - simpleCount), header, hasComplexDeletion);
             }
         }
         else
         {
             for (int i = 0; i < columns.simpleColumnCount(); i++)
-                skipCell(columns.getSimple(i), in, header);
+                Cell.serializer.skip(in, columns.getSimple(i), header);
 
             for (int i = 0; i < columns.complexColumnCount(); i++)
-                skipComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion);
+                skipComplexColumn(in, columns.getComplex(i), header, hasComplexDeletion);
+        }
+    }
+
+    public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException
+    {
+        int flags = in.readUnsignedByte();
+        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : "Flags is " + flags;
+        skipRowBody(in, header, flags);
+    }
+
+    public void skipMarkerBody(DataInputPlus in, SerializationHeader header, boolean isBoundary) throws IOException
+    {
+        if (isBoundary)
+        {
+            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
+        }
+        else
+        {
+            UnfilteredRowIteratorSerializer.skipDelTime(in, header);
         }
     }
 
-    private void skipComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion)
+    private void skipComplexColumn(DataInputPlus in, ColumnDefinition column, SerializationHeader header, boolean hasComplexDeletion)
     throws IOException
     {
         if (hasComplexDeletion)
             UnfilteredRowIteratorSerializer.skipDelTime(in, header);
 
-        while (skipCell(column, in, header));
+        while (Cell.serializer.skip(in, column, header));
     }
 
     public static boolean isEndOfPartition(int flags)
@@ -556,151 +554,4 @@ public class UnfilteredSerializer
     {
         return (flags & IS_MARKER) == 0 && (flags & IS_STATIC) != 0;
     }
-
-    private void writeCell(Cell cell, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness)
-    throws IOException
-    {
-        if (cell == null)
-        {
-            out.writeByte((byte)0);
-            return;
-        }
-
-        boolean hasValue = cell.value().hasRemaining();
-        boolean isDeleted = cell.isTombstone();
-        boolean isExpiring = cell.isExpiring();
-        boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp();
-        boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime();
-        int flags = PRESENCE_MASK;
-        if (!hasValue)
-            flags |= EMPTY_VALUE_MASK;
-
-        if (isDeleted)
-            flags |= DELETION_MASK;
-        else if (isExpiring)
-            flags |= EXPIRATION_MASK;
-
-        if (useRowTimestamp)
-            flags |= USE_ROW_TIMESTAMP;
-        if (useRowTTL)
-            flags |= USE_ROW_TTL;
-
-        out.writeByte((byte)flags);
-
-        if (hasValue)
-            header.getType(cell.column()).writeValue(cell.value(), out);
-
-        if (!useRowTimestamp)
-            out.writeLong(header.encodeTimestamp(cell.livenessInfo().timestamp()));
-
-        if ((isDeleted || isExpiring) && !useRowTTL)
-            out.writeInt(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime()));
-        if (isExpiring && !useRowTTL)
-            out.writeInt(header.encodeTTL(cell.livenessInfo().ttl()));
-
-        if (cell.column().isComplex())
-            cell.column().cellPathSerializer().serialize(cell.path(), out);
-    }
-
-    private long sizeOfCell(Cell cell, SerializationHeader header, LivenessInfo rowLiveness)
-    {
-        long size = 1; // flags
-
-        if (cell == null)
-            return size;
-
-        boolean hasValue = cell.value().hasRemaining();
-        boolean isDeleted = cell.isTombstone();
-        boolean isExpiring = cell.isExpiring();
-        boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp();
-        boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime();
-
-        if (hasValue)
-            size += header.getType(cell.column()).writtenLength(cell.value());
-
-        if (!useRowTimestamp)
-            size += TypeSizes.sizeof(header.encodeTimestamp(cell.livenessInfo().timestamp()));
-
-        if ((isDeleted || isExpiring) && !useRowTTL)
-            size += TypeSizes.sizeof(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime()));
-        if (isExpiring && !useRowTTL)
-            size += TypeSizes.sizeof(header.encodeTTL(cell.livenessInfo().ttl()));
-
-        if (cell.column().isComplex())
-            size += cell.column().cellPathSerializer().serializedSize(cell.path());
-
-        return size;
-    }
-
-    private boolean readCell(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer)
-    throws IOException
-    {
-        int flags = in.readUnsignedByte();
-        if ((flags & PRESENCE_MASK) == 0)
-            return false;
-
-        boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0;
-        boolean isDeleted = (flags & DELETION_MASK) != 0;
-        boolean isExpiring = (flags & EXPIRATION_MASK) != 0;
-        boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0;
-        boolean useRowTTL = (flags & USE_ROW_TTL) != 0;
-
-        ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        if (hasValue)
-        {
-            if (helper.canSkipValue(column))
-                header.getType(column).skipValue(in);
-            else
-                value = header.getType(column).readValue(in);
-        }
-
-        long timestamp = useRowTimestamp ? helper.getRowTimestamp() : header.decodeTimestamp(in.readLong());
-
-        int localDelTime = useRowTTL
-                         ? helper.getRowLocalDeletionTime()
-                         : (isDeleted || isExpiring ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME);
-
-        int ttl = useRowTTL
-                ? helper.getRowTTL()
-                : (isExpiring ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL);
-
-        CellPath path = column.isComplex()
-                      ? column.cellPathSerializer().deserialize(in)
-                      : null;
-
-        helper.writeCell(writer, column, false, value, timestamp, localDelTime, ttl, path);
-
-        return true;
-    }
-
-    private boolean skipCell(ColumnDefinition column, DataInput in, SerializationHeader header)
-    throws IOException
-    {
-        int flags = in.readUnsignedByte();
-        if ((flags & PRESENCE_MASK) == 0)
-            return false;
-
-        boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0;
-        boolean isDeleted = (flags & DELETION_MASK) != 0;
-        boolean isExpiring = (flags & EXPIRATION_MASK) != 0;
-        boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0;
-        boolean useRowTTL = (flags & USE_ROW_TTL) != 0;
-
-        if (hasValue)
-            header.getType(column).skipValue(in);
-
-        if (!useRowTimestamp)
-            in.readLong();
-
-        if (!useRowTTL && (isDeleted || isExpiring))
-            in.readInt();
-
-        if (!useRowTTL && isExpiring)
-            in.readInt();
-
-        if (column.isComplex())
-            column.cellPathSerializer().skip(in);
-
-        return true;
-    }
 }
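
For illustration, the <flags> byte documented in the javadoc above can be decoded with plain bit tests. The following standalone sketch mirrors the patch's flag constants; the FlagDecoder class and its describe() helper are hypothetical and not part of this commit:

    // Hypothetical, standalone sketch of decoding the <flags> byte described
    // in UnfilteredSerializer's javadoc; only the constants mirror the patch.
    public class FlagDecoder
    {
        private final static int END_OF_PARTITION     = 0x01;
        private final static int IS_MARKER            = 0x02;
        private final static int IS_STATIC            = 0x04;
        private final static int HAS_TIMESTAMP        = 0x08;
        private final static int HAS_TTL              = 0x10;
        private final static int HAS_DELETION         = 0x20;
        private final static int HAS_COMPLEX_DELETION = 0x40;

        public static String describe(int flags)
        {
            if ((flags & END_OF_PARTITION) != 0)
                return "end of partition";
            if ((flags & IS_MARKER) != 0)
                return "range tombstone marker";
            // All remaining flags only apply to rows.
            StringBuilder sb = new StringBuilder("row");
            if ((flags & IS_STATIC) != 0)            sb.append(" [static]");
            if ((flags & HAS_TIMESTAMP) != 0)        sb.append(" [timestamp]");
            if ((flags & HAS_TTL) != 0)              sb.append(" [ttl]");
            if ((flags & HAS_DELETION) != 0)         sb.append(" [deletion]");
            if ((flags & HAS_COMPLEX_DELETION) != 0) sb.append(" [complex deletion]");
            return sb.toString();
        }

        public static void main(String[] args)
        {
            // A live, expiring row: HAS_TIMESTAMP | HAS_TTL
            System.out.println(describe(0x08 | 0x10)); // row [timestamp] [ttl]
        }
    }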

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/WrappingRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRow.java b/src/java/org/apache/cassandra/db/rows/WrappingRow.java
deleted file mode 100644
index 5a0cc78..0000000
--- a/src/java/org/apache/cassandra/db/rows/WrappingRow.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.SearchIterator;
-
-public abstract class WrappingRow extends AbstractRow
-{
-    protected Row wrapped;
-
-    private final ReusableIterator cellIterator = new ReusableIterator();
-    private final ReusableSearchIterator cellSearchIterator = new ReusableSearchIterator();
-
-    /**
-     * Apply some filtering/transformation on cells. This function
-     * can return {@code null} in which case the cell will be ignored.
-     */
-    protected abstract Cell filterCell(Cell cell);
-
-    protected DeletionTime filterDeletionTime(DeletionTime deletionTime)
-    {
-        return deletionTime;
-    }
-
-    protected ColumnData filterColumnData(ColumnData data)
-    {
-        if (data.column().isComplex())
-        {
-            Iterator<Cell> cells = cellIterator.setTo(data.cells());
-            DeletionTime dt = filterDeletionTime(data.complexDeletion());
-            return cells == null && dt.isLive()
-                 ? null
-                 : new ColumnData(data.column(), null, cells == null ? Collections.emptyIterator(): cells, dt);
-        }
-        else
-        {
-            Cell filtered = filterCell(data.cell());
-            return filtered == null ? null : new ColumnData(data.column(), filtered, null, null);
-        }
-    }
-
-    public WrappingRow setTo(Row row)
-    {
-        this.wrapped = row;
-        return this;
-    }
-
-    public Unfiltered.Kind kind()
-    {
-        return Unfiltered.Kind.ROW;
-    }
-
-    public Clustering clustering()
-    {
-        return wrapped.clustering();
-    }
-
-    public Columns columns()
-    {
-        return wrapped.columns();
-    }
-
-    public LivenessInfo primaryKeyLivenessInfo()
-    {
-        return wrapped.primaryKeyLivenessInfo();
-    }
-
-    public DeletionTime deletion()
-    {
-        return wrapped.deletion();
-    }
-
-    public boolean hasComplexDeletion()
-    {
-        // Note that because cells can be filtered out/transformed through
-        // filterCell(), we can't rely on wrapped.hasComplexDeletion().
-        for (int i = 0; i < columns().complexColumnCount(); i++)
-            if (!getDeletion(columns().getComplex(i)).isLive())
-                return true;
-        return false;
-    }
-
-    public Cell getCell(ColumnDefinition c)
-    {
-        Cell cell = wrapped.getCell(c);
-        return cell == null ? null : filterCell(cell);
-    }
-
-    public Cell getCell(ColumnDefinition c, CellPath path)
-    {
-        Cell cell = wrapped.getCell(c, path);
-        return cell == null ? null : filterCell(cell);
-    }
-
-    public Iterator<Cell> getCells(ColumnDefinition c)
-    {
-        Iterator<Cell> cells = wrapped.getCells(c);
-        if (cells == null)
-            return null;
-
-        cellIterator.setTo(cells);
-        return cellIterator.hasNext() ? cellIterator : null;
-    }
-
-    public DeletionTime getDeletion(ColumnDefinition c)
-    {
-        return filterDeletionTime(wrapped.getDeletion(c));
-    }
-
-    public Iterator<Cell> iterator()
-    {
-        return cellIterator.setTo(wrapped.iterator());
-    }
-
-    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
-    {
-        return cellSearchIterator.setTo(wrapped.searchIterator());
-    }
-
-    public Row takeAlias()
-    {
-        boolean isCounter = columns().hasCounters();
-        if (isStatic())
-        {
-            StaticRow.Builder builder = StaticRow.builder(columns(), true, isCounter);
-            copyTo(builder);
-            return builder.build();
-        }
-        else
-        {
-            ReusableRow copy = new ReusableRow(clustering().size(), columns(), true, isCounter);
-            copyTo(copy.writer());
-            return copy;
-        }
-    }
-
-    private class ReusableIterator extends UnmodifiableIterator<Cell>
-    {
-        private Iterator<Cell> iter;
-        private Cell next;
-
-        public ReusableIterator setTo(Iterator<Cell> iter)
-        {
-            this.iter = iter;
-            this.next = null;
-            return this;
-        }
-
-        public boolean hasNext()
-        {
-            while (next == null && iter.hasNext())
-                next = filterCell(iter.next());
-            return next != null;
-        }
-
-        public Cell next()
-        {
-            if (next == null && !hasNext())
-                throw new NoSuchElementException();
-
-            Cell result = next;
-            next = null;
-            return result;
-        }
-    };
-
-    private class ReusableSearchIterator implements SearchIterator<ColumnDefinition, ColumnData>
-    {
-        private SearchIterator<ColumnDefinition, ColumnData> iter;
-
-        public ReusableSearchIterator setTo(SearchIterator<ColumnDefinition, ColumnData> iter)
-        {
-            this.iter = iter;
-            return this;
-        }
-
-        public boolean hasNext()
-        {
-            return iter.hasNext();
-        }
-
-        public ColumnData next(ColumnDefinition column)
-        {
-            ColumnData data = iter.next(column);
-            if (data == null)
-                return null;
-
-            return filterColumnData(data);
-        }
-    };
-}
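
The deleted ReusableIterator above implements a lazily-filtering pull pattern: keep polling the wrapped iterator until filterCell() accepts an element. A generic, self-contained sketch of the same idiom (the FilteringIterator class below is hypothetical, not something this commit adds):

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Function;

    // Hypothetical sketch of the filtering idiom used by the deleted
    // ReusableIterator: the filter may return null to drop an element, so
    // hasNext() keeps pulling until it finds an accepted one.
    class FilteringIterator<T> implements Iterator<T>
    {
        private final Iterator<T> wrapped;
        private final Function<T, T> filter; // returns null to drop an element
        private T next;

        FilteringIterator(Iterator<T> wrapped, Function<T, T> filter)
        {
            this.wrapped = wrapped;
            this.filter = filter;
        }

        public boolean hasNext()
        {
            while (next == null && wrapped.hasNext())
                next = filter.apply(wrapped.next());
            return next != null;
        }

        public T next()
        {
            if (next == null && !hasNext())
                throw new NoSuchElementException();
            T result = next;
            next = null;
            return result;
        }
    }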

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
index 680e502..ff3f82c 100644
--- a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
@@ -17,16 +17,21 @@
  */
 package org.apache.cassandra.db.rows;
 
+import java.util.NoSuchElementException;
+
 import com.google.common.collect.UnmodifiableIterator;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 
 /**
- * Abstract class to make writing atom iterators that wrap another iterator
+ * Abstract class to make writing unfiltered iterators that wrap another iterator
  * easier. By default, the wrapping iterator simply delegates every call to
- * the wrapped iterator so concrete implementations will override some of the
- * methods.
+ * the wrapped iterator so concrete implementations will have to override
+ * some of the methods.
+ * <p>
+ * Note that if most of what you want to do is modifying/filtering the returned
+ * {@code Unfiltered}, {@link AlteringUnfilteredRowIterator} can be a simpler option.
  */
 public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator<Unfiltered>  implements UnfilteredRowIterator
 {
@@ -67,6 +72,11 @@ public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator
         return wrapped.staticRow();
     }
 
+    public RowStats stats()
+    {
+        return wrapped.stats();
+    }
+
     public boolean hasNext()
     {
         return wrapped.hasNext();
@@ -77,11 +87,6 @@ public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator
         return wrapped.next();
     }
 
-    public RowStats stats()
-    {
-        return wrapped.stats();
-    }
-
     public void close()
     {
         wrapped.close();
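
As a usage sketch, a concrete wrapper only overrides the calls it cares about and inherits the delegation for everything else. The CountingIterator below is hypothetical, for illustration only, and assumes the base class's protected constructor taking the wrapped iterator:

    // Hypothetical wrapper: counts the unfiltereds that flow through while
    // delegating all other UnfilteredRowIterator methods to the wrapped one.
    class CountingIterator extends WrappingUnfilteredRowIterator
    {
        private int seen;

        CountingIterator(UnfilteredRowIterator wrapped)
        {
            super(wrapped);
        }

        @Override
        public Unfiltered next()
        {
            seen++;
            return super.next();
        }

        public int seen()
        {
            return seen;
        }
    }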

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 124547a..43e214b 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -209,6 +209,7 @@ public class CQLSSTableWriter implements Closeable
         // Note that we ask indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open'
         // and that forces a lot of initialization that we don't want.
         UpdateParameters params = new UpdateParameters(insert.cfm,
+                                                       insert.updatedColumns(),
                                                        options,
                                                        insert.getTimestamp(now, options),
                                                        insert.getTimeToLive(options),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 70ab99c..f1af85c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -52,7 +52,7 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
         try
         {
             this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
-            SerializationHelper helper = new SerializationHelper(sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
+            SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
             this.iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion);
             this.staticRow = iterator.readStaticRow();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index 9e2faee..56b621a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.io.IOError;
 import java.util.Iterator;
@@ -42,10 +41,10 @@ import org.apache.cassandra.net.MessagingService;
 public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> implements Iterator<Unfiltered>
 {
     protected final CFMetaData metadata;
-    protected final DataInput in;
+    protected final DataInputPlus in;
     protected final SerializationHelper helper;
 
-    private SSTableSimpleIterator(CFMetaData metadata, DataInput in, SerializationHelper helper)
+    private SSTableSimpleIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper)
     {
         this.metadata = metadata;
         this.in = in;
@@ -66,37 +65,26 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
     {
         private final SerializationHeader header;
 
-        private final ReusableRow row;
-        private final RangeTombstoneMarker.Builder markerBuilder;
+        private final Row.Builder builder;
 
-        private CurrentFormatIterator(CFMetaData metadata, DataInput in, SerializationHeader header, SerializationHelper helper)
+        private CurrentFormatIterator(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper)
         {
             super(metadata, in, helper);
             this.header = header;
-
-            int clusteringSize = metadata.comparator.size();
-            Columns regularColumns = header == null ? metadata.partitionColumns().regulars : header.columns().regulars;
-
-            this.row = new ReusableRow(clusteringSize, regularColumns, true, metadata.isCounter());
-            this.markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize);
+            this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
         }
 
         public Row readStaticRow() throws IOException
         {
-            return header.hasStatic()
-                ? UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper)
-                : Rows.EMPTY_STATIC_ROW;
+            return header.hasStatic() ? UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper) : Rows.EMPTY_STATIC_ROW;
         }
 
         protected Unfiltered computeNext()
         {
             try
             {
-                Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, header, helper, row.writer(), markerBuilder.reset());
-
-                return kind == null
-                     ? endOfData()
-                     : (kind == Unfiltered.Kind.ROW ? row : markerBuilder.build());
+                Unfiltered unfiltered = UnfilteredSerializer.serializer.deserialize(in, header, helper, builder);
+                return unfiltered == null ? endOfData() : unfiltered;
             }
             catch (IOException e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 5dbe52a..ef3bde1 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -32,7 +32,9 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -58,6 +60,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     private final long bufferSize;
     private long currentSize;
 
+    // Used to compute the row serialized size
+    private final SerializationHeader header;
+
     private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
 
@@ -65,6 +70,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     {
         super(directory, metadata, partitioner, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
+        this.header = new SerializationHeader(metadata, columns, RowStats.NO_STATS);
         diskWriter.start();
     }
 
@@ -76,42 +82,21 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         if (previous == null)
         {
             previous = createPartitionUpdate(key);
-            count(PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion()));
+            currentSize += PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion());
             previous.allowNewUpdates();
             buffer.put(key, previous);
         }
         return previous;
     }
 
-    private void count(long size)
-    {
-        currentSize += size;
-    }
-
-    private void countCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info, CellPath path)
+    private void countRow(Row row)
     {
-        // Note that the accounting of a cell is a bit inaccurate (it doesn't take some of the file format optimization into account)
+        // Note that the accounting of a row is a bit inaccurate (it doesn't take some of the file format optimization into account)
         // and the maintaining of the bufferSize is in general not perfect. This has always been the case for this class but we should
         // improve that. In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing
         // to count since it will take a lot more space in memory and the bufferSize is first and foremost used to avoid OOM when
         // using this writer.
-
-        count(1); // Each cell has a byte flag on disk
-
-        if (value.hasRemaining())
-            count(column.type.writtenLength(value));
-
-        count(8); // timestamp
-        if (info.hasLocalDeletionTime())
-            count(4);
-        if (info.hasTTL())
-            count(4);
-
-        if (path != null)
-        {
-            assert path.size() == 1;
-            count(2 + path.get(0).remaining());
-        }
+        currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, formatType.info.getLatestVersion().correspondingMessagingVersion());
     }
 
     private void maybeSync() throws SyncException
@@ -134,52 +119,11 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         return new PartitionUpdate(metadata, key, columns, 4)
         {
             @Override
-            protected StaticWriter createStaticWriter()
+            public void add(Row row)
             {
-                return new StaticWriter()
-                {
-                    @Override
-                    public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
-                    {
-                        super.writeCell(column, isCounter, value, info, path);
-                        countCell(column, value, info, path);
-                    }
-
-                    @Override
-                    public void endOfRow()
-                    {
-                        super.endOfRow();
-                        maybeSync();
-                    }
-                };
-            }
-
-            @Override
-            protected Writer createWriter()
-            {
-                return new RegularWriter()
-                {
-                    @Override
-                    public void writeClusteringValue(ByteBuffer value)
-                    {
-                        super.writeClusteringValue(value);
-                        count(2 + value.remaining());
-                    }
-
-                    @Override
-                    public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
-                    {
-                        super.writeCell(column, isCounter, value, info, path);
-                        countCell(column, value, info, path);
-                    }
-
-                    @Override
-                    public void endOfRow()
-                    {
-                        super.endOfRow();
-                        maybeSync();
-                    }
-                };
+                super.add(row);
+                countRow(row);
+                maybeSync();
             }
         };
     }
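
The countRow()/maybeSync() pair above implements a simple size-threshold buffer: accumulate an estimate of the serialized size of what was added and flush once it crosses the configured limit. A generic sketch of that pattern (the SizeBoundedBuffer class is hypothetical, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of the accounting pattern used by
    // SSTableSimpleUnsortedWriter: track an estimated serialized size and
    // flush the buffered items once the configured limit is exceeded.
    class SizeBoundedBuffer<T>
    {
        private final long limitBytes;
        private final List<T> items = new ArrayList<>();
        private long currentSize;

        SizeBoundedBuffer(long limitBytes)
        {
            this.limitBytes = limitBytes;
        }

        void add(T item, long estimatedSize)
        {
            items.add(item);
            currentSize += estimatedSize;
            if (currentSize > limitBytes)
                flush(); // in the writer, this hands the buffer to the DiskWriter thread
        }

        void flush()
        {
            // write items out, then reset the accounting
            items.clear();
            currentSize = 0;
        }
    }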

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index a991d99..6759293 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.io.sstable.format.big;
 
 import java.io.*;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -30,7 +29,6 @@ import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.IPartitioner;
@@ -54,11 +52,8 @@ public class BigTableWriter extends SSTableWriter
 {
     private static final Logger logger = 
LoggerFactory.getLogger(BigTableWriter.class);
 
-    // not very random, but the only value that can't be mistaken for a legal 
column-name length
-    public static final int END_OF_ROW = 0x0000;
-
     private final IndexWriter iwriter;
-    private SegmentedFile.Builder dbuilder;
+    private final SegmentedFile.Builder dbuilder;
     private final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
     private FileMark dataMark;
@@ -101,8 +96,8 @@ public class BigTableWriter extends SSTableWriter
     private long beforeAppend(DecoratedKey decoratedKey)
     {
         assert decoratedKey != null : "Keys must not be null"; // empty keys 
ARE allowed b/c of indexed column values
-        if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) 
>= 0)
-            throw new RuntimeException("Last written key " + lastWrittenKey + 
" >= current key " + decoratedKey + " writing into " + getFilename());
+        //if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) 
>= 0)
+        //    throw new RuntimeException("Last written key " + lastWrittenKey 
+ " >= current key " + decoratedKey + " writing into " + getFilename());
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
@@ -172,11 +167,10 @@ public class BigTableWriter extends SSTableWriter
         }
     }
 
-    private static class StatsCollector extends WrappingUnfilteredRowIterator
+    private static class StatsCollector extends AlteringUnfilteredRowIterator
     {
-        private int cellCount;
         private final MetadataCollector collector;
-        private final Set<ColumnDefinition> complexColumnsSetInRow = new 
HashSet<>();
+        private int cellCount;
 
         StatsCollector(UnfilteredRowIterator iter, MetadataCollector collector)
         {
@@ -186,55 +180,36 @@ public class BigTableWriter extends SSTableWriter
         }
 
         @Override
-        public Unfiltered next()
+        protected Row computeNextStatic(Row row)
+        {
+            if (!row.isEmpty())
+                cellCount += Rows.collectStats(row, collector);
+            return row;
+        }
+
+        @Override
+        protected Row computeNext(Row row)
         {
-            Unfiltered unfiltered = super.next();
-            collector.updateClusteringValues(unfiltered.clustering());
+            collector.updateClusteringValues(row.clustering());
+            cellCount += Rows.collectStats(row, collector);
+            return row;
+        }
 
-            switch (unfiltered.kind())
+        @Override
+        protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+        {
+            collector.updateClusteringValues(marker.clustering());
+            if (marker.isBoundary())
+            {
+                RangeTombstoneBoundaryMarker bm = 
(RangeTombstoneBoundaryMarker)marker;
+                collector.update(bm.endDeletionTime());
+                collector.update(bm.startDeletionTime());
+            }
+            else
             {
-                case ROW:
-                    Row row = (Row) unfiltered;
-                    collector.update(row.primaryKeyLivenessInfo());
-                    collector.update(row.deletion());
-
-                    int simpleColumnsSet = 0;
-                    complexColumnsSetInRow.clear();
-
-                    for (Cell cell : row)
-                    {
-                        if (cell.column().isComplex())
-                            complexColumnsSetInRow.add(cell.column());
-                        else
-                            ++simpleColumnsSet;
-
-                        ++cellCount;
-                        collector.update(cell.livenessInfo());
-
-                        if (cell.isCounterCell())
-                            
collector.updateHasLegacyCounterShards(CounterCells.hasLegacyShards(cell));
-                    }
-
-                    for (int i = 0; i < row.columns().complexColumnCount(); 
i++)
-                        
collector.update(row.getDeletion(row.columns().getComplex(i)));
-
-                    collector.updateColumnSetPerRow(simpleColumnsSet + 
complexColumnsSetInRow.size());
-
-                    break;
-                case RANGE_TOMBSTONE_MARKER:
-                    if (((RangeTombstoneMarker) unfiltered).isBoundary())
-                    {
-                        RangeTombstoneBoundaryMarker bm = 
(RangeTombstoneBoundaryMarker)unfiltered;
-                        collector.update(bm.endDeletionTime());
-                        collector.update(bm.startDeletionTime());
-                    }
-                    else
-                    {
-                        
collector.update(((RangeTombstoneBoundMarker)unfiltered).deletionTime());
-                    }
-                    break;
+                
collector.update(((RangeTombstoneBoundMarker)marker).deletionTime());
             }
-            return unfiltered;
+            return marker;
         }
 
         @Override
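
StatsCollector now extends AlteringUnfilteredRowIterator, which routes each
streamed element through a per-kind computeNext() hook instead of a
hand-written next() with a kind switch. A minimal sketch of that wrapper
shape, with illustrative names rather than Cassandra's real hierarchy:

    import java.util.Iterator;

    // A wrapper that lets subclasses observe or replace each element
    // without reimplementing iteration, as StatsCollector does above.
    abstract class AlteringIteratorSketch<T> implements Iterator<T>
    {
        private final Iterator<T> wrapped;

        protected AlteringIteratorSketch(Iterator<T> wrapped)
        {
            this.wrapped = wrapped;
        }

        public boolean hasNext()
        {
            return wrapped.hasNext();
        }

        public T next()
        {
            return computeNext(wrapped.next());
        }

        // Subclasses hook in here, element by element.
        protected abstract T computeNext(T item);
    }

    // Usage: count elements as they stream by, the way StatsCollector
    // accumulates cellCount while passing rows through unchanged.
    class CountingIteratorSketch<T> extends AlteringIteratorSketch<T>
    {
        int seen = 0;

        CountingIteratorSketch(Iterator<T> wrapped)
        {
            super(wrapped);
        }

        @Override
        protected T computeNext(T item)
        {
            seen++;
            return item;
        }
    }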

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java 
b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 2574c62..9b06b53 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -33,6 +33,8 @@ import 
com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -41,7 +43,7 @@ import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.MurmurHash;
 import org.apache.cassandra.utils.StreamingHistogram;
 
-public class MetadataCollector
+public class MetadataCollector implements PartitionStatisticsCollector
 {
     public static final double NO_COMPRESSION_RATIO = -1.0;
 
@@ -89,8 +91,8 @@ public class MetadataCollector
     protected EstimatedHistogram estimatedCellPerPartitionCount = 
defaultCellPerPartitionCountHistogram();
     protected ReplayPosition replayPosition = ReplayPosition.NONE;
     protected final MinMaxLongTracker timestampTracker = new 
MinMaxLongTracker();
-    protected final MinMaxIntTracker localDeletionTimeTracker = new 
MinMaxIntTracker(LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_DELETION_TIME);
-    protected final MinMaxIntTracker ttlTracker = new 
MinMaxIntTracker(LivenessInfo.NO_TTL, LivenessInfo.NO_TTL);
+    protected final MinMaxIntTracker localDeletionTimeTracker = new 
MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
+    protected final MinMaxIntTracker ttlTracker = new 
MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
     protected double compressionRatio = NO_COMPRESSION_RATIO;
     protected Set<Integer> ancestors = new HashSet<>();
     protected StreamingHistogram estimatedTombstoneDropTime = 
defaultTombstoneDropTimeHistogram();
@@ -178,34 +180,39 @@ public class MetadataCollector
         return this;
     }
 
-    public MetadataCollector update(LivenessInfo newInfo)
+    public void update(LivenessInfo newInfo)
     {
-        // If the info doesn't have a timestamp, this means the info is 
basically irrelevant (it's a row
-        // update whose only info we care are the cells info basically).
-        if (newInfo.hasTimestamp())
+        if (newInfo.isEmpty())
+            return;
+
+        updateTimestamp(newInfo.timestamp());
+        if (newInfo.isExpiring())
         {
-            updateTimestamp(newInfo.timestamp());
             updateTTL(newInfo.ttl());
-            updateLocalDeletionTime(newInfo.localDeletionTime());
+            updateLocalDeletionTime(newInfo.localExpirationTime());
         }
-        return this;
     }
 
-    public MetadataCollector update(DeletionTime dt)
+    public void update(Cell cell)
+    {
+        updateTimestamp(cell.timestamp());
+        updateTTL(cell.ttl());
+        updateLocalDeletionTime(cell.localDeletionTime());
+    }
+
+    public void update(DeletionTime dt)
     {
         if (!dt.isLive())
         {
             updateTimestamp(dt.markedForDeleteAt());
             updateLocalDeletionTime(dt.localDeletionTime());
         }
-        return this;
     }
 
-    public MetadataCollector updateColumnSetPerRow(long columnSetInRow)
+    public void updateColumnSetPerRow(long columnSetInRow)
     {
         totalColumnsSet += columnSetInRow;
         ++totalRows;
-        return this;
     }
 
     private void updateTimestamp(long newTimestamp)
@@ -279,10 +286,9 @@ public class MetadataCollector
         return b2;
     }
 
-    public MetadataCollector updateHasLegacyCounterShards(boolean 
hasLegacyCounterShards)
+    public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
     {
         this.hasLegacyCounterShards = this.hasLegacyCounterShards || 
hasLegacyCounterShards;
-        return this;
     }
 
     public Map<MetadataType, MetadataComponent> finalizeMetadata(String 
partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader 
header)
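
The trackers above start from sentinel defaults (now Cell.NO_TTL and
Cell.NO_DELETION_TIME) and narrow as values are observed. A stand-alone
approximation of such a min/max tracker, assuming the semantics implied by
the constructor arguments, not the real MinMaxIntTracker:

    // Returns the configured defaults until the first update() call, then
    // the observed min/max thereafter.
    class MinMaxIntTrackerSketch
    {
        private final int defaultMin;
        private final int defaultMax;
        private boolean isSet = false;
        private int min;
        private int max;

        MinMaxIntTrackerSketch(int defaultMin, int defaultMax)
        {
            this.defaultMin = defaultMin;
            this.defaultMax = defaultMax;
        }

        void update(int value)
        {
            if (!isSet)
            {
                min = max = value;
                isSet = true;
            }
            else
            {
                if (value < min) min = value;
                if (value > max) max = value;
            }
        }

        int min() { return isSet ? min : defaultMin; }
        int max() { return isSet ? max : defaultMax; }
    }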

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java 
b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 879a505..50644bb 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -371,18 +371,18 @@ public final class LegacySchemaMigrator
 
         if (isSuper)
         {
-            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, 
defaultValidator, true), null));
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, 
defaultValidator, true)));
         }
         else if (isStaticCompactTable)
         {
             defs.add(ColumnDefinition.clusteringKeyDef(ksName, cfName, 
names.defaultClusteringName(), rawComparator, null));
-            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
names.defaultCompactValueName(), defaultValidator, null));
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
names.defaultCompactValueName(), defaultValidator));
         }
         else
         {
             // For dense compact tables, we get here if we don't have a 
compact value column, in which case we should add it
             // (we use EmptyType to recognize that the compact value was not 
declared by the user (see CreateTableStatement too))
-            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
names.defaultCompactValueName(), EmptyType.instance, null));
+            defs.add(ColumnDefinition.regularDef(ksName, cfName, 
names.defaultCompactValueName(), EmptyType.instance));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java 
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 4228a46..2eb0ac0 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -353,7 +353,7 @@ public final class SchemaKeyspace
                         mutationMap.put(key, mutation);
                     }
 
-                    mutation.add(UnfilteredRowIterators.toUpdate(partition));
+                    mutation.add(PartitionUpdate.fromIterator(partition));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java 
b/src/java/org/apache/cassandra/service/DataResolver.java
index f164a60..a9024e3 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
-    private final List<AsyncOneResponse> repairResults = 
Collections.synchronizedList(new ArrayList<AsyncOneResponse>());
+    private final List<AsyncOneResponse> repairResults = 
Collections.synchronizedList(new ArrayList<>());
 
     public DataResolver(Keyspace keyspace, ReadCommand command, 
ConsistencyLevel consistency, int maxResponseCount)
     {
@@ -162,9 +162,8 @@ public class DataResolver extends ResponseResolver
             private final boolean isReversed;
             private final PartitionUpdate[] repairs = new 
PartitionUpdate[sources.length];
 
-            private final Row.Writer[] currentRows = new 
Row.Writer[sources.length];
-            private Clustering currentClustering;
-            private ColumnDefinition currentColumn;
+            private final Row.Builder[] currentRows = new 
Row.Builder[sources.length];
+            private final RowDiffListener diffListener;
 
             private final Slice.Bound[] markerOpen = new 
Slice.Bound[sources.length];
             private final DeletionTime[] markerTime = new 
DeletionTime[sources.length];
@@ -174,87 +173,75 @@ public class DataResolver extends ResponseResolver
                 this.partitionKey = partitionKey;
                 this.columns = columns;
                 this.isReversed = isReversed;
-            }
 
-            private PartitionUpdate update(int i)
-            {
-                PartitionUpdate upd = repairs[i];
-                if (upd == null)
+                this.diffListener = new RowDiffListener()
                 {
-                    upd = new PartitionUpdate(command.metadata(), 
partitionKey, columns, 1);
-                    repairs[i] = upd;
-                }
-                return upd;
-            }
+                    public void onPrimaryKeyLivenessInfo(int i, Clustering 
clustering, LivenessInfo merged, LivenessInfo original)
+                    {
+                        if (merged != null && !merged.equals(original))
+                            currentRow(i, 
clustering).addPrimaryKeyLivenessInfo(merged);
+                    }
 
-            private Row.Writer currentRow(int i)
-            {
-                Row.Writer row = currentRows[i];
-                if (row == null)
-                {
-                    row = currentClustering == Clustering.STATIC_CLUSTERING ? 
update(i).staticWriter() : update(i).writer();
-                    currentClustering.writeTo(row);
-                    currentRows[i] = row;
-                }
-                return row;
-            }
+                    public void onDeletion(int i, Clustering clustering, 
DeletionTime merged, DeletionTime original)
+                    {
+                        if (merged != null && !merged.equals(original))
+                            currentRow(i, clustering).addRowDeletion(merged);
+                    }
 
-            public void onMergePartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
-            {
-                for (int i = 0; i < versions.length; i++)
-                {
-                    DeletionTime version = versions[i];
-                    if (mergedDeletion.supersedes(versions[i]))
-                        update(i).addPartitionDeletion(mergedDeletion);
-                }
-            }
+                    public void onComplexDeletion(int i, Clustering 
clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                    {
+                        if (merged != null && !merged.equals(original))
+                            currentRow(i, 
clustering).addComplexDeletion(column, merged);
+                    }
 
-            public void onMergingRows(Clustering clustering,
-                                      LivenessInfo mergedInfo,
-                                      DeletionTime mergedDeletion,
-                                      Row[] versions)
-            {
-                currentClustering = clustering;
-                for (int i = 0; i < versions.length; i++)
-                {
-                    Row version = versions[i];
+                    public void onCell(int i, Clustering clustering, Cell 
merged, Cell original)
+                    {
+                        if (merged != null && !merged.equals(original))
+                            currentRow(i, clustering).addCell(merged);
+                    }
 
-                    if (version == null || 
mergedInfo.supersedes(version.primaryKeyLivenessInfo()))
-                        
currentRow(i).writePartitionKeyLivenessInfo(mergedInfo);
+                };
+            }
 
-                    if (version == null || 
mergedDeletion.supersedes(version.deletion()))
-                        currentRow(i).writeRowDeletion(mergedDeletion);
-                }
+            private PartitionUpdate update(int i)
+            {
+                if (repairs[i] == null)
+                    repairs[i] = new PartitionUpdate(command.metadata(), 
partitionKey, columns, 1);
+                return repairs[i];
             }
 
-            public void onMergedComplexDeletion(ColumnDefinition c, 
DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+            private Row.Builder currentRow(int i, Clustering clustering)
             {
-                currentColumn = c;
-                for (int i = 0; i < versions.length; i++)
+                if (currentRows[i] == null)
                 {
-                    DeletionTime version = versions[i] == null ? 
DeletionTime.LIVE : versions[i];
-                    if (mergedCompositeDeletion.supersedes(version))
-                        currentRow(i).writeComplexDeletion(c, 
mergedCompositeDeletion);
+                    currentRows[i] = ArrayBackedRow.sortedBuilder(clustering 
== Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
+                    currentRows[i].newRow(clustering);
                 }
+                return currentRows[i];
             }
 
-            public void onMergedCells(Cell mergedCell, Cell[] versions)
+            public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
             {
                 for (int i = 0; i < versions.length; i++)
                 {
-                    Cell version = versions[i];
-                    Cell toAdd = version == null ? mergedCell : 
Cells.diff(mergedCell, version);
-                    if (toAdd != null)
-                        toAdd.writeTo(currentRow(i));
+                    if (mergedDeletion.supersedes(versions[i]))
+                        update(i).addPartitionDeletion(mergedDeletion);
                 }
             }
 
-            public void onRowDone()
+            public void onMergedRows(Row merged, Columns columns, Row[] 
versions)
             {
+                // If a row was shadowed post-merge, it must be by a 
partition level or range tombstone, and we handle
+                // those cases directly in their respective methods (in other 
words, it would be inefficient to send a row
+                // deletion as repair when we know we've already sent a 
partition level or range tombstone that covers it).
+                if (merged.isEmpty())
+                    return;
+
+                Rows.diff(merged, columns, versions, diffListener);
                 for (int i = 0; i < currentRows.length; i++)
                 {
                     if (currentRows[i] != null)
-                        currentRows[i].endOfRow();
+                        update(i).add(currentRows[i].build());
                 }
                 Arrays.fill(currentRows, null);
             }
@@ -268,12 +255,12 @@ public class DataResolver extends ResponseResolver
                     if (merged.isClose(isReversed) && markerOpen[i] != null)
                     {
                         Slice.Bound open = markerOpen[i];
-                        Slice.Bound close = merged.isBoundary() ? 
((RangeTombstoneBoundaryMarker)merged).createCorrespondingCloseBound(isReversed).clustering()
 : merged.clustering();
-                        update(i).addRangeTombstone(Slice.make(isReversed ? 
close : open, isReversed ? open : close), markerTime[i]);
+                        Slice.Bound close = merged.closeBound(isReversed);
+                        update(i).add(new RangeTombstone(Slice.make(isReversed 
? close : open, isReversed ? open : close), markerTime[i]));
                     }
                     if (merged.isOpen(isReversed) && (marker == null || 
merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
                     {
-                        markerOpen[i] = merged.isBoundary() ? 
((RangeTombstoneBoundaryMarker)merged).createCorrespondingOpenBound(isReversed).clustering()
 : merged.clustering();
+                        markerOpen[i] = merged.openBound(isReversed);
                         markerTime[i] = merged.openDeletionTime(isReversed);
                     }
                 }
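
The repair path now builds its updates from a RowDiffListener: Rows.diff()
compares the merged row against each source's version and fires a callback
only where a source is missing or stale, so only the delta is sent as
repair. A simplified, self-contained sketch of that diff-and-callback idea,
with flat string maps standing in for rows and cells:

    import java.util.Map;

    // 'i' is the source index, 'column' the column name, 'merged' the
    // reconciled value that source i should be repaired with.
    interface DiffListenerSketch
    {
        void onCell(int i, String column, String merged);
    }

    class RowDiffSketch
    {
        // 'merged' is the reconciled row; 'versions[i]' may be null (no
        // response) or hold stale values for some columns.
        static void diff(Map<String, String> merged,
                         Map<String, String>[] versions,
                         DiffListenerSketch listener)
        {
            for (Map.Entry<String, String> e : merged.entrySet())
            {
                for (int i = 0; i < versions.length; i++)
                {
                    String original = versions[i] == null ? null : versions[i].get(e.getKey());
                    // Fire only when this source disagrees with the merged
                    // value, mirroring the "merged != null &&
                    // !merged.equals(original)" checks in the listener above.
                    if (!e.getValue().equals(original))
                        listener.onCell(i, e.getKey(), e.getValue());
                }
            }
        }
    }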

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java 
b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 6429be0..6c08be0 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -96,7 +96,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
         if (last != null)
         {
             lastReturnedKey = key;
-            lastReturnedClustering = last.clustering().takeAlias();
+            lastReturnedClustering = last.clustering();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java 
b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 6488641..223c3fd 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -80,6 +80,6 @@ public class SinglePartitionPager extends AbstractQueryPager
     protected void recordLast(DecoratedKey key, Row last)
     {
         if (last != null)
-            lastReturned = last.clustering().takeAlias();
+            lastReturned = last.clustering();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java 
b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 9a5e619..579c315 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -27,7 +27,6 @@ import java.util.UUID;
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -63,7 +62,8 @@ public class Commit
 
     public static Commit newProposal(UUID ballot, PartitionUpdate update)
     {
-        return new Commit(ballot, updatesWithPaxosTime(update, ballot));
+        update.updateAllTimestamp(UUIDGen.microsTimestamp(ballot));
+        return new Commit(ballot, update);
     }
 
     public static Commit emptyCommit(DecoratedKey key, CFMetaData metadata)
@@ -83,7 +83,6 @@ public class Commit
 
     public Mutation makeMutation()
     {
-        assert update != null;
         return new Mutation(update);
     }
 
@@ -95,10 +94,7 @@ public class Commit
 
         Commit commit = (Commit) o;
 
-        if (!ballot.equals(commit.ballot)) return false;
-        if (!update.equals(commit.update)) return false;
-
-        return true;
+        return ballot.equals(commit.ballot) && update.equals(commit.update);
     }
 
     @Override
@@ -107,46 +103,6 @@ public class Commit
         return Objects.hashCode(ballot, update);
     }
 
-    private static PartitionUpdate updatesWithPaxosTime(PartitionUpdate 
update, UUID ballot)
-    {
-        long t = UUIDGen.microsTimestamp(ballot);
-        // Using t-1 for tombstones so deletion doesn't trump newly inserted 
data (#6069)
-        PartitionUpdate newUpdate = new PartitionUpdate(update.metadata(),
-                                                        update.partitionKey(),
-                                                        
update.deletionInfo().updateAllTimestamp(t-1),
-                                                        update.columns(),
-                                                        update.rowCount());
-
-        if (!update.staticRow().isEmpty())
-            copyWithUpdatedTimestamp(update.staticRow(), 
newUpdate.staticWriter(), t);
-
-        for (Row row : update)
-            copyWithUpdatedTimestamp(row, newUpdate.writer(), t);
-
-        return newUpdate;
-    }
-
-    private static void copyWithUpdatedTimestamp(Row row, Row.Writer writer, 
long timestamp)
-    {
-        Rows.writeClustering(row.clustering(), writer);
-        
writer.writePartitionKeyLivenessInfo(row.primaryKeyLivenessInfo().withUpdatedTimestamp(timestamp));
-        writer.writeRowDeletion(row.deletion());
-
-        for (Cell cell : row)
-            writer.writeCell(cell.column(), cell.isCounterCell(), 
cell.value(), cell.livenessInfo().withUpdatedTimestamp(timestamp), cell.path());
-
-        for (int i = 0; i < row.columns().complexColumnCount(); i++)
-        {
-            ColumnDefinition c = row.columns().getComplex(i);
-            DeletionTime dt = row.getDeletion(c);
-            // We use t-1 to make sure that on inserting a collection literal, 
the deletion that comes with it does not
-            // end up deleting the inserted data (see #6069)
-            if (!dt.isLive())
-                writer.writeComplexDeletion(c, new 
SimpleDeletionTime(timestamp-1, dt.localDeletionTime()));
-        }
-        writer.endOfRow();
-    }
-
     @Override
     public String toString()
     {
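
The removed helper made the timestamp rule explicit: data written by a
proposal gets the ballot's timestamp t, while any tombstones carried along
get t-1 so they cannot shadow the inserted data (#6069);
updateAllTimestamp() is assumed to preserve that rule internally. A tiny
sketch of the rule itself:

    class PaxosTimestampRuleSketch
    {
        static long forData(long ballotMicros)
        {
            return ballotMicros;
        }

        static long forTombstone(long ballotMicros)
        {
            return ballotMicros - 1;
        }

        public static void main(String[] args)
        {
            long t = 1_000_000L;
            // Last-write-wins reconciliation keeps the higher timestamp, so
            // an insert at t survives a deletion at t-1 written in the same
            // proposal (e.g. the deletion bundled with a collection literal).
            System.out.println(forData(t) > forTombstone(t)); // true
        }
    }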

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java 
b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 2876f08..ee646aa 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.*;
-import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
@@ -36,7 +35,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -184,16 +182,13 @@ public class StreamReader
         private Row staticRow;
         private IOException exception;
 
-        private final CounterFilteredRow counterRow;
-
         public StreamDeserializer(CFMetaData metadata, DataInputPlus in, 
Version version, SerializationHeader header)
         {
             assert version.storeRows() : "We don't allow streaming from 
pre-3.0 nodes";
             this.metadata = metadata;
             this.in = in;
-            this.helper = new 
SerializationHelper(version.correspondingMessagingVersion(), 
SerializationHelper.Flag.PRESERVE_SIZE);
+            this.helper = new SerializationHelper(metadata, 
version.correspondingMessagingVersion(), 
SerializationHelper.Flag.PRESERVE_SIZE);
             this.header = header;
-            this.counterRow = metadata.isCounter() ? new CounterFilteredRow() 
: null;
         }
 
         public DecoratedKey newPartition() throws IOException
@@ -271,9 +266,7 @@ public class StreamReader
 
         private Row maybeMarkLocalToBeCleared(Row row)
         {
-            return metadata.isCounter()
-                 ? counterRow.setTo(row)
-                 : row;
+            return metadata.isCounter() ? row.markCounterLocalToBeCleared() : 
row;
         }
 
         public void checkForExceptions() throws IOException
@@ -286,18 +279,4 @@ public class StreamReader
         {
         }
     }
-
-    private static class CounterFilteredRow extends WrappingRow
-    {
-        protected Cell filterCell(Cell cell)
-        {
-            if (!cell.isCounterCell())
-                return cell;
-
-            ByteBuffer marked = 
CounterContext.instance().markLocalToBeCleared(cell.value());
-            return marked == cell.value()
-                 ? cell
-                 : Cells.create(cell.column(), true, marked, 
cell.livenessInfo(), cell.path());
-        }
-    }
 }
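
The deleted CounterFilteredRow illustrates a copy-on-change idiom that
markCounterLocalToBeCleared() presumably retains: transform the counter
value, and allocate a new cell only when the transform actually produced a
different buffer. A self-contained sketch with placeholder types (the
marking itself is stubbed out):

    import java.nio.ByteBuffer;

    class CounterCellSketch
    {
        final boolean isCounter;
        final ByteBuffer value;

        CounterCellSketch(boolean isCounter, ByteBuffer value)
        {
            this.isCounter = isCounter;
            this.value = value;
        }

        // Placeholder for CounterContext.markLocalToBeCleared(); the real
        // version rewrites local shard headers and may return the very same
        // buffer when there is nothing to rewrite.
        static ByteBuffer markLocalToBeCleared(ByteBuffer value)
        {
            return value; // illustrative no-op
        }

        CounterCellSketch markCounterLocalToBeCleared()
        {
            if (!isCounter)
                return this;
            ByteBuffer marked = markLocalToBeCleared(value);
            // Copy-on-change: build a new cell only if marking altered the
            // buffer, as the removed filterCell() did via reference equality.
            return marked == value ? this : new CounterCellSketch(true, marked);
        }
    }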
