http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 670b1ae..e805fd2 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -17,12 +17,12 @@
  */
 package org.apache.cassandra.db.partitions;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,99 +31,185 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Sorting;
+import org.apache.cassandra.utils.MergeIterator;
 
 /**
  * Stores updates made on a partition.
  * <p>
- * A PartitionUpdate object requires that all writes are performed before we
- * try to read the updates (attempts to write to the PartitionUpdate after a
- * read method has been called will result in an exception being thrown).
- * In other words, a Partition is mutable while we do a write and become
- * immutable as soon as it is read.
+ * A PartitionUpdate object requires that all writes/additions are performed 
before we
+ * try to read the updates (attempts to write to the PartitionUpdate after a 
read method
+ * has been called will result in an exception being thrown). In other words, 
a Partition
+ * is mutable while it's written but becomes immutable as soon as it is read.
  * <p>
- * Row updates are added to this update through the {@link #writer} method 
which
- * returns a {@link Row.Writer}. Multiple rows can be added to this writer as 
required and
- * those row do not have to added in (clustering) order, and the same row can 
be added
- * multiple times. Further, for a given row, the writer actually supports 
intermingling
- * the writing of cells for different complex cells (note that this is usually 
not supported
- * by {@code Row.Writer} implementations, but is supported here because
- * {@code ModificationStatement} requires that (because we could have multiple 
{@link Operation}
- * on the same column in a given statement)).
+ * A typical usage is to create a new update ({@code new 
PartitionUpdate(metadata, key, columns, capacity)})
+ * and then add rows and range tombstones through the {@code add()} methods 
(the partition
+ * level deletion time can also be set with {@code addPartitionDeletion()}). 
However, there
+ * are also a few static helper constructor methods for special cases ({@code 
emptyUpdate()},
+ * {@code fullPartitionDelete} and {@code singleRowUpdate}).
  */
-public class PartitionUpdate extends AbstractPartitionData implements 
Sorting.Sortable
+public class PartitionUpdate extends AbstractThreadUnsafePartition
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(PartitionUpdate.class);
 
-    // Records whether the partition update has been sorted (it is the rows 
contained in the partition
-    // that are sorted since we don't require rows to be added in order). 
Sorting happens when the
-    // update is read, and writting is rejected as soon as the update is 
sorted (it's actually possible
-    // to manually allow new update by using allowNewUpdates(), and we could 
make that more implicit, but
-    // as only triggers really requires it, we keep it simple for now).
-    private boolean isSorted;
-
     public static final PartitionUpdateSerializer serializer = new 
PartitionUpdateSerializer();
 
-    private final Writer writer;
+    private final int createdAtInSec = FBUtilities.nowInSeconds();
+
+    // Records whether this update is "built", i.e. if the build() method has 
been called, which
+    // happens when the update is read. Further writing is then rejected, 
though a manual call
+    // to allowNewUpdates() allows new writes. We could make that more implicit, 
but only triggers
+    // really require that, so we keep it simple for now.
+    private boolean isBuilt;
+    private boolean canReOpen = true;
+
+    private final MutableDeletionInfo deletionInfo;
+    private RowStats stats; // will be null if isn't built
+
+    private Row staticRow = Rows.EMPTY_STATIC_ROW;
 
-    // Used by compare for the sake of implementing the Sorting.Sortable 
interface (which is in turn used
-    // to sort the rows of this update).
-    private final InternalReusableClustering p1 = new 
InternalReusableClustering();
-    private final InternalReusableClustering p2 = new 
InternalReusableClustering();
+    private final boolean canHaveShadowedData;
 
     private PartitionUpdate(CFMetaData metadata,
                             DecoratedKey key,
-                            DeletionInfo delInfo,
-                            RowDataBlock data,
                             PartitionColumns columns,
-                            int initialRowCapacity)
+                            Row staticRow,
+                            List<Row> rows,
+                            MutableDeletionInfo deletionInfo,
+                            RowStats stats,
+                            boolean isBuilt,
+                            boolean canHaveShadowedData)
     {
-        super(metadata, key, delInfo, columns, data, initialRowCapacity);
-        this.writer = createWriter();
+        super(metadata, key, columns, rows);
+        this.staticRow = staticRow;
+        this.deletionInfo = deletionInfo;
+        this.stats = stats;
+        this.isBuilt = isBuilt;
+        this.canHaveShadowedData = canHaveShadowedData;
     }
 
     public PartitionUpdate(CFMetaData metadata,
                            DecoratedKey key,
-                           DeletionInfo delInfo,
                            PartitionColumns columns,
                            int initialRowCapacity)
     {
-        this(metadata,
-             key,
-             delInfo,
-             new RowDataBlock(columns.regulars, initialRowCapacity, true, 
metadata.isCounter()),
-             columns,
-             initialRowCapacity);
+        this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new 
ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true);
     }
 
-    public PartitionUpdate(CFMetaData metadata,
-                           DecoratedKey key,
-                           PartitionColumns columns,
-                           int initialRowCapacity)
+    /**
+     * Creates an empty immutable partition update.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the partition key for the created update.
+     *
+     * @return the newly created empty (and immutable) update.
+     */
+    public static PartitionUpdate emptyUpdate(CFMetaData metadata, 
DecoratedKey key)
+    {
+        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 
Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), 
MutableDeletionInfo.live(), RowStats.NO_STATS, true, false);
+    }
+
+    /**
+     * Creates an immutable partition update that entirely deletes a given 
partition.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the partition key for the partition that the created update 
should delete.
+     * @param timestamp the timestamp for the deletion.
+     * @param nowInSec the current time in seconds to use as local deletion 
time for the partition deletion.
+     *
+     * @return the newly created partition deletion update.
+     */
+    public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, 
DecoratedKey key, long timestamp, int nowInSec)
+    {
+        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 
Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), new 
MutableDeletionInfo(timestamp, nowInSec), RowStats.NO_STATS, true, false);
+    }
+
+    /**
+     * Creates an immutable partition update that contains a single row update.
+     *
+     * @param metadata the metadata for the created update.
+     * @param key the partition key for the partition that the created update 
should apply to.
+     * @param row the row for the update.
+     *
+     * @return the newly created partition update containing only {@code row}.
+     */
+    public static PartitionUpdate singleRowUpdate(CFMetaData metadata, 
DecoratedKey key, Row row)
+    {
+        return row.isStatic()
+             ? new PartitionUpdate(metadata, key, new 
PartitionColumns(row.columns(), Columns.NONE), row, 
Collections.<Row>emptyList(), MutableDeletionInfo.live(), RowStats.NO_STATS, 
true, false)
+             : new PartitionUpdate(metadata, key, new 
PartitionColumns(Columns.NONE, row.columns()), Rows.EMPTY_STATIC_ROW, 
Collections.singletonList(row), MutableDeletionInfo.live(), RowStats.NO_STATS, 
true, false);
+    }
+
+    /**
+     * Turns the given iterator into an update.
+     *
+     * Warning: this method does not close the provided iterator, it is up to
+     * the caller to close it.
+     */
+    public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator)
+    {
+        CFMetaData metadata = iterator.metadata();
+        boolean reversed = iterator.isReverseOrder();
+
+        List<Row> rows = new ArrayList<>();
+        MutableDeletionInfo.Builder deletionBuilder = 
MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), 
metadata.comparator, reversed);
+
+        while (iterator.hasNext())
+        {
+            Unfiltered unfiltered = iterator.next();
+            if (unfiltered.kind() == Unfiltered.Kind.ROW)
+                rows.add((Row)unfiltered);
+            else
+                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
+        }
+
+        if (reversed)
+            Collections.reverse(rows);
+
+        return new PartitionUpdate(metadata, iterator.partitionKey(), 
iterator.columns(), iterator.staticRow(), rows, deletionBuilder.build(), 
iterator.stats(), true, false);
+    }
+
+    public static PartitionUpdate fromIterator(RowIterator iterator)
+    {
+        CFMetaData metadata = iterator.metadata();
+        boolean reversed = iterator.isReverseOrder();
+
+        List<Row> rows = new ArrayList<>();
+
+        RowStats.Collector collector = new RowStats.Collector();
+
+        while (iterator.hasNext())
+        {
+            Row row = iterator.next();
+            rows.add(row);
+            Rows.collectStats(row, collector);
+        }
+
+        if (reversed)
+            Collections.reverse(rows);
+
+        return new PartitionUpdate(metadata, iterator.partitionKey(), 
iterator.columns(), iterator.staticRow(), rows, MutableDeletionInfo.live(), 
collector.get(), true, false);
+    }
+
+    protected boolean canHaveShadowedData()
     {
-        this(metadata,
-             key,
-             DeletionInfo.live(),
-             columns,
-             initialRowCapacity);
+        return canHaveShadowedData;
     }
 
-    protected Writer createWriter()
+    public Row staticRow()
     {
-        return new RegularWriter();
+        return staticRow;
     }
 
-    protected StaticWriter createStaticWriter()
+    public DeletionInfo deletionInfo()
     {
-        return new StaticWriter();
+        return deletionInfo;
     }
 
     /**
@@ -166,7 +252,7 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
     {
         try (DataOutputBuffer out = new DataOutputBuffer())
         {
-            serializer.serialize(update, out, 
MessagingService.current_version);
+            serializer.serialize(update, out, version);
             return ByteBuffer.wrap(out.getData(), 0, out.getLength());
         }
         catch (IOException e)
@@ -176,60 +262,6 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
     }
 
     /**
-     * Creates a empty immutable partition update.
-     *
-     * @param metadata the metadata for the created update.
-     * @param key the partition key for the created update.
-     *
-     * @return the newly created empty (and immutable) update.
-     */
-    public static PartitionUpdate emptyUpdate(CFMetaData metadata, 
DecoratedKey key)
-    {
-        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 0)
-        {
-            public Row.Writer staticWriter()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public Row.Writer writer()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public void addPartitionDeletion(DeletionTime deletionTime)
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public void addRangeTombstone(RangeTombstone range)
-            {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    /**
-     * Creates a partition update that entirely deletes a given partition.
-     *
-     * @param metadata the metadata for the created update.
-     * @param key the partition key for the partition that the created update 
should delete.
-     * @param timestamp the timestamp for the deletion.
-     * @param nowInSec the current time in seconds to use as local deletion 
time for the partition deletion.
-     *
-     * @return the newly created partition deletion update.
-     */
-    public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, 
DecoratedKey key, long timestamp, int nowInSec)
-    {
-        return new PartitionUpdate(metadata,
-                                   key,
-                                   new DeletionInfo(timestamp, nowInSec),
-                                   new RowDataBlock(Columns.NONE, 0, true, 
metadata.isCounter()),
-                                   PartitionColumns.NONE,
-                                   0);
-    }
-
-    /**
      * Merges the provided updates, yielding a new update that incorporates 
all those updates.
      *
      * @param updates the collection of updates to merge. This shouldn't be 
empty.
@@ -239,17 +271,30 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
     public static PartitionUpdate merge(Collection<PartitionUpdate> updates)
     {
         assert !updates.isEmpty();
-        if (updates.size() == 1)
+        final int size = updates.size();
+
+        if (size == 1)
             return Iterables.getOnlyElement(updates);
 
-        int totalSize = 0;
+        // Used when merging row to decide of liveness
+        int nowInSec = FBUtilities.nowInSeconds();
+
         PartitionColumns.Builder builder = PartitionColumns.builder();
         DecoratedKey key = null;
         CFMetaData metadata = null;
+        MutableDeletionInfo deletion = MutableDeletionInfo.live();
+        Row staticRow = Rows.EMPTY_STATIC_ROW;
+        List<Iterator<Row>> updateRowIterators = new ArrayList<>(size);
+        RowStats stats = RowStats.NO_STATS;
+
         for (PartitionUpdate update : updates)
         {
-            totalSize += update.rows;
             builder.addAll(update.columns());
+            deletion.add(update.deletionInfo());
+            if (!update.staticRow().isEmpty())
+                staticRow = staticRow == Rows.EMPTY_STATIC_ROW ? 
update.staticRow() : Rows.merge(staticRow, update.staticRow(), nowInSec);
+            updateRowIterators.add(update.iterator());
+            stats = stats.mergeWith(update.stats());
 
             if (key == null)
                 key = update.partitionKey();
@@ -262,23 +307,70 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
                 assert metadata.cfId.equals(update.metadata().cfId);
         }
 
-        // Used when merging row to decide of liveness
-        int nowInSec = FBUtilities.nowInSeconds();
-        PartitionUpdate newUpdate = new PartitionUpdate(metadata, key, 
builder.build(), totalSize);
-        for (PartitionUpdate update : updates)
+        PartitionColumns columns = builder.build();
+
+        final Row.Merger merger = new Row.Merger(size, nowInSec, 
columns.regulars);
+
+        Iterator<Row> merged = MergeIterator.get(updateRowIterators, 
metadata.comparator, new MergeIterator.Reducer<Row, Row>()
         {
-            newUpdate.deletionInfo.add(update.deletionInfo);
-            if (!update.staticRow().isEmpty())
+            @Override
+            public boolean trivialReduceIsTrivial()
             {
-                if (newUpdate.staticRow().isEmpty())
-                    newUpdate.staticRow = update.staticRow().takeAlias();
-                else
-                    Rows.merge(newUpdate.staticRow(), update.staticRow(), 
newUpdate.columns().statics, newUpdate.staticWriter(), nowInSec, 
SecondaryIndexManager.nullUpdater);
+                return true;
             }
-            for (Row row : update)
-                row.copyTo(newUpdate.writer);
-        }
-        return newUpdate;
+
+            public void reduce(int idx, Row current)
+            {
+                merger.add(idx, current);
+            }
+
+            protected Row getReduced()
+            {
+                // Note that while merger.merge() can theoretically return 
null, it won't in this case because
+                // we don't pass an "activeDeletion".
+                return merger.merge(DeletionTime.LIVE);
+            }
+
+            @Override
+            protected void onKeyChange()
+            {
+                merger.clear();
+            }
+        });
+
+        List<Row> rows = new ArrayList<>();
+        Iterators.addAll(rows, merged);
+
+        return new PartitionUpdate(metadata, key, columns, staticRow, rows, 
deletion, stats, true, true);
+    }
+
+    /**
+     * Modify this update to set every timestamp for live data to {@code 
newTimestamp} and
+     * every deletion timestamp to {@code newTimestamp - 1}.
+     *
+     * There is no reason to use that except on the Paxos code path, where we 
need to ensure that
+     * anything inserted uses the ballot timestamp (to respect the order of 
updates decided by
+     * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions 
because tombstones
+     * always win on timestamp equality and we don't want to delete our own 
insertions
+     * (typically, when we overwrite a collection, we first set a complex 
deletion to delete the
+     * previous collection before adding new elements. If we were to set that 
complex deletion
+     * to the same timestamp as the new elements, it would delete those 
elements). And since
+     * tombstones always win on timestamp equality, using -1 guarantees our 
deletion will still
+     * delete anything from a previous update.
+     */
+    public void updateAllTimestamp(long newTimestamp)
+    {
+        // We know we won't be updating that update again after this call, and 
doing it post-build is potentially
+        // slightly more efficient (things are more "compact"). So force a 
build if it hasn't happened yet.
+        maybeBuild();
+
+        deletionInfo.updateAllTimestamp(newTimestamp - 1);
+
+        if (!staticRow.isEmpty())
+            staticRow = staticRow.updateAllTimestamp(newTimestamp);
+
+        for (int i = 0; i < rows.size(); i++)
+            rows.set(i, rows.get(i).updateAllTimestamp(newTimestamp));
     }
 
     /**
@@ -291,7 +383,7 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
      */
     public int operationCount()
     {
-        return rowCount()
+        return rows.size()
              + deletionInfo.rangeCount()
              + (deletionInfo.getPartitionDeletion().isLive() ? 0 : 1);
     }
@@ -303,17 +395,29 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
      */
     public int dataSize()
     {
-        int clusteringSize = metadata().comparator.size();
         int size = 0;
         for (Row row : this)
         {
             size += row.clustering().dataSize();
-            for (Cell cell : row)
-                size += cell.dataSize();
+            for (ColumnData cd : row)
+                size += cd.dataSize();
         }
         return size;
     }
 
+    @Override
+    public int rowCount()
+    {
+        maybeBuild();
+        return super.rowCount();
+    }
+
+    public RowStats stats()
+    {
+        maybeBuild();
+        return stats;
+    }
+
     /**
      * If a partition update has been read (and is thus unmodifiable), a call 
to this method
      * makes the update modifiable again.
@@ -325,21 +429,17 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
      */
     public synchronized void allowNewUpdates()
     {
+        if (!canReOpen)
+            throw new IllegalStateException("You cannot do more updates on 
collectCounterMarks has been called");
+
         // This is synchronized to make extra sure things work properly even 
if this is
         // called concurrently with sort() (which should be avoided in the 
first place, but
         // better safe than sorry).
-        isSorted = false;
-    }
-
-    @Override
-    public int rowCount()
-    {
-        maybeSort();
-        return super.rowCount();
+        isBuilt = false;
     }
 
     /**
-     * Returns an iterator that iterators over the rows of this update in 
clustering order.
+     * Returns an iterator that iterates over the rows of this update in 
clustering order.
      * <p>
      * Note that this might trigger a sorting of the update, and as such the 
update will not
      * be modifiable anymore after this call.
@@ -349,14 +449,14 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
     @Override
     public Iterator<Row> iterator()
     {
-        maybeSort();
+        maybeBuild();
         return super.iterator();
     }
 
     @Override
     protected SliceableUnfilteredRowIterator 
sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
     {
-        maybeSort();
+        maybeBuild();
         return super.sliceableUnfilteredIterator(columns, reversed);
     }
 
@@ -370,8 +470,8 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
         for (Row row : this)
         {
             metadata().comparator.validate(row.clustering());
-            for (Cell cell : row)
-                cell.validate();
+            for (ColumnData cd : row)
+                cd.validate();
         }
     }
 
@@ -382,6 +482,27 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
      */
     public long maxTimestamp()
     {
+        maybeBuild();
+
+        long maxTimestamp = deletionInfo.maxTimestamp();
+        for (Row row : this)
+        {
+            maxTimestamp = Math.max(maxTimestamp, 
row.primaryKeyLivenessInfo().timestamp());
+            for (ColumnData cd : row)
+            {
+                if (cd.column().isSimple())
+                {
+                    maxTimestamp = Math.max(maxTimestamp, 
((Cell)cd).timestamp());
+                }
+                else
+                {
+                    ComplexColumnData complexData = (ComplexColumnData)cd;
+                    maxTimestamp = Math.max(maxTimestamp, 
complexData.complexDeletion().markedForDeleteAt());
+                    for (Cell cell : complexData)
+                        maxTimestamp = Math.max(maxTimestamp, 
cell.timestamp());
+                }
+            }
+        }
         return maxTimestamp;
     }
 
@@ -394,62 +515,73 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
     public List<CounterMark> collectCounterMarks()
     {
         assert metadata().isCounter();
+        maybeBuild();
+        // We will take aliases on the rows of this update, and update them 
in-place. So we should be sure the
+        // update is now immutable for all intents and purposes.
+        canReOpen = false;
 
-        InternalReusableClustering clustering = new 
InternalReusableClustering();
         List<CounterMark> l = new ArrayList<>();
-        int i = 0;
-        for (Row row : this)
+        for (Row row : rows)
         {
-            for (Cell cell : row)
+            for (Cell cell : row.cells())
+            {
                 if (cell.isCounterCell())
-                    l.add(new CounterMark(clustering, i, cell.column(), 
cell.path()));
-            i++;
+                    l.add(new CounterMark(row, cell.column(), cell.path()));
+            }
         }
         return l;
     }
 
-    /**
-     * Returns a row writer for the static row of this partition update.
-     *
-     * @return a row writer for the static row of this partition update. A 
partition
-     * update contains only one static row so only one row should be written 
through
-     * this writer (but if multiple rows are added, the latest written one 
wins).
-     */
-    public Row.Writer staticWriter()
+    private void assertNotBuilt()
     {
-        return createStaticWriter();
+        if (isBuilt)
+            throw new IllegalStateException("An update should not be written 
again once it has been read");
     }
 
-    /**
-     * Returns a row writer to add (non-static) rows to this partition update.
-     *
-     * @return a row writer to add (non-static) rows to this partition update.
-     * Multiple rows can be successively added this way and the rows added do 
not have
-     * to be in clustering order. Further, the same row can be added multiple 
time.
-     *
-     */
-    public Row.Writer writer()
+    public void addPartitionDeletion(DeletionTime deletionTime)
     {
-        if (isSorted)
-            throw new IllegalStateException("An update should not written 
again once it has been read");
+        assertNotBuilt();
+        deletionInfo.add(deletionTime);
+    }
 
-        return writer;
+    public void add(RangeTombstone range)
+    {
+        assertNotBuilt();
+        deletionInfo.add(range, metadata.comparator);
     }
 
     /**
-     * Returns a range tombstone marker writer to add range tombstones to this
-     * partition update.
-     * <p>
-     * Note that if more convenient, range tombstones can also be added using
-     * {@link addRangeTombstone}.
+     * Adds a row to this update.
+     *
+     * There is no particular assumption made on the order of rows added to a 
partition update. It is further
+     * allowed to add the same row multiple times (more precisely, multiple row objects for 
the same clustering).
      *
-     * @param isReverseOrder whether the range tombstone marker will be 
provided to the returned writer
-     * in clustering order or in reverse clustering order.
-     * @return a range tombstone marker writer to add range tombstones to this 
update.
+     * Note however that the columns contained in the added row must be a 
subset of the columns used when
+     * creating this update.
+     *
+     * @param row the row to add.
      */
-    public RangeTombstoneMarker.Writer markerWriter(boolean isReverseOrder)
+    public void add(Row row)
     {
-        return new RangeTombstoneCollector(isReverseOrder);
+        if (row.isEmpty())
+            return;
+
+        assertNotBuilt();
+
+        if (row.isStatic())
+        {
+            // We test for == first because in most cases it'll be true and 
that is faster
+            assert columns().statics == row.columns() || 
columns().statics.contains(row.columns());
+            staticRow = staticRow.isEmpty()
+                      ? row
+                      : Rows.merge(staticRow, row, createdAtInSec);
+        }
+        else
+        {
+            // We test for == first because in most cases it'll be true and 
that is faster
+            assert columns().regulars == row.columns() || 
columns().regulars.contains(row.columns());
+            rows.add(row);
+        }
     }
 
     /**
@@ -459,160 +591,70 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
      */
     public int size()
     {
-        return rows;
+        return rows.size();
     }
 
-    private void maybeSort()
+    private void maybeBuild()
     {
-        if (isSorted)
+        if (isBuilt)
             return;
 
-        sort();
+        build();
     }
 
-    private synchronized void sort()
+    private synchronized void build()
     {
-        if (isSorted)
+        if (isBuilt)
             return;
 
-        if (rows <= 1)
+        if (rows.size() <= 1)
         {
-            isSorted = true;
+            finishBuild();
             return;
         }
 
-        // Sort the rows - will still potentially contain duplicate 
(non-reconciled) rows
-        Sorting.sort(this);
+        Comparator<Row> comparator = metadata.comparator.rowComparator();
+        // Sort the rows. Because the same row can have been added multiple 
times, we can still have duplicates after that
+        Collections.sort(rows, comparator);
 
-        // Now find duplicates and merge them together
+        // Now find the duplicates and merge them together
         int previous = 0; // The last element that was set
-        int nowInSec = FBUtilities.nowInSeconds();
-        for (int current = 1; current < rows; current++)
+        for (int current = 1; current < rows.size(); current++)
         {
             // There is really only 2 possible comparison: < 0 or == 0 since 
we've sorted already
-            int cmp = compare(previous, current);
+            Row previousRow = rows.get(previous);
+            Row currentRow = rows.get(current);
+            int cmp = comparator.compare(previousRow, currentRow);
             if (cmp == 0)
             {
                 // current and previous are the same row. Merge current into 
previous
                 // (and so previous + 1 will be "free").
-                merge(current, previous, nowInSec);
+                rows.set(previous, Rows.merge(previousRow, currentRow, 
createdAtInSec));
             }
             else
             {
-                // data[current] != [previous], so move current just after 
previous if needs be
+                // current != previous, so move current just after previous if 
needs be
                 ++previous;
                 if (previous != current)
-                    move(current, previous);
+                    rows.set(previous, currentRow);
             }
         }
 
         // previous is on the last value to keep
-        rows = previous + 1;
-
-        isSorted = true;
-    }
+        for (int j = rows.size() - 1; j > previous; j--)
+            rows.remove(j);
 
-    /**
-     * This method is note meant to be used externally: it is only public so 
this
-     * update conform to the {@link Sorting.Sortable} interface.
-     */
-    public int compare(int i, int j)
-    {
-        return metadata.comparator.compare(p1.setTo(i), p2.setTo(j));
-    }
-
-    protected class StaticWriter extends StaticRow.Builder
-    {
-        protected StaticWriter()
-        {
-            super(columns.statics, false, metadata().isCounter());
-        }
-
-        @Override
-        public void endOfRow()
-        {
-            super.endOfRow();
-            if (staticRow == null)
-            {
-                staticRow = build();
-            }
-            else
-            {
-                StaticRow.Builder builder = StaticRow.builder(columns.statics, 
true, metadata().isCounter());
-                Rows.merge(staticRow, build(), columns.statics, builder, 
FBUtilities.nowInSeconds());
-                staticRow = builder.build();
-            }
-        }
+        finishBuild();
     }
 
-    protected class RegularWriter extends Writer
+    private void finishBuild()
     {
-        // For complex column, the writer assumptions is that for a given row, 
cells of different
-        // complex columns are not intermingled (they also should be in 
cellPath order). We however
-        // don't yet guarantee that this will be the case for updates (both 
UpdateStatement and
-        // RowUpdateBuilder can potentially break that assumption; we could 
change those classes but
-        // that's non trivial, at least for UpdateStatement).
-        // To deal with that problem, we record which complex columns have 
been updated (for the current
-        // row) and if we detect a violation of our assumption, we switch the 
row we're writing
-        // into (which is ok because everything will be sorted and merged in 
maybeSort()).
-        private final Set<ColumnDefinition> updatedComplex = new HashSet();
-        private ColumnDefinition lastUpdatedComplex;
-        private CellPath lastUpdatedComplexPath;
-
-        public RegularWriter()
-        {
-            super(false);
-        }
-
-        @Override
-        public void writeCell(ColumnDefinition column, boolean isCounter, 
ByteBuffer value, LivenessInfo info, CellPath path)
-        {
-            if (column.isComplex())
-            {
-                if (updatedComplex.contains(column)
-                    && (!column.equals(lastUpdatedComplex) || 
(column.cellPathComparator().compare(path, lastUpdatedComplexPath)) <= 0))
-                {
-                    // We've updated that complex already, but we've either 
updated another complex or it's not in order: as this
-                    // break the writer assumption, switch rows.
-                    endOfRow();
-
-                    // Copy the clustering values from the previous row
-                    int clusteringSize = metadata.clusteringColumns().size();
-                    int base = (row - 1) * clusteringSize;
-                    for (int i = 0; i < clusteringSize; i++)
-                        writer.writeClusteringValue(clusterings[base + i]);
-
-                    updatedComplex.clear();
-                }
-
-                lastUpdatedComplex = column;
-                lastUpdatedComplexPath = path;
-                updatedComplex.add(column);
-            }
-            super.writeCell(column, isCounter, value, info, path);
-        }
-
-        @Override
-        public void endOfRow()
-        {
-            super.endOfRow();
-            clear();
-        }
-
-        @Override
-        public Writer reset()
-        {
-            super.reset();
-            clear();
-            return this;
-        }
-
-        private void clear()
-        {
-            updatedComplex.clear();
-            lastUpdatedComplex = null;
-            lastUpdatedComplexPath = null;
-        }
+        RowStats.Collector collector = new RowStats.Collector();
+        deletionInfo.collectStats(collector);
+        for (Row row : rows)
+            Rows.collectStats(row, collector);
+        stats = collector.get();
+        isBuilt = true;
     }
 
     public static class PartitionUpdateSerializer
@@ -648,7 +690,7 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
             try (UnfilteredRowIterator iter = 
update.sliceableUnfilteredIterator())
             {
                 assert !iter.isReverseOrder();
-                UnfilteredRowIteratorSerializer.serializer.serialize(iter, 
out, version, update.rows);
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, 
out, version, update.rows.size());
             }
         }
 
@@ -666,37 +708,46 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
                 LegacyLayout.LegacyDeletionInfo info = 
LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
                 int size = in.readInt();
                 Iterator<LegacyLayout.LegacyCell> cells = 
LegacyLayout.deserializeCells(metadata, in, flag, size);
-                SerializationHelper helper = new SerializationHelper(version, 
flag);
+                SerializationHelper helper = new SerializationHelper(metadata, 
version, flag);
                 try (UnfilteredRowIterator iterator = 
LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, 
false, helper))
                 {
-                    return UnfilteredRowIterators.toUpdate(iterator);
+                    return PartitionUpdate.fromIterator(iterator);
                 }
             }
 
             assert key == null; // key is only there for the old format
 
-            UnfilteredRowIteratorSerializer.Header h = 
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag);
-            if (h.isEmpty)
-                return emptyUpdate(h.metadata, h.key);
+            UnfilteredRowIteratorSerializer.Header header = 
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag);
+            if (header.isEmpty)
+                return emptyUpdate(header.metadata, header.key);
 
-            assert !h.isReversed;
-            assert h.rowEstimate >= 0;
-            PartitionUpdate upd = new PartitionUpdate(h.metadata,
-                                                      h.key,
-                                                      new 
DeletionInfo(h.partitionDeletion),
-                                                      new 
RowDataBlock(h.sHeader.columns().regulars, h.rowEstimate, false, 
h.metadata.isCounter()),
-                                                      h.sHeader.columns(),
-                                                      h.rowEstimate);
+            assert !header.isReversed;
+            assert header.rowEstimate >= 0;
 
-            upd.staticRow = h.staticRow;
+            MutableDeletionInfo.Builder deletionBuilder = 
MutableDeletionInfo.builder(header.partitionDeletion, 
header.metadata.comparator, false);
+            List<Row> rows = new ArrayList<>(header.rowEstimate);
 
-            RangeTombstoneMarker.Writer markerWriter = upd.markerWriter(false);
-            UnfilteredRowIteratorSerializer.serializer.deserialize(in, new 
SerializationHelper(version, flag), h.sHeader, upd.writer(), markerWriter);
-
-            // Mark sorted after we're read it all since that's what we use in 
the writer() method to detect bad uses
-            upd.isSorted = true;
+            try (UnfilteredRowIterator partition = 
UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag, 
header))
+            {
+                while (partition.hasNext())
+                {
+                    Unfiltered unfiltered = partition.next();
+                    if (unfiltered.kind() == Unfiltered.Kind.ROW)
+                        rows.add((Row)unfiltered);
+                    else
+                        deletionBuilder.add((RangeTombstoneMarker)unfiltered);
+                }
+            }
 
-            return upd;
+            return new PartitionUpdate(header.metadata,
+                                       header.key,
+                                       header.sHeader.columns(),
+                                       header.staticRow,
+                                       rows,
+                                       deletionBuilder.build(),
+                                       header.sHeader.stats(),
+                                       true,
+                                       false);
         }
 
         public long serializedSize(PartitionUpdate update, int version)
@@ -719,7 +770,7 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
 
             try (UnfilteredRowIterator iter = 
update.sliceableUnfilteredIterator())
             {
-                return 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, 
update.rows);
+                return 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, 
update.rows.size());
             }
         }
     }
@@ -729,16 +780,14 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
      * us to update the counter value based on the pre-existing value read 
during the read-before-write that counters
      * do. See {@link CounterMutation} to understand how this is used.
      */
-    public class CounterMark
+    public static class CounterMark
     {
-        private final InternalReusableClustering clustering;
-        private final int row;
+        private final Row row;
         private final ColumnDefinition column;
         private final CellPath path;
 
-        private CounterMark(InternalReusableClustering clustering, int row, 
ColumnDefinition column, CellPath path)
+        private CounterMark(Row row, ColumnDefinition column, CellPath path)
         {
-            this.clustering = clustering;
             this.row = row;
             this.column = column;
             this.path = path;
@@ -746,7 +795,7 @@ public class PartitionUpdate extends AbstractPartitionData 
implements Sorting.So
 
         public Clustering clustering()
         {
-            return clustering.setTo(row);
+            return row.clustering();
         }
 
         public ColumnDefinition column()
@@ -761,12 +810,17 @@ public class PartitionUpdate extends 
AbstractPartitionData implements Sorting.So
 
         public ByteBuffer value()
         {
-            return data.getValue(row, column, path);
+            return path == null
+                 ? row.getCell(column).value()
+                 : row.getCell(column, path).value();
         }
 
         public void setValue(ByteBuffer value)
         {
-            data.setValue(row, column, path, value);
+            // This is a hack, as this is the only place where we mutate a 
Row object. It makes things more efficient
+            // for counters, however, and it won't be needed post-#6506, so 
that's probably fine.
+            assert row instanceof ArrayBackedRow;
+            ((ArrayBackedRow)row).setValue(column, path, value);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java 
b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
new file mode 100644
index 0000000..492fe1d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+
+public abstract class PurgingPartitionIterator extends 
WrappingUnfilteredPartitionIterator
+{
+    private final DeletionPurger purger;
+    private final int gcBefore;
+
+    private UnfilteredRowIterator next;
+
+    public PurgingPartitionIterator(UnfilteredPartitionIterator iterator, int 
gcBefore)
+    {
+        super(iterator);
+        this.gcBefore = gcBefore;
+        this.purger = new DeletionPurger()
+        {
+            public boolean shouldPurge(long timestamp, int localDeletionTime)
+            {
+                return timestamp < getMaxPurgeableTimestamp() && 
localDeletionTime < gcBefore;
+            }
+        };
+    }
+
+    protected abstract long getMaxPurgeableTimestamp();
+
+    // Called at the beginning of each new partition
+    protected void onNewPartition(DecoratedKey partitionKey)
+    {
+    }
+
+    // Called for each partition whose content was entirely purged and that 
is therefore empty post-purge.
+    protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey)
+    {
+    }
+
+    // Called for every unfiltered. Meant for CompactionIterator to update 
progress
+    protected void updateProgress()
+    {
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+        while (next == null && super.hasNext())
+        {
+            UnfilteredRowIterator iterator = super.next();
+            onNewPartition(iterator.partitionKey());
+
+            UnfilteredRowIterator purged = purge(iterator);
+            if (isForThrift() || !purged.isEmpty())
+            {
+                next = purged;
+                return true;
+            }
+
+            onEmptyPartitionPostPurge(purged.partitionKey());
+        }
+        return next != null;
+    }
+
+    @Override
+    public UnfilteredRowIterator next()
+    {
+        UnfilteredRowIterator toReturn = next;
+        next = null;
+        return toReturn;
+    }
+
+    private UnfilteredRowIterator purge(final UnfilteredRowIterator iter)
+    {
+        return new AlteringUnfilteredRowIterator(iter)
+        {
+            @Override
+            public DeletionTime partitionLevelDeletion()
+            {
+                DeletionTime dt = iter.partitionLevelDeletion();
+                return purger.shouldPurge(dt) ? DeletionTime.LIVE : dt;
+            }
+
+            @Override
+            public Row computeNextStatic(Row row)
+            {
+                return row.purge(purger, gcBefore);
+            }
+
+            @Override
+            public Row computeNext(Row row)
+            {
+                return row.purge(purger, gcBefore);
+            }
+
+            @Override
+            public RangeTombstoneMarker computeNext(RangeTombstoneMarker 
marker)
+            {
+                boolean reversed = isReverseOrder();
+                if (marker.isBoundary())
+                {
+                    // We can only skip the whole marker if both deletion times 
are purgeable.
+                    // If only one of them is, we return the corresponding 
open or close marker for the side that is kept.
+                    RangeTombstoneBoundaryMarker boundary = 
(RangeTombstoneBoundaryMarker)marker;
+                    boolean shouldPurgeClose = 
purger.shouldPurge(boundary.closeDeletionTime(reversed));
+                    boolean shouldPurgeOpen = 
purger.shouldPurge(boundary.openDeletionTime(reversed));
+
+                    if (shouldPurgeClose)
+                    {
+                        if (shouldPurgeOpen)
+                            return null;
+
+                        return 
boundary.createCorrespondingOpenMarker(reversed);
+                    }
+
+                    return shouldPurgeOpen
+                         ? boundary.createCorrespondingCloseMarker(reversed)
+                         : marker;
+                }
+                else
+                {
+                    return 
purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : 
marker;
+                }
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                Unfiltered next = super.next();
+                updateProgress();
+                return next;
+            }
+        };
+    }
+};

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
 
b/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
deleted file mode 100644
index 10022eb..0000000
--- 
a/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-
-public abstract class TombstonePurgingPartitionIterator extends 
FilteringPartitionIterator
-{
-    private final int gcBefore;
-
-    public TombstonePurgingPartitionIterator(UnfilteredPartitionIterator 
iterator, int gcBefore)
-    {
-        super(iterator);
-        this.gcBefore = gcBefore;
-    }
-
-    protected abstract long getMaxPurgeableTimestamp();
-
-    protected FilteringRow makeRowFilter()
-    {
-        return new FilteringRow()
-        {
-            @Override
-            protected boolean include(LivenessInfo info)
-            {
-                return !info.hasLocalDeletionTime() || 
!info.isPurgeable(getMaxPurgeableTimestamp(), gcBefore);
-            }
-
-            @Override
-            protected boolean include(DeletionTime dt)
-            {
-                return includeDelTime(dt);
-            }
-
-            @Override
-            protected boolean include(ColumnDefinition c, DeletionTime dt)
-            {
-                return includeDelTime(dt);
-            }
-        };
-    }
-
-    private boolean includeDelTime(DeletionTime dt)
-    {
-        return dt.isLive() || !dt.isPurgeable(getMaxPurgeableTimestamp(), 
gcBefore);
-    }
-
-    @Override
-    protected boolean includePartitionDeletion(DeletionTime dt)
-    {
-        return includeDelTime(dt);
-    }
-
-    @Override
-    protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker)
-    {
-        if (marker.isBoundary())
-        {
-            // We can only skip the whole marker if both deletion time are 
purgeable.
-            // If only one of them is, filterTombstoneMarker will deal with it.
-            RangeTombstoneBoundaryMarker boundary = 
(RangeTombstoneBoundaryMarker)marker;
-            return includeDelTime(boundary.endDeletionTime()) || 
includeDelTime(boundary.startDeletionTime());
-        }
-        else
-        {
-            return 
includeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime());
-        }
-    }
-
-    @Override
-    protected RangeTombstoneMarker 
filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed)
-    {
-        if (!marker.isBoundary())
-            return marker;
-
-        // Note that we know this is called after includeRangeTombstoneMarker. 
So if one of the deletion time is
-        // purgeable, we know the other one isn't.
-        RangeTombstoneBoundaryMarker boundary = 
(RangeTombstoneBoundaryMarker)marker;
-        if (!(includeDelTime(boundary.closeDeletionTime(reversed))))
-            return boundary.createCorrespondingCloseBound(reversed);
-        else if (!(includeDelTime(boundary.openDeletionTime(reversed))))
-            return boundary.createCorrespondingOpenBound(reversed);
-        return boundary;
-    }
-
-};

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 0d3d364..4414f44 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -19,13 +19,10 @@ package org.apache.cassandra.db.partitions;
 
 import java.io.IOError;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -33,25 +30,14 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.MergeIterator;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Static methods to work with partition iterators.
  */
 public abstract class UnfilteredPartitionIterators
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(UnfilteredPartitionIterators.class);
-
     private static final Serializer serializer = new Serializer();
 
-    private static final Comparator<UnfilteredRowIterator> partitionComparator 
= new Comparator<UnfilteredRowIterator>()
-    {
-        public int compare(UnfilteredRowIterator p1, UnfilteredRowIterator p2)
-        {
-            return p1.partitionKey().compareTo(p2.partitionKey());
-        }
-    };
+    private static final Comparator<UnfilteredRowIterator> partitionComparator 
= (p1, p2) -> p1.partitionKey().compareTo(p2.partitionKey());
 
     public static final UnfilteredPartitionIterator EMPTY = new 
AbstractUnfilteredPartitionIterator()
     {
@@ -242,28 +228,6 @@ public abstract class UnfilteredPartitionIterators
         };
     }
 
-    /**
-     * Convert all expired cells to equivalent tombstones.
-     * <p>
-     * See {@link UnfilteredRowIterators#convertExpiredCellsToTombstones} for 
details.
-     *
-     * @param iterator the iterator in which to conver expired cells.
-     * @param nowInSec the current time to use to decide if a cell is expired.
-     * @return an iterator that returns the same data than {@code iterator} 
but with all expired cells converted
-     * to equivalent tombstones.
-     */
-    public static UnfilteredPartitionIterator 
convertExpiredCellsToTombstones(UnfilteredPartitionIterator iterator, final int 
nowInSec)
-    {
-        return new WrappingUnfilteredPartitionIterator(iterator)
-        {
-            @Override
-            protected UnfilteredRowIterator computeNext(UnfilteredRowIterator 
iter)
-            {
-                return 
UnfilteredRowIterators.convertExpiredCellsToTombstones(iter, nowInSec);
-            }
-        };
-    }
-
     public static UnfilteredPartitionIterator mergeLazily(final List<? extends 
UnfilteredPartitionIterator> iterators, final int nowInSec)
     {
         assert !iterators.isEmpty();
@@ -330,52 +294,6 @@ public abstract class UnfilteredPartitionIterators
         };
     }
 
-    public static UnfilteredPartitionIterator 
removeDroppedColumns(UnfilteredPartitionIterator iterator, final 
Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns)
-    {
-        return new FilteringPartitionIterator(iterator)
-        {
-            @Override
-            protected FilteringRow makeRowFilter()
-            {
-                return new FilteringRow()
-                {
-                    @Override
-                    protected boolean include(Cell cell)
-                    {
-                        return include(cell.column(), 
cell.livenessInfo().timestamp());
-                    }
-
-                    @Override
-                    protected boolean include(ColumnDefinition c, DeletionTime 
dt)
-                    {
-                        return include(c, dt.markedForDeleteAt());
-                    }
-
-                    private boolean include(ColumnDefinition column, long 
timestamp)
-                    {
-                        CFMetaData.DroppedColumn dropped = 
droppedColumns.get(column.name.bytes);
-                        return dropped == null || timestamp > 
dropped.droppedTime;
-                    }
-                };
-            }
-
-            @Override
-            protected boolean shouldFilter(UnfilteredRowIterator iterator)
-            {
-                // TODO: We could have row iterators return the smallest 
timestamp they might return
-                // (which we can get from sstable stats), and ignore any 
dropping if that smallest
-                // timestamp is bigger that the biggest droppedColumns 
timestamp.
-
-                // If none of the dropped columns is part of the columns that 
the iterator actually returns, there is nothing to do;
-                for (ColumnDefinition c : iterator.columns())
-                    if (droppedColumns.containsKey(c.name.bytes))
-                        return true;
-
-                return false;
-            }
-        };
-    }
-
     public static void digest(UnfilteredPartitionIterator iterator, 
MessageDigest digest)
     {
         try (UnfilteredPartitionIterator iter = iterator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java 
b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index c003d6f..807741a 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -34,30 +34,12 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public abstract class AbstractCell implements Cell
 {
-    public boolean isLive(int nowInSec)
-    {
-        return livenessInfo().isLive(nowInSec);
-    }
-
-    public boolean isTombstone()
-    {
-        return livenessInfo().hasLocalDeletionTime() && 
!livenessInfo().hasTTL();
-    }
-
-    public boolean isExpiring()
-    {
-        return livenessInfo().hasTTL();
-    }
-
-    public void writeTo(Row.Writer writer)
-    {
-        writer.writeCell(column(), isCounterCell(), value(), livenessInfo(), 
path());
-    }
-
     public void digest(MessageDigest digest)
     {
         digest.update(value().duplicate());
-        livenessInfo().digest(digest);
+        FBUtilities.updateWithLong(digest, timestamp());
+        FBUtilities.updateWithInt(digest, localDeletionTime());
+        FBUtilities.updateWithInt(digest, ttl());
         FBUtilities.updateWithBoolean(digest, isCounterCell());
         if (path() != null)
             path().digest(digest);
@@ -67,7 +49,12 @@ public abstract class AbstractCell implements Cell
     {
         column().validateCellValue(value());
 
-        livenessInfo().validate();
+        if (ttl() < 0)
+            throw new MarshalException("A TTL should not be negative");
+        if (localDeletionTime() < 0)
+            throw new MarshalException("A local deletion time should not be 
negative");
+        if (isExpiring() && localDeletionTime() == NO_DELETION_TIME)
+            throw new MarshalException("Shoud not have a TTL without an 
associated local deletion time");
 
         // If cell is a tombstone, it shouldn't have a value.
         if (isTombstone() && value().hasRemaining())
@@ -77,59 +64,58 @@ public abstract class AbstractCell implements Cell
             column().validateCellPath(path());
     }
 
-    public int dataSize()
-    {
-        int size = value().remaining() + livenessInfo().dataSize();
-        if (path() != null)
-            size += path().dataSize();
-        return size;
-
-    }
-
     @Override
     public boolean equals(Object other)
     {
+        if (this == other)
+            return true;
+
         if(!(other instanceof Cell))
             return false;
 
         Cell that = (Cell)other;
         return this.column().equals(that.column())
             && this.isCounterCell() == that.isCounterCell()
+            && this.timestamp() == that.timestamp()
+            && this.ttl() == that.ttl()
+            && this.localDeletionTime() == that.localDeletionTime()
             && Objects.equals(this.value(), that.value())
-            && Objects.equals(this.livenessInfo(), that.livenessInfo())
             && Objects.equals(this.path(), that.path());
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(column(), isCounterCell(), value(), 
livenessInfo(), path());
+        return Objects.hash(column(), isCounterCell(), timestamp(), ttl(), 
localDeletionTime(), value(), path());
     }
 
     @Override
     public String toString()
     {
         if (isCounterCell())
-            return String.format("[%s=%d ts=%d]", column().name, 
CounterContext.instance().total(value()), livenessInfo().timestamp());
+            return String.format("[%s=%d ts=%d]", column().name, 
CounterContext.instance().total(value()), timestamp());
 
         AbstractType<?> type = column().type;
         if (type instanceof CollectionType && type.isMultiCell())
         {
             CollectionType ct = (CollectionType)type;
-            return String.format("[%s[%s]=%s info=%s]",
+            return String.format("[%s[%s]=%s %s]",
                                  column().name,
                                  ct.nameComparator().getString(path().get(0)),
                                  ct.valueComparator().getString(value()),
-                                 livenessInfo());
+                                 livenessInfoString());
         }
-        return String.format("[%s=%s info=%s]", column().name, 
type.getString(value()), livenessInfo());
+        return String.format("[%s=%s %s]", column().name, 
type.getString(value()), livenessInfoString());
     }
 
-    public Cell takeAlias()
+    private String livenessInfoString()
     {
-        // Cell is always used as an Aliasable object but as the code currently
-        // never need to alias a cell outside of its valid scope, we don't yet
-        // need that.
-        throw new UnsupportedOperationException();
+        if (isExpiring())
+            return String.format("ts=%d ttl=%d ldt=%d", timestamp(), ttl(), 
localDeletionTime());
+        else if (isTombstone())
+            return String.format("ts=%d ldt=%d", timestamp(), 
localDeletionTime());
+        else
+            return String.format("ts=%d", timestamp());
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java 
b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
index d8256fc..e90e52b 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java
@@ -41,6 +41,21 @@ public abstract class AbstractRangeTombstoneMarker 
implements RangeTombstoneMark
         return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER;
     }
 
+    public boolean isBoundary()
+    {
+        return bound.isBoundary();
+    }
+
+    public boolean isOpen(boolean reversed)
+    {
+        return bound.isOpen(reversed);
+    }
+
+    public boolean isClose(boolean reversed)
+    {
+        return bound.isClose(reversed);
+    }
+
     public void validateData(CFMetaData metadata)
     {
         Slice.Bound bound = clustering();
@@ -56,16 +71,4 @@ public abstract class AbstractRangeTombstoneMarker 
implements RangeTombstoneMark
     {
         return toString(metadata);
     }
-
-    protected void copyBoundTo(RangeTombstoneMarker.Writer writer)
-    {
-        for (int i = 0; i < bound.size(); i++)
-            writer.writeClusteringValue(bound.get(i));
-        writer.writeBoundKind(bound.kind());
-    }
-
-    public Unfiltered takeAlias()
-    {
-        return this;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java 
b/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
deleted file mode 100644
index 03aeb88..0000000
--- a/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.util.Iterator;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.SearchIterator;
-
-public abstract class AbstractReusableRow extends AbstractRow
-{
-    private CellData.ReusableCell simpleCell;
-    private ComplexRowDataBlock.ReusableIterator complexCells;
-    private DeletionTimeArray.Cursor complexDeletionCursor;
-    private RowDataBlock.ReusableIterator iterator;
-
-    public AbstractReusableRow()
-    {
-    }
-
-    protected abstract int row();
-    protected abstract RowDataBlock data();
-
-    private CellData.ReusableCell simpleCell()
-    {
-        if (simpleCell == null)
-            simpleCell = SimpleRowDataBlock.reusableCell();
-        return simpleCell;
-    }
-
-    private ComplexRowDataBlock.ReusableIterator complexCells()
-    {
-        if (complexCells == null)
-            complexCells = ComplexRowDataBlock.reusableComplexCells();
-        return complexCells;
-    }
-
-    private DeletionTimeArray.Cursor complexDeletionCursor()
-    {
-        if (complexDeletionCursor == null)
-            complexDeletionCursor = ComplexRowDataBlock.complexDeletionCursor();
-        return complexDeletionCursor;
-    }
-
-    private RowDataBlock.ReusableIterator reusableIterator()
-    {
-        if (iterator == null)
-            iterator = RowDataBlock.reusableIterator();
-        return iterator;
-    }
-
-    public Columns columns()
-    {
-        return data().columns();
-    }
-
-    public Cell getCell(ColumnDefinition c)
-    {
-        assert !c.isComplex();
-        if (data().simpleData == null)
-            return null;
-
-        int idx = columns().simpleIdx(c, 0);
-        if (idx < 0)
-            return null;
-
-        return simpleCell().setTo(data().simpleData.data, c, (row() * columns().simpleColumnCount()) + idx);
-    }
-
-    public Cell getCell(ColumnDefinition c, CellPath path)
-    {
-        assert c.isComplex();
-
-        ComplexRowDataBlock data = data().complexData;
-        if (data == null)
-            return null;
-
-        int idx = data.cellIdx(row(), c, path);
-        if (idx < 0)
-            return null;
-
-        return simpleCell().setTo(data.cellData(row()), c, idx);
-    }
-
-    public Iterator<Cell> getCells(ColumnDefinition c)
-    {
-        assert c.isComplex();
-        return complexCells().setTo(data().complexData, row(), c);
-    }
-
-    public boolean hasComplexDeletion()
-    {
-        return data().hasComplexDeletion(row());
-    }
-
-    public DeletionTime getDeletion(ColumnDefinition c)
-    {
-        assert c.isComplex();
-        if (data().complexData == null)
-            return DeletionTime.LIVE;
-
-        int idx = data().complexData.complexDeletionIdx(row(), c);
-        return idx < 0
-             ? DeletionTime.LIVE
-             : complexDeletionCursor().setTo(data().complexData.complexDelTimes, idx);
-    }
-
-    public Iterator<Cell> iterator()
-    {
-        return reusableIterator().setTo(data(), row());
-    }
-
-    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
-    {
-        return new SearchIterator<ColumnDefinition, ColumnData>()
-        {
-            private int simpleIdx = 0;
-
-            public boolean hasNext()
-            {
-                // TODO: we can do better, but we expect users to no rely on this anyway
-                return true;
-            }
-
-            public ColumnData next(ColumnDefinition column)
-            {
-                if (column.isComplex())
-                {
-                    // TODO: this is sub-optimal
-
-                    Iterator<Cell> cells = getCells(column);
-                    return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column));
-                }
-                else
-                {
-                    int idx = columns().simpleIdx(column, simpleIdx);
-                    if (idx < 0)
-                        return null;
-
-                    Cell cell = simpleCell().setTo(data().simpleData.data, column, (row() * columns().simpleColumnCount()) + idx);
-                    simpleIdx = idx + 1;
-                    return cell == null ? null : new ColumnData(column, cell, null, null);
-                }
-            }
-        };
-    }
-
-    public Row takeAlias()
-    {
-        final Clustering clustering = clustering().takeAlias();
-        final LivenessInfo info = primaryKeyLivenessInfo().takeAlias();
-        final DeletionTime deletion = deletion().takeAlias();
-
-        final RowDataBlock data = data();
-        final int row = row();
-
-        return new AbstractReusableRow()
-        {
-            protected RowDataBlock data()
-            {
-                return data;
-            }
-
-            protected int row()
-            {
-                return row;
-            }
-
-            public Clustering clustering()
-            {
-                return clustering;
-            }
-
-            public LivenessInfo primaryKeyLivenessInfo()
-            {
-                return info;
-            }
-
-            public DeletionTime deletion()
-            {
-                return deletion;
-            }
-
-            @Override
-            public Row takeAlias()
-            {
-                return this;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index a99bc78..807d805 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -18,11 +18,11 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
-import java.util.Iterator;
 import java.util.Objects;
 
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.serializers.MarshalException;
@@ -46,19 +46,14 @@ public abstract class AbstractRow implements Row
         if (primaryKeyLivenessInfo().isLive(nowInSec))
             return true;
 
-        for (Cell cell : this)
-            if (cell.isLive(nowInSec))
-                return true;
-
-        return false;
+        return Iterables.any(cells(), cell -> cell.isLive(nowInSec));
     }
 
     public boolean isEmpty()
     {
-        return !primaryKeyLivenessInfo().hasTimestamp()
+        return primaryKeyLivenessInfo().isEmpty()
             && deletion().isLive()
-            && !iterator().hasNext()
-            && !hasComplexDeletion();
+            && !iterator().hasNext();
     }
 
     public boolean isStatic()
@@ -74,36 +69,8 @@ public abstract class AbstractRow implements Row
         deletion().digest(digest);
         primaryKeyLivenessInfo().digest(digest);
 
-        Iterator<ColumnDefinition> iter = columns().complexColumns();
-        while (iter.hasNext())
-            getDeletion(iter.next()).digest(digest);
-
-        for (Cell cell : this)
-            cell.digest(digest);
-    }
-
-    /**
-     * Copy this row to the provided writer.
-     *
-     * @param writer the row writer to write this row to.
-     */
-    public void copyTo(Row.Writer writer)
-    {
-        Rows.writeClustering(clustering(), writer);
-        writer.writePartitionKeyLivenessInfo(primaryKeyLivenessInfo());
-        writer.writeRowDeletion(deletion());
-
-        for (Cell cell : this)
-            cell.writeTo(writer);
-
-        for (int i = 0; i < columns().complexColumnCount(); i++)
-        {
-            ColumnDefinition c = columns().getComplex(i);
-            DeletionTime dt = getDeletion(c);
-            if (!dt.isLive())
-                writer.writeComplexDeletion(c, dt);
-        }
-        writer.endOfRow();
+        for (ColumnData cd : this)
+            cd.digest(digest);
     }
 
     public void validateData(CFMetaData metadata)
@@ -120,8 +87,8 @@ public abstract class AbstractRow implements Row
         if (deletion().localDeletionTime() < 0)
             throw new MarshalException("A local deletion time should not be negative");
 
-        for (Cell cell : this)
-            cell.validate();
+        for (ColumnData cd : this)
+            cd.validate();
     }
 
     public String toString(CFMetaData metadata)
@@ -142,33 +109,43 @@ public abstract class AbstractRow implements Row
         }
         sb.append(": ").append(clustering().toString(metadata)).append(" | ");
         boolean isFirst = true;
-        ColumnDefinition prevColumn = null;
-        for (Cell cell : this)
+        for (ColumnData cd : this)
         {
             if (isFirst) isFirst = false; else sb.append(", ");
             if (fullDetails)
             {
-                if (cell.column().isComplex() && !cell.column().equals(prevColumn))
+                if (cd.column().isSimple())
                 {
-                    DeletionTime complexDel = getDeletion(cell.column());
-                    if (!complexDel.isLive())
-                        sb.append("del(").append(cell.column().name).append(")=").append(complexDel).append(", ");
+                    sb.append(cd);
+                }
+                else
+                {
+                    ComplexColumnData complexData = (ComplexColumnData)cd;
+                    if (!complexData.complexDeletion().isLive())
+                        sb.append("del(").append(cd.column().name).append(")=").append(complexData.complexDeletion());
+                    for (Cell cell : complexData)
+                        sb.append(", ").append(cell);
                 }
-                sb.append(cell);
-                prevColumn = cell.column();
             }
             else
             {
-                sb.append(cell.column().name);
-                if (cell.column().type instanceof CollectionType)
+                if (cd.column().isSimple())
                 {
-                    CollectionType ct = (CollectionType)cell.column().type;
-                    sb.append("[").append(ct.nameComparator().getString(cell.path().get(0))).append("]");
-                    sb.append("=").append(ct.valueComparator().getString(cell.value()));
+                    Cell cell = (Cell)cd;
+                    sb.append(cell.column().name).append('=').append(cell.column().type.getString(cell.value()));
                 }
                 else
                 {
-                    sb.append("=").append(cell.column().type.getString(cell.value()));
+                    ComplexColumnData complexData = (ComplexColumnData)cd;
+                    CollectionType ct = (CollectionType)cd.column().type;
+                    sb.append(cd.column().name).append("={");
+                    int i = 0;
+                    for (Cell cell : complexData)
+                    {
+                        sb.append(i++ == 0 ? "" : ", ");
+                        sb.append(ct.nameComparator().getString(cell.path().get(0))).append("->").append(ct.valueComparator().getString(cell.value()));
+                    }
+                    sb.append('}');
                 }
             }
         }
@@ -188,22 +165,15 @@ public abstract class AbstractRow implements Row
              || !this.deletion().equals(that.deletion()))
             return false;
 
-        Iterator<Cell> thisCells = this.iterator();
-        Iterator<Cell> thatCells = that.iterator();
-        while (thisCells.hasNext())
-        {
-            if (!thatCells.hasNext() || !thisCells.next().equals(thatCells.next()))
-                return false;
-        }
-        return !thatCells.hasNext();
+        return Iterables.elementsEqual(this, that);
     }
 
     @Override
     public int hashCode()
     {
         int hash = Objects.hash(clustering(), columns(), primaryKeyLivenessInfo(), deletion());
-        for (Cell cell : this)
-            hash += 31 * cell.hashCode();
+        for (ColumnData cd : this)
+            hash += 31 * cd.hashCode();
         return hash;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
index 5bfd1a3..b4f849a 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java
@@ -90,18 +90,4 @@ public abstract class AbstractUnfilteredRowIterator extends AbstractIterator<Unf
     public void close()
     {
     }
-
-    public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator b)
-    {
-        return Objects.equals(a.columns(), b.columns())
-            && Objects.equals(a.metadata(), b.metadata())
-            && Objects.equals(a.isReverseOrder(), b.isReverseOrder())
-            && Objects.equals(a.partitionKey(), b.partitionKey())
-            && Objects.equals(a.partitionLevelDeletion(), b.partitionLevelDeletion())
-            && Objects.equals(a.staticRow(), b.staticRow())
-            && Objects.equals(a.stats(), b.stats())
-            && Objects.equals(a.metadata(), b.metadata())
-            && Iterators.elementsEqual(a, b);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java
new file mode 100644
index 0000000..a390bad
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.util.NoSuchElementException;
+
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+
+/**
+ * Class that makes it easier to write unfiltered iterators that filter or modify
+ * the returned unfiltered.
+ *
+ * The methods you want to override are {@code computeNextStatic} and the {@code computeNext} methods.
+ * All of these methods are allowed to return a {@code null} value with the meaning of ignoring
+ * the entry.
+ */
+public abstract class AlteringUnfilteredRowIterator extends WrappingUnfilteredRowIterator
+{
+    private Row staticRow;
+    private Unfiltered next;
+
+    protected AlteringUnfilteredRowIterator(UnfilteredRowIterator wrapped)
+    {
+        super(wrapped);
+    }
+
+    protected Row computeNextStatic(Row row)
+    {
+        return row;
+    }
+
+    protected Row computeNext(Row row)
+    {
+        return row;
+    }
+
+    protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+    {
+        return marker;
+    }
+
+    public Row staticRow()
+    {
+        if (staticRow == null)
+        {
+            Row row = computeNextStatic(super.staticRow());
+            staticRow = row == null ? Rows.EMPTY_STATIC_ROW : row;
+        }
+        return staticRow;
+    }
+
+    public boolean hasNext()
+    {
+        while (next == null && super.hasNext())
+        {
+            Unfiltered unfiltered = super.next();
+            if (unfiltered.isRow())
+            {
+                Row row = computeNext((Row)unfiltered);
+                if (row != null && !row.isEmpty())
+                    next = row;
+            }
+            else
+            {
+                next = computeNext((RangeTombstoneMarker)unfiltered);
+            }
+        }
+        return next != null;
+    }
+
+    public Unfiltered next()
+    {
+        if (!hasNext())
+            throw new NoSuchElementException();
+
+        Unfiltered toReturn = next;
+        next = null;
+        return toReturn;
+    }
+}

Reply via email to