http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java 
b/src/java/org/apache/cassandra/db/LegacyLayout.java
new file mode 100644
index 0000000..c1a7fd0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -0,0 +1,1301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.IOError;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.utils.*;
+import org.apache.hadoop.io.serializer.Serialization;
+
+/**
+ * Functions to deal with the old format.
+ */
+public abstract class LegacyLayout
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(LegacyLayout.class);
+
+    public final static int MAX_CELL_NAME_LENGTH = 
FBUtilities.MAX_UNSIGNED_SHORT;
+
+    private final static int DELETION_MASK        = 0x01;
+    private final static int EXPIRATION_MASK      = 0x02;
+    private final static int COUNTER_MASK         = 0x04;
+    private final static int COUNTER_UPDATE_MASK  = 0x08;
+    private final static int RANGE_TOMBSTONE_MASK = 0x10;
+
+    private LegacyLayout() {}
+
+    public static AbstractType<?> makeLegacyComparator(CFMetaData metadata)
+    {
+        ClusteringComparator comparator = metadata.comparator;
+        if (!metadata.isCompound())
+        {
+            assert comparator.size() == 1;
+            return comparator.subtype(0);
+        }
+
+        boolean hasCollections = metadata.hasCollectionColumns();
+        List<AbstractType<?>> types = new ArrayList<>(comparator.size() + 
(metadata.isDense() ? 0 : 1) + (hasCollections ? 1 : 0));
+
+        types.addAll(comparator.subtypes());
+
+        if (!metadata.isDense())
+        {
+            types.add(UTF8Type.instance);
+            if (hasCollections)
+            {
+                Map<ByteBuffer, CollectionType> defined = new HashMap<>();
+                for (ColumnDefinition def : metadata.partitionColumns())
+                {
+                    if (def.type instanceof CollectionType && 
def.type.isMultiCell())
+                        defined.put(def.name.bytes, (CollectionType)def.type);
+                }
+                types.add(ColumnToCollectionType.getInstance(defined));
+            }
+        }
+        return CompositeType.getInstance(types);
+    }
+
+    public static LegacyCellName decodeCellName(CFMetaData metadata, 
ByteBuffer superColumnName, ByteBuffer cellname)
+    throws UnknownColumnException
+    {
+        assert cellname != null;
+        if (metadata.isSuper())
+        {
+            assert superColumnName != null;
+            return decodeForSuperColumn(metadata, new 
SimpleClustering(superColumnName), cellname);
+        }
+
+        assert superColumnName == null;
+        return decodeCellName(metadata, cellname);
+    }
+
+    private static LegacyCellName decodeForSuperColumn(CFMetaData metadata, 
Clustering clustering, ByteBuffer subcol)
+    {
+        ColumnDefinition def = metadata.getColumnDefinition(subcol);
+        if (def != null)
+        {
+            // it's a statically defined subcolumn
+            return new LegacyCellName(clustering, def, null);
+        }
+
+        def = metadata.compactValueColumn();
+        assert def != null && def.type instanceof MapType;
+        return new LegacyCellName(clustering, def, subcol);
+    }
+
+    public static LegacyCellName decodeCellName(CFMetaData metadata, 
ByteBuffer cellname) throws UnknownColumnException
+    {
+        return decodeCellName(metadata, cellname, false);
+    }
+
+    public static LegacyCellName decodeCellName(CFMetaData metadata, 
ByteBuffer cellname, boolean readAllAsDynamic) throws UnknownColumnException
+    {
+        Clustering clustering = decodeClustering(metadata, cellname);
+
+        if (metadata.isSuper())
+            return decodeForSuperColumn(metadata, clustering, 
CompositeType.extractComponent(cellname, 1));
+
+        if (metadata.isDense() || (metadata.isCompactTable() && 
readAllAsDynamic))
+            return new LegacyCellName(clustering, 
metadata.compactValueColumn(), null);
+
+        ByteBuffer column = metadata.isCompound() ? 
CompositeType.extractComponent(cellname, metadata.comparator.size()) : cellname;
+        if (column == null)
+            throw new IllegalArgumentException("No column name component found 
in cell name");
+
+        // Row marker, this is ok
+        if (!column.hasRemaining())
+            return new LegacyCellName(clustering, null, null);
+
+        ColumnDefinition def = metadata.getColumnDefinition(column);
+        if (def == null)
+        {
+            // If it's a compact table, it means the column is in fact a 
"dynamic" one
+            if (metadata.isCompactTable())
+                return new LegacyCellName(new SimpleClustering(column), 
metadata.compactValueColumn(), null);
+
+            throw new UnknownColumnException(metadata, column);
+        }
+
+        ByteBuffer collectionElement = metadata.isCompound() ? 
CompositeType.extractComponent(cellname, metadata.comparator.size() + 1) : null;
+
+        // Note that because static compact columns are translated to static 
defs in the new world order, we need to force a static
+        // clustering if the definition is static (as it might not be in this 
case).
+        return new LegacyCellName(def.isStatic() ? 
Clustering.STATIC_CLUSTERING : clustering, def, collectionElement);
+    }
+
+    public static LegacyBound decodeBound(CFMetaData metadata, ByteBuffer 
bound, boolean isStart)
+    {
+        if (!bound.hasRemaining())
+            return isStart ? LegacyBound.BOTTOM : LegacyBound.TOP;
+
+        List<ByteBuffer> components = metadata.isCompound()
+                                    ? CompositeType.splitName(bound)
+                                    : Collections.singletonList(bound);
+
+        // Either it's a prefix of the clustering, or it's the bound of a 
collection range tombstone (and thus has
+        // the collection column name)
+        assert components.size() <= metadata.comparator.size() || 
(!metadata.isCompactTable() && components.size() == metadata.comparator.size() 
+ 1);
+
+        List<ByteBuffer> prefix = components.size() <= 
metadata.comparator.size() ? components : components.subList(0, 
metadata.comparator.size());
+        Slice.Bound sb = Slice.Bound.create(isStart ? 
Slice.Bound.Kind.INCL_START_BOUND : Slice.Bound.Kind.INCL_END_BOUND,
+                                            prefix.toArray(new 
ByteBuffer[prefix.size()]));
+
+        ColumnDefinition collectionName = components.size() == 
metadata.comparator.size() + 1
+                                        ? 
metadata.getColumnDefinition(components.get(metadata.comparator.size()))
+                                        : null;
+        return new LegacyBound(sb, metadata.isCompound() && 
CompositeType.isStaticName(bound), collectionName);
+    }
+
+    public static ByteBuffer encodeCellName(CFMetaData metadata, Clustering 
clustering, ByteBuffer columnName, ByteBuffer collectionElement)
+    {
+        boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
+
+        if (!metadata.isCompound())
+        {
+            if (isStatic)
+                return columnName;
+
+            assert clustering.size() == 1;
+            return clustering.get(0);
+        }
+
+        // We use comparator.size() rather than clustering.size() because of 
static clusterings
+        int clusteringSize = metadata.comparator.size();
+        int size = clusteringSize + (metadata.isDense() ? 0 : 1) + 
(collectionElement == null ? 0 : 1);
+        ByteBuffer[] values = new ByteBuffer[size];
+        for (int i = 0; i < clusteringSize; i++)
+        {
+            if (isStatic)
+            {
+                values[i] = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+                continue;
+            }
+
+            ByteBuffer v = clustering.get(i);
+            // we can have null (only for dense compound tables for backward 
compatibility reasons) but that
+            // means we're done and should stop there as far as building the 
composite is concerned.
+            if (v == null)
+                return CompositeType.build(Arrays.copyOfRange(values, 0, i));
+
+            values[i] = v;
+        }
+
+        if (!metadata.isDense())
+            values[clusteringSize] = columnName;
+        if (collectionElement != null)
+            values[clusteringSize + 1] = collectionElement;
+
+        return CompositeType.build(isStatic, values);
+    }
+
+    public static Clustering decodeClustering(CFMetaData metadata, ByteBuffer 
value)
+    {
+        int csize = metadata.comparator.size();
+        if (csize == 0)
+            return Clustering.EMPTY;
+
+        if (metadata.isCompound() && CompositeType.isStaticName(value))
+            return Clustering.STATIC_CLUSTERING;
+
+        List<ByteBuffer> components = metadata.isCompound()
+                                    ? CompositeType.splitName(value)
+                                    : Collections.singletonList(value);
+
+        return new SimpleClustering(components.subList(0, Math.min(csize, 
components.size())).toArray(new ByteBuffer[csize]));
+    }
+
+    public static ByteBuffer encodeClustering(CFMetaData metadata, Clustering 
clustering)
+    {
+        if (!metadata.isCompound())
+        {
+            assert clustering.size() == 1;
+            return clustering.get(0);
+        }
+
+        ByteBuffer[] values = new ByteBuffer[clustering.size()];
+        for (int i = 0; i < clustering.size(); i++)
+            values[i] = clustering.get(i);
+        return CompositeType.build(values);
+    }
+
+    // For serializing to old wire format
+    public static Pair<DeletionInfo, Iterator<LegacyCell>> 
fromUnfilteredRowIterator(UnfilteredRowIterator iterator)
+    {
+        // we need to extract the range tombstone so materialize the 
partition. Since this is
+        // used for the on-wire format, this is not worst than it used to be.
+        final ArrayBackedPartition partition = 
ArrayBackedPartition.create(iterator);
+        DeletionInfo info = partition.deletionInfo();
+        Iterator<LegacyCell> cells = fromRowIterator(partition.metadata(), 
partition.iterator(), partition.staticRow());
+        return Pair.create(info, cells);
+    }
+
+    // For thrift sake
+    public static UnfilteredRowIterator toUnfilteredRowIterator(CFMetaData 
metadata,
+                                                                DecoratedKey 
key,
+                                                                DeletionInfo 
delInfo,
+                                                                
Iterator<LegacyCell> cells)
+    {
+        SerializationHelper helper = new SerializationHelper(0, 
SerializationHelper.Flag.LOCAL);
+        return toUnfilteredRowIterator(metadata, key, 
LegacyDeletionInfo.from(delInfo), cells, false, helper);
+    }
+
+    // For deserializing old wire format
+    public static UnfilteredRowIterator 
onWireCellstoUnfilteredRowIterator(CFMetaData metadata,
+                                                                           
DecoratedKey key,
+                                                                           
LegacyDeletionInfo delInfo,
+                                                                           
Iterator<LegacyCell> cells,
+                                                                           
boolean reversed,
+                                                                           
SerializationHelper helper)
+    {
+        // If the table is a static compact, the "column_metadata" are now 
internally encoded as
+        // static. This has already been recognized by decodeCellName, but it 
means the cells
+        // provided are not in the expected order (the "static" cells are not 
necessarily at the front).
+        // So sort them to make sure toUnfilteredRowIterator works as expected.
+        // Further, if the query is reversed, then the on-wire format still 
has cells in non-reversed
+        // order, but we need to have them reverse in the final 
UnfilteredRowIterator. So reverse them.
+        if (metadata.isStaticCompactTable() || reversed)
+        {
+            List<LegacyCell> l = new ArrayList<>();
+            Iterators.addAll(l, cells);
+            Collections.sort(l, legacyCellComparator(metadata, reversed));
+            cells = l.iterator();
+        }
+
+        return toUnfilteredRowIterator(metadata, key, delInfo, cells, 
reversed, helper);
+    }
+
+    private static UnfilteredRowIterator toUnfilteredRowIterator(CFMetaData 
metadata,
+                                                                 DecoratedKey 
key,
+                                                                 
LegacyDeletionInfo delInfo,
+                                                                 
Iterator<LegacyCell> cells,
+                                                                 boolean 
reversed,
+                                                                 
SerializationHelper helper)
+    {
+        // Check if we have some static
+        PeekingIterator<LegacyCell> iter = Iterators.peekingIterator(cells);
+        Row staticRow = iter.hasNext() && iter.peek().name.clustering == 
Clustering.STATIC_CLUSTERING
+                      ? getNextRow(CellGrouper.staticGrouper(metadata, 
helper), iter)
+                      : Rows.EMPTY_STATIC_ROW;
+
+        Iterator<Row> rows = convertToRows(new CellGrouper(metadata, helper), 
iter, delInfo);
+        Iterator<RangeTombstone> ranges = 
delInfo.deletionInfo.rangeIterator(reversed);
+        final Iterator<Unfiltered> atoms = new 
RowAndTombstoneMergeIterator(metadata.comparator, reversed)
+                                     .setTo(rows, ranges);
+
+        return new AbstractUnfilteredRowIterator(metadata,
+                                        key,
+                                        
delInfo.deletionInfo.getPartitionDeletion(),
+                                        metadata.partitionColumns(),
+                                        staticRow,
+                                        reversed,
+                                        RowStats.NO_STATS)
+        {
+            protected Unfiltered computeNext()
+            {
+                return atoms.hasNext() ? atoms.next() : endOfData();
+            }
+        };
+    }
+
+    public static Row extractStaticColumns(CFMetaData metadata, DataInput in, 
Columns statics) throws IOException
+    {
+        assert !statics.isEmpty();
+        assert metadata.isCompactTable();
+
+        if (metadata.isSuper())
+            // TODO: there is in practice nothing to do here, but we need to 
handle the column_metadata for super columns somewhere else
+            throw new UnsupportedOperationException();
+
+        Set<ByteBuffer> columnsToFetch = new HashSet<>(statics.columnCount());
+        for (ColumnDefinition column : statics)
+            columnsToFetch.add(column.name.bytes);
+
+        StaticRow.Builder builder = StaticRow.builder(statics, false, 
metadata.isCounter());
+
+        boolean foundOne = false;
+        LegacyAtom atom;
+        while ((atom = readLegacyAtom(metadata, in, false)) != null)
+        {
+            if (atom.isCell())
+            {
+                LegacyCell cell = atom.asCell();
+                if (!columnsToFetch.contains(cell.name.encode(metadata)))
+                    continue;
+
+                foundOne = true;
+                builder.writeCell(cell.name.column, cell.isCounter(), 
cell.value, livenessInfo(metadata, cell), null);
+            }
+            else
+            {
+                LegacyRangeTombstone tombstone = atom.asRangeTombstone();
+                // TODO: we need to track tombstones and potentially ignore 
cells that are
+                // shadowed (or even better, replace them by tombstones).
+                throw new UnsupportedOperationException();
+            }
+        }
+
+        return foundOne ? builder.build() : Rows.EMPTY_STATIC_ROW;
+    }
+
+    private static Row getNextRow(CellGrouper grouper, PeekingIterator<? 
extends LegacyAtom> cells)
+    {
+        if (!cells.hasNext())
+            return null;
+
+        grouper.reset();
+        while (cells.hasNext() && grouper.addAtom(cells.peek()))
+        {
+            // We've added the cell already in the grouper, so just skip it
+            cells.next();
+        }
+        return grouper.getRow();
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Iterator<LegacyAtom> asLegacyAtomIterator(Iterator<? 
extends LegacyAtom> iter)
+    {
+        return (Iterator<LegacyAtom>)iter;
+    }
+
+    private static Iterator<Row> convertToRows(final CellGrouper grouper, 
final Iterator<LegacyCell> cells, final LegacyDeletionInfo delInfo)
+    {
+        // A reducer that basically does nothing, we know the 2 merge 
iterators can't have conflicting atoms.
+        MergeIterator.Reducer<LegacyAtom, LegacyAtom> reducer = new 
MergeIterator.Reducer<LegacyAtom, LegacyAtom>()
+        {
+            private LegacyAtom atom;
+
+            public void reduce(int idx, LegacyAtom current)
+            {
+                // We're merging cell with range tombstones, so we should 
always only have a single atom to reduce.
+                assert atom == null;
+                atom = current;
+            }
+
+            protected LegacyAtom getReduced()
+            {
+                return atom;
+            }
+
+            protected void onKeyChange()
+            {
+                atom = null;
+            }
+        };
+        List<Iterator<LegacyAtom>> iterators = 
Arrays.asList(asLegacyAtomIterator(cells), 
asLegacyAtomIterator(delInfo.inRowRangeTombstones()));
+        Iterator<LegacyAtom> merged = MergeIterator.get(iterators, 
grouper.metadata.comparator, reducer);
+        final PeekingIterator<LegacyAtom> atoms = 
Iterators.peekingIterator(merged);
+
+        return new AbstractIterator<Row>()
+        {
+            protected Row computeNext()
+            {
+                if (!atoms.hasNext())
+                    return endOfData();
+
+                return getNextRow(grouper, atoms);
+            }
+        };
+    }
+
+    public static Iterator<LegacyCell> fromRowIterator(final RowIterator 
iterator)
+    {
+        return fromRowIterator(iterator.metadata(), iterator, 
iterator.staticRow());
+    }
+
+    public static Iterator<LegacyCell> fromRowIterator(final CFMetaData 
metadata, final Iterator<Row> iterator, final Row staticRow)
+    {
+        return new AbstractIterator<LegacyCell>()
+        {
+            private Iterator<LegacyCell> currentRow = staticRow.isEmpty()
+                                                    ? 
Collections.<LegacyLayout.LegacyCell>emptyIterator()
+                                                    : fromRow(metadata, 
staticRow);
+
+            protected LegacyCell computeNext()
+            {
+                if (currentRow.hasNext())
+                    return currentRow.next();
+
+                if (!iterator.hasNext())
+                    return endOfData();
+
+                currentRow = fromRow(metadata, iterator.next());
+                return computeNext();
+            }
+        };
+    }
+
+    private static Iterator<LegacyCell> fromRow(final CFMetaData metadata, 
final Row row)
+    {
+        return new AbstractIterator<LegacyCell>()
+        {
+            private final Iterator<Cell> cells = row.iterator();
+            // we don't have (and shouldn't have) row markers for compact 
tables.
+            private boolean hasReturnedRowMarker = metadata.isCompactTable();
+
+            protected LegacyCell computeNext()
+            {
+                if (!hasReturnedRowMarker)
+                {
+                    hasReturnedRowMarker = true;
+                    LegacyCellName cellName = new 
LegacyCellName(row.clustering(), null, null);
+                    LivenessInfo info = row.primaryKeyLivenessInfo();
+                    return new LegacyCell(LegacyCell.Kind.REGULAR, cellName, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localDeletionTime(), 
info.ttl());
+                }
+
+                if (!cells.hasNext())
+                    return endOfData();
+
+                Cell cell = cells.next();
+                return makeLegacyCell(row.clustering(), cell);
+            }
+        };
+    }
+
+    private static LegacyCell makeLegacyCell(Clustering clustering, Cell cell)
+    {
+        LegacyCell.Kind kind;
+        if (cell.isCounterCell())
+            kind = LegacyCell.Kind.COUNTER;
+        else if (cell.isTombstone())
+            kind = LegacyCell.Kind.DELETED;
+        else if (cell.isExpiring())
+            kind = LegacyCell.Kind.EXPIRING;
+        else
+            kind = LegacyCell.Kind.REGULAR;
+
+        CellPath path = cell.path();
+        assert path == null || path.size() == 1;
+        LegacyCellName name = new LegacyCellName(clustering, cell.column(), 
path == null ? null : path.get(0));
+        LivenessInfo info = cell.livenessInfo();
+        return new LegacyCell(kind, name, cell.value(), info.timestamp(), 
info.localDeletionTime(), info.ttl());
+    }
+
+    public static RowIterator toRowIterator(final CFMetaData metadata,
+                                            final DecoratedKey key,
+                                            final Iterator<LegacyCell> cells,
+                                            final int nowInSec)
+    {
+        SerializationHelper helper = new SerializationHelper(0, 
SerializationHelper.Flag.LOCAL);
+        return UnfilteredRowIterators.filter(toUnfilteredRowIterator(metadata, 
key, LegacyDeletionInfo.live(), cells, false, helper), nowInSec);
+    }
+
+    private static LivenessInfo livenessInfo(CFMetaData metadata, LegacyCell 
cell)
+    {
+        return cell.isTombstone()
+             ? SimpleLivenessInfo.forDeletion(cell.timestamp, 
cell.localDeletionTime)
+             : SimpleLivenessInfo.forUpdate(cell.timestamp, cell.ttl, 
cell.localDeletionTime, metadata);
+    }
+
+    public static Comparator<LegacyCell> legacyCellComparator(CFMetaData 
metadata)
+    {
+        return legacyCellComparator(metadata, false);
+    }
+
+    public static Comparator<LegacyCell> legacyCellComparator(final CFMetaData 
metadata, final boolean reversed)
+    {
+        final Comparator<LegacyCellName> cellNameComparator = 
legacyCellNameComparator(metadata, reversed);
+        return new Comparator<LegacyCell>()
+        {
+            public int compare(LegacyCell cell1, LegacyCell cell2)
+            {
+                LegacyCellName c1 = cell1.name;
+                LegacyCellName c2 = cell2.name;
+
+                int c = cellNameComparator.compare(c1, c2);
+                if (c != 0)
+                    return c;
+
+                // The actual sorting when the cellname is equal doesn't 
matter, we just want to make
+                // sure the cells are not considered equal.
+                if (cell1.timestamp != cell2.timestamp)
+                    return cell1.timestamp < cell2.timestamp ? -1 : 1;
+
+                if (cell1.localDeletionTime != cell2.localDeletionTime)
+                    return cell1.localDeletionTime < cell2.localDeletionTime ? 
-1 : 1;
+
+                return cell1.value.compareTo(cell2.value);
+            }
+        };
+    }
+
+    public static Comparator<LegacyCellName> legacyCellNameComparator(final 
CFMetaData metadata, final boolean reversed)
+    {
+        return new Comparator<LegacyCellName>()
+        {
+            public int compare(LegacyCellName c1, LegacyCellName c2)
+            {
+                // Compare clustering first
+                if (c1.clustering == Clustering.STATIC_CLUSTERING)
+                {
+                    if (c2.clustering != Clustering.STATIC_CLUSTERING)
+                        return -1;
+                }
+                else if (c2.clustering == Clustering.STATIC_CLUSTERING)
+                {
+                    return 1;
+                }
+                else
+                {
+                    int c = metadata.comparator.compare(c1.clustering, 
c2.clustering);
+                    if (c != 0)
+                        return reversed ? -c : c;
+                }
+
+                // Note that when reversed, we only care about the clustering 
being reversed, so it's ok
+                // not to take reversed into account below.
+
+                // Then check the column name
+                if (c1.column != c2.column)
+                {
+                    // A null for the column means it's a row marker
+                    if (c1.column == null)
+                        return -1;
+                    if (c2.column == null)
+                        return 1;
+
+                    assert c1.column.isRegular() || c1.column.isStatic();
+                    assert c2.column.isRegular() || c2.column.isStatic();
+                    if (c1.column.kind != c2.column.kind)
+                        return c1.column.isStatic() ? -1 : 1;
+
+                    AbstractType<?> cmp = 
metadata.getColumnDefinitionNameComparator(c1.column.kind);
+                    int c = cmp.compare(c1.column.name.bytes, 
c2.column.name.bytes);
+                    if (c != 0)
+                        return c;
+                }
+
+                assert (c1.collectionElement == null) == (c2.collectionElement 
== null);
+
+                if (c1.collectionElement != null)
+                {
+                    AbstractType<?> colCmp = 
((CollectionType)c1.column.type).nameComparator();
+                    return colCmp.compare(c1.collectionElement, 
c2.collectionElement);
+                }
+                return 0;
+            }
+        };
+    }
+
+    public static LegacyAtom readLegacyAtom(CFMetaData metadata, DataInput in, 
boolean readAllAsDynamic) throws IOException
+    {
+        while (true)
+        {
+            ByteBuffer cellname = ByteBufferUtil.readWithShortLength(in);
+            if (!cellname.hasRemaining())
+                return null; // END_OF_ROW
+
+            try
+            {
+                int b = in.readUnsignedByte();
+                return (b & RANGE_TOMBSTONE_MASK) != 0
+                    ? readLegacyRangeTombstoneBody(metadata, in, cellname)
+                    : readLegacyCellBody(metadata, in, cellname, b, 
SerializationHelper.Flag.LOCAL, readAllAsDynamic);
+            }
+            catch (UnknownColumnException e)
+            {
+                // We can get there if we read a cell for a dropped column, 
and ff that is the case,
+                // then simply ignore the cell is fine. But also not that we 
ignore if it's the
+                // system keyspace because for those table we actually remove 
columns without registering
+                // them in the dropped columns
+                assert metadata.ksName.equals(SystemKeyspace.NAME) || 
metadata.getDroppedColumnDefinition(cellname) != null : e.getMessage();
+            }
+        }
+    }
+
+    public static LegacyCell readLegacyCell(CFMetaData metadata, DataInput in, 
SerializationHelper.Flag flag) throws IOException, UnknownColumnException
+    {
+        ByteBuffer cellname = ByteBufferUtil.readWithShortLength(in);
+        int b = in.readUnsignedByte();
+        return readLegacyCellBody(metadata, in, cellname, b, flag, false);
+    }
+
+    public static LegacyCell readLegacyCellBody(CFMetaData metadata, DataInput 
in, ByteBuffer cellname, int mask, SerializationHelper.Flag flag, boolean 
readAllAsDynamic)
+    throws IOException, UnknownColumnException
+    {
+        // Note that we want to call decodeCellName only after we've 
deserialized other parts, since it can throw
+        // and we want to throw only after having deserialized the full cell.
+        if ((mask & COUNTER_MASK) != 0)
+        {
+            in.readLong(); // timestampOfLastDelete: this has been unused for 
a long time so we ignore it
+            long ts = in.readLong();
+            ByteBuffer value = ByteBufferUtil.readWithLength(in);
+            if (flag == SerializationHelper.Flag.FROM_REMOTE || (flag == 
SerializationHelper.Flag.LOCAL && 
CounterContext.instance().shouldClearLocal(value)))
+                value = CounterContext.instance().clearAllLocal(value);
+            return new LegacyCell(LegacyCell.Kind.COUNTER, 
decodeCellName(metadata, cellname, readAllAsDynamic), value, ts, 
LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL);
+        }
+        else if ((mask & EXPIRATION_MASK) != 0)
+        {
+            int ttl = in.readInt();
+            int expiration = in.readInt();
+            long ts = in.readLong();
+            ByteBuffer value = ByteBufferUtil.readWithLength(in);
+            return new LegacyCell(LegacyCell.Kind.EXPIRING, 
decodeCellName(metadata, cellname, readAllAsDynamic), value, ts, expiration, 
ttl);
+        }
+        else
+        {
+            long ts = in.readLong();
+            ByteBuffer value = ByteBufferUtil.readWithLength(in);
+            LegacyCellName name = decodeCellName(metadata, cellname, 
readAllAsDynamic);
+            return (mask & COUNTER_UPDATE_MASK) != 0
+                ? new LegacyCell(LegacyCell.Kind.COUNTER, name, 
CounterContext.instance().createLocal(ByteBufferUtil.toLong(value)), ts, 
LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL)
+                : ((mask & DELETION_MASK) == 0
+                        ? new LegacyCell(LegacyCell.Kind.REGULAR, name, value, 
ts, LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_TTL)
+                        : new LegacyCell(LegacyCell.Kind.DELETED, name, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, ts, ByteBufferUtil.toInt(value), 
LivenessInfo.NO_TTL));
+        }
+    }
+
+    public static LegacyRangeTombstone readLegacyRangeTombstone(CFMetaData 
metadata, DataInput in) throws IOException
+    {
+        ByteBuffer boundname = ByteBufferUtil.readWithShortLength(in);
+        in.readUnsignedByte();
+        return readLegacyRangeTombstoneBody(metadata, in, boundname);
+    }
+
+    public static LegacyRangeTombstone readLegacyRangeTombstoneBody(CFMetaData 
metadata, DataInput in, ByteBuffer boundname) throws IOException
+    {
+        LegacyBound min = decodeBound(metadata, boundname, true);
+        LegacyBound max = decodeBound(metadata, 
ByteBufferUtil.readWithShortLength(in), false);
+        DeletionTime dt = DeletionTime.serializer.deserialize(in);
+        return new LegacyRangeTombstone(min, max, dt);
+    }
+
+    public static Iterator<LegacyCell> deserializeCells(final CFMetaData 
metadata,
+                                                        final DataInput in,
+                                                        final 
SerializationHelper.Flag flag,
+                                                        final int size)
+    {
+        return new AbstractIterator<LegacyCell>()
+        {
+            private int i = 0;
+
+            protected LegacyCell computeNext()
+            {
+                if (i >= size)
+                    return endOfData();
+
+                ++i;
+                try
+                {
+                    return readLegacyCell(metadata, in, flag);
+                }
+                catch (UnknownColumnException e)
+                {
+                    // We can get there if we read a cell for a dropped 
column, and if that is the case,
+                    // then simply ignore the cell is fine. But also not that 
we ignore if it's the
+                    // system keyspace because for those table we actually 
remove columns without registering
+                    // them in the dropped columns
+                    if (metadata.ksName.equals(SystemKeyspace.NAME) || 
metadata.getDroppedColumnDefinition(e.columnName) != null)
+                        return computeNext();
+                    else
+                        throw new IOError(e);
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+        };
+    }
+
+    public static class CellGrouper
+    {
+        public final CFMetaData metadata;
+        private final ReusableRow row;
+        private final boolean isStatic;
+        private final SerializationHelper helper;
+        private Row.Writer writer;
+        private Clustering clustering;
+
+        private LegacyRangeTombstone rowDeletion;
+        private LegacyRangeTombstone collectionDeletion;
+
+        public CellGrouper(CFMetaData metadata, SerializationHelper helper)
+        {
+            this(metadata, helper, false);
+        }
+
+        private CellGrouper(CFMetaData metadata, SerializationHelper helper, 
boolean isStatic)
+        {
+            this.metadata = metadata;
+            this.isStatic = isStatic;
+            this.helper = helper;
+            this.row = isStatic ? null : new 
ReusableRow(metadata.clusteringColumns().size(), 
metadata.partitionColumns().regulars, false, metadata.isCounter());
+
+            if (isStatic)
+                this.writer = 
StaticRow.builder(metadata.partitionColumns().statics, false, 
metadata.isCounter());
+        }
+
+        public static CellGrouper staticGrouper(CFMetaData metadata, 
SerializationHelper helper)
+        {
+            return new CellGrouper(metadata, helper, true);
+        }
+
+        public void reset()
+        {
+            this.clustering = null;
+            this.rowDeletion = null;
+            this.collectionDeletion = null;
+
+            if (!isStatic)
+                this.writer = row.writer();
+        }
+
+        public boolean addAtom(LegacyAtom atom)
+        {
+            return atom.isCell()
+                 ? addCell(atom.asCell())
+                 : addRangeTombstone(atom.asRangeTombstone());
+        }
+
+        public boolean addCell(LegacyCell cell)
+        {
+            if (isStatic)
+            {
+                if (cell.name.clustering != Clustering.STATIC_CLUSTERING)
+                    return false;
+            }
+            else if (clustering == null)
+            {
+                clustering = cell.name.clustering.takeAlias();
+                clustering.writeTo(writer);
+            }
+            else if (!clustering.equals(cell.name.clustering))
+            {
+                return false;
+            }
+
+            // Ignore shadowed cells
+            if (rowDeletion != null && 
rowDeletion.deletionTime.deletes(cell.timestamp))
+                return true;
+
+            LivenessInfo info = livenessInfo(metadata, cell);
+
+            ColumnDefinition column = cell.name.column;
+            if (column == null)
+            {
+                // It's the row marker
+                assert !cell.value.hasRemaining();
+                helper.writePartitionKeyLivenessInfo(writer, info.timestamp(), 
info.ttl(), info.localDeletionTime());
+            }
+            else
+            {
+                if (collectionDeletion != null && 
collectionDeletion.start.collectionName.name.equals(column.name) && 
collectionDeletion.deletionTime.deletes(cell.timestamp))
+                    return true;
+
+                if (helper.includes(column))
+                {
+                    CellPath path = null;
+                    if (column.isComplex())
+                    {
+                        // Recalling startOfComplexColumn for every cell is a 
big inefficient, but it's ok in practice
+                        // and it's simpler. And since 1) this only matter for 
super column selection in thrift in
+                        // practice and 2) is only used during upgrade, it's 
probably worth keeping things simple.
+                        helper.startOfComplexColumn(column);
+                        path = cell.name.collectionElement == null ? null : 
CellPath.create(cell.name.collectionElement);
+                    }
+                    helper.writeCell(writer, column, cell.isCounter(), 
cell.value, info.timestamp(), info.localDeletionTime(), info.ttl(), path);
+                    if (column.isComplex())
+                    {
+                        helper.endOfComplexColumn(column);
+                    }
+                }
+            }
+            return true;
+        }
+
+        public boolean addRangeTombstone(LegacyRangeTombstone tombstone)
+        {
+            if (tombstone.isRowDeletion(metadata))
+            {
+                // If we're already within a row, it can't be the same one
+                if (clustering != null)
+                    return false;
+
+                clustering = 
tombstone.start.getAsClustering(metadata).takeAlias();
+                clustering.writeTo(writer);
+                writer.writeRowDeletion(tombstone.deletionTime);
+                rowDeletion = tombstone;
+                return true;
+            }
+
+            if (tombstone.isCollectionTombstone())
+            {
+                if (clustering == null)
+                {
+                    clustering = 
tombstone.start.getAsClustering(metadata).takeAlias();
+                    clustering.writeTo(writer);
+                }
+                else if 
(!clustering.equals(tombstone.start.getAsClustering(metadata)))
+                {
+                    return false;
+                }
+
+                writer.writeComplexDeletion(tombstone.start.collectionName, 
tombstone.deletionTime);
+                if (rowDeletion == null || 
tombstone.deletionTime.supersedes(rowDeletion.deletionTime))
+                    collectionDeletion = tombstone;
+                return true;
+            }
+            return false;
+        }
+
+        public Row getRow()
+        {
+            writer.endOfRow();
+            return isStatic ? ((StaticRow.Builder)writer).build() : row;
+        }
+    }
+
+    public static class LegacyCellName
+    {
+        public final Clustering clustering;
+        public final ColumnDefinition column;
+        public final ByteBuffer collectionElement;
+
+        private LegacyCellName(Clustering clustering, ColumnDefinition column, 
ByteBuffer collectionElement)
+        {
+            this.clustering = clustering;
+            this.column = column;
+            this.collectionElement = collectionElement;
+        }
+
+        public ByteBuffer encode(CFMetaData metadata)
+        {
+            return encodeCellName(metadata, clustering, column == null ? 
ByteBufferUtil.EMPTY_BYTE_BUFFER : column.name.bytes, collectionElement);
+        }
+
+        public ByteBuffer superColumnSubName()
+        {
+            assert collectionElement != null;
+            return collectionElement;
+        }
+
+        public ByteBuffer superColumnName()
+        {
+            return clustering.get(0);
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < clustering.size(); i++)
+                sb.append(i > 0 ? ":" : "").append(clustering.get(i) == null ? 
"null" : ByteBufferUtil.bytesToHex(clustering.get(i)));
+            return String.format("Cellname(clustering=%s, column=%s, 
collElt=%s)", sb.toString(), column == null ? "null" : column.name, 
collectionElement == null ? "null" : 
ByteBufferUtil.bytesToHex(collectionElement));
+        }
+    }
+
+    public static class LegacyBound
+    {
+        public static final LegacyBound BOTTOM = new 
LegacyBound(Slice.Bound.BOTTOM, false, null);
+        public static final LegacyBound TOP = new LegacyBound(Slice.Bound.TOP, 
false, null);
+
+        public final Slice.Bound bound;
+        public final boolean isStatic;
+        public final ColumnDefinition collectionName;
+
+        private LegacyBound(Slice.Bound bound, boolean isStatic, 
ColumnDefinition collectionName)
+        {
+            this.bound = bound;
+            this.isStatic = isStatic;
+            this.collectionName = collectionName;
+        }
+
+        public Clustering getAsClustering(CFMetaData metadata)
+        {
+            assert bound.size() == metadata.comparator.size();
+            ByteBuffer[] values = new ByteBuffer[bound.size()];
+            for (int i = 0; i < bound.size(); i++)
+                values[i] = bound.get(i);
+            return new SimpleClustering(values);
+        }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append(bound.kind()).append('(');
+            for (int i = 0; i < bound.size(); i++)
+                sb.append(i > 0 ? ":" : "").append(bound.get(i) == null ? 
"null" : ByteBufferUtil.bytesToHex(bound.get(i)));
+            sb.append(')');
+            return String.format("Bound(%s, collection=%s)", sb.toString(), 
collectionName == null ? "null" : collectionName.name);
+        }
+    }
+
+    public interface LegacyAtom extends Clusterable
+    {
+        public boolean isCell();
+
+        public ClusteringPrefix clustering();
+        public boolean isStatic();
+
+        public LegacyCell asCell();
+        public LegacyRangeTombstone asRangeTombstone();
+    }
+
+    /**
+     * A legacy cell.
+     * <p>
+     * This is used as a temporary object to facilitate dealing with the 
legacy format, this
+     * is not meant to be optimal.
+     */
+    public static class LegacyCell implements LegacyAtom
+    {
+        public enum Kind { REGULAR, EXPIRING, DELETED, COUNTER }
+
+        public final Kind kind;
+
+        public final LegacyCellName name;
+        public final ByteBuffer value;
+
+        public final long timestamp;
+        public final int localDeletionTime;
+        public final int ttl;
+
+        private LegacyCell(Kind kind, LegacyCellName name, ByteBuffer value, 
long timestamp, int localDeletionTime, int ttl)
+        {
+            this.kind = kind;
+            this.name = name;
+            this.value = value;
+            this.timestamp = timestamp;
+            this.localDeletionTime = localDeletionTime;
+            this.ttl = ttl;
+        }
+
+        public static LegacyCell regular(CFMetaData metadata, ByteBuffer 
superColumnName, ByteBuffer name, ByteBuffer value, long timestamp)
+        throws UnknownColumnException
+        {
+            return new LegacyCell(Kind.REGULAR, decodeCellName(metadata, 
superColumnName, name), value, timestamp, LivenessInfo.NO_DELETION_TIME, 
LivenessInfo.NO_TTL);
+        }
+
+        public static LegacyCell expiring(CFMetaData metadata, ByteBuffer 
superColumnName, ByteBuffer name, ByteBuffer value, long timestamp, int ttl, 
int nowInSec)
+        throws UnknownColumnException
+        {
+            return new LegacyCell(Kind.EXPIRING, decodeCellName(metadata, 
superColumnName, name), value, timestamp, nowInSec, ttl);
+        }
+
+        public static LegacyCell tombstone(CFMetaData metadata, ByteBuffer 
superColumnName, ByteBuffer name, long timestamp, int nowInSec)
+        throws UnknownColumnException
+        {
+            return new LegacyCell(Kind.DELETED, decodeCellName(metadata, 
superColumnName, name), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp, nowInSec, 
LivenessInfo.NO_TTL);
+        }
+
+        public static LegacyCell counter(CFMetaData metadata, ByteBuffer 
superColumnName, ByteBuffer name, long value)
+        throws UnknownColumnException
+        {
+            // See UpdateParameters.addCounter() for more details on this
+            ByteBuffer counterValue = 
CounterContext.instance().createLocal(value);
+            return counter(decodeCellName(metadata, superColumnName, name), 
counterValue);
+        }
+
+        public static LegacyCell counter(LegacyCellName name, ByteBuffer value)
+        {
+            return new LegacyCell(Kind.COUNTER, name, value, 
FBUtilities.timestampMicros(), LivenessInfo.NO_DELETION_TIME, 
LivenessInfo.NO_TTL);
+        }
+
+        public ClusteringPrefix clustering()
+        {
+            return name.clustering;
+        }
+
+        public boolean isStatic()
+        {
+            return name.clustering == Clustering.STATIC_CLUSTERING;
+        }
+
+        public boolean isCell()
+        {
+            return true;
+        }
+
+        public LegacyCell asCell()
+        {
+            return this;
+        }
+
+        public LegacyRangeTombstone asRangeTombstone()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isCounter()
+        {
+            return kind == Kind.COUNTER;
+        }
+
+        public boolean isExpiring()
+        {
+            return kind == Kind.EXPIRING;
+        }
+
+        public boolean isTombstone()
+        {
+            return kind == Kind.DELETED;
+        }
+
+        public boolean isLive(int nowInSec)
+        {
+            if (isTombstone())
+                return false;
+
+            if (isExpiring())
+                return nowInSec < localDeletionTime;
+
+            return true;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("LegacyCell(%s, name=%s, v=%s, ts=%s, ldt=%s, 
ttl=%s)", kind, name, ByteBufferUtil.bytesToHex(value), timestamp, 
localDeletionTime, ttl);
+        }
+    }
+
+    /**
+     * A legacy range tombstone.
+     * <p>
+     * This is used as a temporary object to facilitate dealing with the 
legacy format, this
+     * is not meant to be optimal.
+     */
+    public static class LegacyRangeTombstone implements LegacyAtom
+    {
+        public final LegacyBound start;
+        public final LegacyBound stop;
+        public final DeletionTime deletionTime;
+
+        // Do not use directly, use create() instead.
+        private LegacyRangeTombstone(LegacyBound start, LegacyBound stop, 
DeletionTime deletionTime)
+        {
+            // Because of the way RangeTombstoneList work, we can have a 
tombstone where only one of
+            // the bound has a collectionName. That happens if we have a big 
tombstone A (spanning one
+            // or multiple rows) and a collection tombstone B. In that case, 
RangeTombstoneList will
+            // split this into 3 RTs: the first one from the beginning of A to 
the beginning of B,
+            // then B, then a third one from the end of B to the end of A. To 
make this simpler, if
+            // we detect that case we transform the 1st and 3rd tombstone so 
they don't end in the middle
+            // of a row (which is still correct).
+            if ((start.collectionName == null) != (stop.collectionName == 
null))
+            {
+                if (start.collectionName == null)
+                    stop = new LegacyBound(stop.bound, stop.isStatic, null);
+                else
+                    start = new LegacyBound(start.bound, start.isStatic, null);
+            }
+            else if (!Objects.equals(start.collectionName, 
stop.collectionName))
+            {
+                // We're in the similar but slightly more complex case where 
on top of the big tombstone
+                // A, we have 2 (or more) collection tombstones B and C within 
A. So we also end up with
+                // a tombstone that goes between the end of B and the start of 
C.
+                start = new LegacyBound(start.bound, start.isStatic, null);
+                stop = new LegacyBound(stop.bound, stop.isStatic, null);
+            }
+
+            this.start = start;
+            this.stop = stop;
+            this.deletionTime = deletionTime;
+        }
+
+        public ClusteringPrefix clustering()
+        {
+            return start.bound;
+        }
+
+        public boolean isCell()
+        {
+            return false;
+        }
+
+        public boolean isStatic()
+        {
+            return start.isStatic;
+        }
+
+        public LegacyCell asCell()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public LegacyRangeTombstone asRangeTombstone()
+        {
+            return this;
+        }
+
+        public boolean isCollectionTombstone()
+        {
+            return start.collectionName != null;
+        }
+
+        public boolean isRowDeletion(CFMetaData metadata)
+        {
+            if (start.collectionName != null
+                || stop.collectionName != null
+                || start.bound.size() != metadata.comparator.size()
+                || stop.bound.size() != metadata.comparator.size())
+                return false;
+
+            for (int i = 0; i < start.bound.size(); i++)
+                if (!Objects.equals(start.bound.get(i), stop.bound.get(i)))
+                    return false;
+            return true;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("RT(%s-%s, %s)", start, stop, deletionTime);
+        }
+    }
+
+    public static class LegacyDeletionInfo
+    {
+        public static final Serializer serializer = new Serializer();
+
+        public final DeletionInfo deletionInfo;
+        private final List<LegacyRangeTombstone> inRowTombstones;
+
+        private LegacyDeletionInfo(DeletionInfo deletionInfo, 
List<LegacyRangeTombstone> inRowTombstones)
+        {
+            this.deletionInfo = deletionInfo;
+            this.inRowTombstones = inRowTombstones;
+        }
+
+        public static LegacyDeletionInfo from(DeletionInfo info)
+        {
+            return new LegacyDeletionInfo(info, 
Collections.<LegacyRangeTombstone>emptyList());
+        }
+
+        public static LegacyDeletionInfo live()
+        {
+            return from(DeletionInfo.live());
+        }
+
+        public Iterator<LegacyRangeTombstone> inRowRangeTombstones()
+        {
+            return inRowTombstones.iterator();
+        }
+
+        public static class Serializer
+        {
+            public void serialize(CFMetaData metadata, LegacyDeletionInfo 
info, DataOutputPlus out, int version) throws IOException
+            {
+                throw new UnsupportedOperationException();
+                //DeletionTime.serializer.serialize(info.topLevel, out);
+                //rtlSerializer.serialize(info.ranges, out, version);
+            }
+
+            public LegacyDeletionInfo deserialize(CFMetaData metadata, 
DataInput in, int version) throws IOException
+            {
+                DeletionTime topLevel = 
DeletionTime.serializer.deserialize(in);
+
+                int rangeCount = in.readInt();
+                if (rangeCount == 0)
+                    return from(new DeletionInfo(topLevel));
+
+                RangeTombstoneList ranges = new 
RangeTombstoneList(metadata.comparator, rangeCount);
+                List<LegacyRangeTombstone> inRowTombsones = new ArrayList<>();
+                for (int i = 0; i < rangeCount; i++)
+                {
+                    LegacyBound start = decodeBound(metadata, 
ByteBufferUtil.readWithShortLength(in), true);
+                    LegacyBound end = decodeBound(metadata, 
ByteBufferUtil.readWithShortLength(in), false);
+                    int delTime =  in.readInt();
+                    long markedAt = in.readLong();
+
+                    LegacyRangeTombstone tombstone = new 
LegacyRangeTombstone(start, end, new SimpleDeletionTime(markedAt, delTime));
+                    if (tombstone.isCollectionTombstone() || 
tombstone.isRowDeletion(metadata))
+                        inRowTombsones.add(tombstone);
+                    else
+                        ranges.add(start.bound, end.bound, markedAt, delTime);
+                }
+                return new LegacyDeletionInfo(new DeletionInfo(topLevel, 
ranges), inRowTombsones);
+            }
+
+            public long serializedSize(CFMetaData metadata, LegacyDeletionInfo 
info, TypeSizes typeSizes, int version)
+            {
+                throw new UnsupportedOperationException();
+                //long size = 
DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
+                //return size + rtlSerializer.serializedSize(info.ranges, 
typeSizes, version);
+            }
+        }
+    }
+
+    public static class TombstoneTracker
+    {
+        private final CFMetaData metadata;
+        private final DeletionTime partitionDeletion;
+        private final List<LegacyRangeTombstone> openTombstones = new 
ArrayList<>();
+
+        public TombstoneTracker(CFMetaData metadata, DeletionTime 
partitionDeletion)
+        {
+            this.metadata = metadata;
+            this.partitionDeletion = partitionDeletion;
+        }
+
+        public void update(LegacyAtom atom)
+        {
+            if (atom.isCell())
+            {
+                if (openTombstones.isEmpty())
+                    return;
+
+                Iterator<LegacyRangeTombstone> iter = 
openTombstones.iterator();
+                while (iter.hasNext())
+                {
+                    LegacyRangeTombstone tombstone = iter.next();
+                    if (metadata.comparator.compare(atom, 
tombstone.stop.bound) >= 0)
+                        iter.remove();
+                }
+            }
+
+            LegacyRangeTombstone tombstone = atom.asRangeTombstone();
+            if (tombstone.deletionTime.supersedes(partitionDeletion) && 
!tombstone.isRowDeletion(metadata) && !tombstone.isCollectionTombstone())
+                openTombstones.add(tombstone);
+        }
+
+        public boolean isShadowed(LegacyAtom atom)
+        {
+            long timestamp = atom.isCell() ? atom.asCell().timestamp : 
atom.asRangeTombstone().deletionTime.markedForDeleteAt();
+
+            if (partitionDeletion.deletes(timestamp))
+                return true;
+
+            for (LegacyRangeTombstone tombstone : openTombstones)
+            {
+                if (tombstone.deletionTime.deletes(timestamp))
+                    return true;
+            }
+
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/LivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java 
b/src/java/org/apache/cassandra/db/LivenessInfo.java
new file mode 100644
index 0000000..89971d1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.security.MessageDigest;
+
+import org.apache.cassandra.db.*;
+
+/**
+ * Groups the informations necessary to decide the liveness of a given piece of
+ * column data.
+ * <p>
+ * In practice, a {@code LivenessInfo} groups 3 informations:
+ *   1) the data timestamp. It is sometimes allowed for a given piece of data 
to have
+ *      no timestamp (for {@link Row#partitionKeyLivenessInfo} more 
precisely), but if that
+ *      is the case it means the data has no liveness info at all.
+ *   2) the data ttl if relevant.
+ *   2) the data local deletion time if relevant (that is, if either the data 
has a ttl or is deleted).
+ */
+public interface LivenessInfo extends Aliasable<LivenessInfo>
+{
+    public static final long NO_TIMESTAMP = Long.MIN_VALUE;
+    public static final int NO_TTL = 0;
+    public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
+
+    public static final LivenessInfo NONE = new 
SimpleLivenessInfo(NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME);
+
+    /**
+     * The timestamp at which the data was inserted or {@link NO_TIMESTAMP}
+     * if it has no timestamp (which may or may not be allowed).
+     *
+     * @return the liveness info timestamp.
+     */
+    public long timestamp();
+
+    /**
+     * Whether this liveness info has a timestamp or not.
+     * <p>
+     * Note that if this return {@code false}, then both {@link #hasTTL} and
+     * {@link #hasLocalDeletionTime} must return {@code false} too.
+     *
+     * @return whether this liveness info has a timestamp or not.
+     */
+    public boolean hasTimestamp();
+
+    /**
+     * The ttl (if any) on the data or {@link NO_TTL} if the data is not
+     * expiring.
+     *
+     * Please note that this value is the TTL that was set originally and is 
thus not
+     * changing. If you want to figure out how much time the data has before 
it expires,
+     * then you should use {@link #remainingTTL}.
+     */
+    public int ttl();
+
+    /**
+     * Whether this liveness info has a TTL or not.
+     *
+     * @return whether this liveness info has a TTL or not.
+     */
+    public boolean hasTTL();
+
+    /**
+     * The deletion time (in seconds) on the data if applicable ({@link 
NO_DELETION}
+     * otherwise).
+     *
+     * There is 3 cases in practice:
+     *   1) the data is neither deleted nor expiring: it then has neither 
{@code ttl()}
+     *      nor {@code localDeletionTime()}.
+     *   2) the data is expiring/expired: it then has both a {@code ttl()} and 
a
+     *      {@code localDeletionTime()}. Whether it's still live or is expired 
depends
+     *      on the {@code localDeletionTime()}.
+     *   3) the data is deleted: it has no {@code ttl()} but has a
+     *      {@code localDeletionTime()}.
+     */
+    public int localDeletionTime();
+
+    /**
+     * Whether this liveness info has a local deletion time or not.
+     *
+     * @return whether this liveness info has a local deletion time or not.
+     */
+    public boolean hasLocalDeletionTime();
+
+    /**
+     * The actual remaining time to live (in seconds) for the data this is
+     * the liveness information of.
+     *
+     * {@code #ttl} returns the initial TTL sets on the piece of data while 
this
+     * method computes how much time the data actually has to live given the
+     * current time.
+     *
+     * @param nowInSec the current time in seconds.
+     * @return the remaining time to live (in seconds) the data has, or
+     * {@code -1} if the data is either expired or not expiring.
+     */
+    public int remainingTTL(int nowInSec);
+
+    /**
+     * Checks whether a given piece of data is live given the current time.
+     *
+     * @param nowInSec the current time in seconds.
+     * @return whether the data having this liveness info is live or not.
+     */
+    public boolean isLive(int nowInSec);
+
+    /**
+     * Adds this liveness information to the provided digest.
+     *
+     * @param digest the digest to add this liveness information to.
+     */
+    public void digest(MessageDigest digest);
+
+    /**
+     * Validate the data contained by this liveness information.
+     *
+     * @throws MarshalException if some of the data is corrupted.
+     */
+    public void validate();
+
+    /**
+     * The size of the (useful) data this liveness information contains.
+     *
+     * @return the size of the data this liveness information contains.
+     */
+    public int dataSize();
+
+    /**
+     * Whether this liveness information supersedes another one (that is
+     * whether is has a greater timestamp than the other or not).
+     *
+     * @param other the {@code LivenessInfo} to compare this info to.
+     *
+     * @return whether this {@code LivenessInfo} supersedes {@code other}.
+     */
+    public boolean supersedes(LivenessInfo other);
+
+    /**
+     * Returns the result of merging this info to another one (that is, it
+     * return this info if it supersedes the other one, or the other one
+     * otherwise).
+     */
+    public LivenessInfo mergeWith(LivenessInfo other);
+
+    /**
+     * Whether this liveness information can be purged.
+     * <p>
+     * A liveness info can be purged if it is not live and hasn't been so
+     * for longer than gcGrace (or more precisely, it's local deletion time
+     * is smaller than gcBefore, which is itself "now - gcGrace").
+     *
+     * @param maxPurgeableTimestamp the biggest timestamp that can be purged.
+     * A liveness info will not be considered purgeable if its timestamp is
+     * greater than this value, even if it mets the other criteria for purging.
+     * @param gcBefore the local deletion time before which deleted/expired
+     * liveness info can be purged.
+     *
+     * @return whether this liveness information can be purged.
+     */
+    public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore);
+
+    /**
+     * Returns a copy of this liveness info updated with the provided 
timestamp.
+     *
+     * @param newTimestamp the timestamp for the returned info.
+     * @return if this liveness info has a timestamp, a copy of it with {@code 
newTimestamp}
+     * as timestamp. If it has no timestamp however, this liveness info is 
returned
+     * unchanged.
+     */
+    public LivenessInfo withUpdatedTimestamp(long newTimestamp);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/LivenessInfoArray.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LivenessInfoArray.java 
b/src/java/org/apache/cassandra/db/LivenessInfoArray.java
new file mode 100644
index 0000000..24026d8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/LivenessInfoArray.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Arrays;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Utility class to store an array of liveness info efficiently.
+ */
+public class LivenessInfoArray
+{
+    private long[] timestamps;
+    private int[] delTimesAndTTLs;
+
+    public LivenessInfoArray(int initialCapacity)
+    {
+        this.timestamps = new long[initialCapacity];
+        this.delTimesAndTTLs = new int[initialCapacity * 2];
+        clear();
+    }
+
+    public void clear(int i)
+    {
+        timestamps[i] = LivenessInfo.NO_TIMESTAMP;
+        delTimesAndTTLs[2 * i] = LivenessInfo.NO_DELETION_TIME;
+        delTimesAndTTLs[2 * i + 1] = LivenessInfo.NO_TTL;
+    }
+
+    public void set(int i, LivenessInfo info)
+    {
+        set(i, info.timestamp(), info.ttl(), info.localDeletionTime());
+    }
+
+    public void set(int i, long timestamp, int ttl, int localDeletionTime)
+    {
+        this.timestamps[i] = timestamp;
+        this.delTimesAndTTLs[2 * i] = localDeletionTime;
+        this.delTimesAndTTLs[2 * i + 1] = ttl;
+    }
+
+    public long timestamp(int i)
+    {
+        return timestamps[i];
+    }
+
+    public int localDeletionTime(int i)
+    {
+        return delTimesAndTTLs[2 * i];
+    }
+
+    public int ttl(int i)
+    {
+        return delTimesAndTTLs[2 * i + 1];
+    }
+
+    public boolean isLive(int i, int nowInSec)
+    {
+        // See AbstractLivenessInfo.isLive().
+        return localDeletionTime(i) == LivenessInfo.NO_DELETION_TIME
+            || (ttl(i) != LivenessInfo.NO_TTL && nowInSec < 
localDeletionTime(i));
+    }
+
+    public int size()
+    {
+        return timestamps.length;
+    }
+
+    public void resize(int newSize)
+    {
+        int prevSize = size();
+
+        timestamps = Arrays.copyOf(timestamps, newSize);
+        delTimesAndTTLs = Arrays.copyOf(delTimesAndTTLs, newSize * 2);
+
+        clear(prevSize, newSize);
+    }
+
+    public void swap(int i, int j)
+    {
+        long ts = timestamps[j];
+        int ldt = delTimesAndTTLs[2 * j];
+        int ttl = delTimesAndTTLs[2 * j + 1];
+
+        move(i, j);
+
+        timestamps[i] = ts;
+        delTimesAndTTLs[2 * i] = ldt;
+        delTimesAndTTLs[2 * i + 1] = ttl;
+    }
+
+    public void move(int i, int j)
+    {
+        timestamps[j] = timestamps[i];
+        delTimesAndTTLs[2 * j] = delTimesAndTTLs[2 * i];
+        delTimesAndTTLs[2 * j + 1] = delTimesAndTTLs[2 * i + 1];
+    }
+
+    public void clear()
+    {
+        clear(0, size());
+    }
+
+    private void clear(int from, int to)
+    {
+        Arrays.fill(timestamps, from, to, LivenessInfo.NO_TIMESTAMP);
+        for (int i = from; i < to; i++)
+        {
+            delTimesAndTTLs[2 * i] = LivenessInfo.NO_DELETION_TIME;
+            delTimesAndTTLs[2 * i + 1] = LivenessInfo.NO_TTL;
+        }
+    }
+
+    public int dataSize()
+    {
+        return 16 * size();
+    }
+
+    public long unsharedHeapSize()
+    {
+        return ObjectSizes.sizeOfArray(timestamps)
+             + ObjectSizes.sizeOfArray(delTimesAndTTLs);
+    }
+
+    public static Cursor newCursor()
+    {
+        return new Cursor();
+    }
+
+    public static class Cursor extends AbstractLivenessInfo
+    {
+        private LivenessInfoArray array;
+        private int i;
+
+        public Cursor setTo(LivenessInfoArray array, int i)
+        {
+            this.array = array;
+            this.i = i;
+            return this;
+        }
+
+        public long timestamp()
+        {
+            return array.timestamps[i];
+        }
+
+        public int localDeletionTime()
+        {
+            return array.delTimesAndTTLs[2 * i];
+        }
+
+        public int ttl()
+        {
+            return array.delTimesAndTTLs[2 * i + 1];
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java 
b/src/java/org/apache/cassandra/db/Memtable.java
index ccf92be..e82c35e 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -18,30 +18,33 @@
 package org.apache.cassandra.db;
 
 import java.io.File;
-import java.util.AbstractMap;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -79,10 +82,10 @@ public class Memtable implements Comparable<Memtable>
         }
     }
 
-    // We index the memtable by RowPosition only for the purpose of being able
+    // We index the memtable by PartitionPosition only for the purpose of 
being able
     // to select key range using Token.KeyBound. However put() ensures that we
     // actually only store DecoratedKey.
-    private final ConcurrentNavigableMap<RowPosition, AtomicBTreeColumns> rows 
= new ConcurrentSkipListMap<>();
+    private final ConcurrentNavigableMap<PartitionPosition, 
AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();
     public final ColumnFamilyStore cfs;
     private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
@@ -90,7 +93,10 @@ public class Memtable implements Comparable<Memtable>
     // Record the comparator of the CFS at the creation of the memtable. This
     // is only used when a user update the CF comparator, to know if the
     // memtable was created with the new or old comparator.
-    public final CellNameType initialComparator;
+    public final ClusteringComparator initialComparator;
+
+    private final ColumnsCollector columnsCollector;
+    private final StatsCollector statsCollector = new StatsCollector();
 
     public Memtable(ColumnFamilyStore cfs)
     {
@@ -98,6 +104,7 @@ public class Memtable implements Comparable<Memtable>
         this.allocator = MEMORY_POOL.newAllocator();
         this.initialComparator = cfs.metadata.comparator;
         this.cfs.scheduleFlush();
+        this.columnsCollector = new 
ColumnsCollector(cfs.metadata.partitionColumns());
     }
 
     // ONLY to be used for testing, to create a mock Memtable
@@ -107,6 +114,7 @@ public class Memtable implements Comparable<Memtable>
         this.initialComparator = metadata.comparator;
         this.cfs = null;
         this.allocator = null;
+        this.columnsCollector = new 
ColumnsCollector(metadata.partitionColumns());
     }
 
     public MemtableAllocator getAllocator()
@@ -175,7 +183,7 @@ public class Memtable implements Comparable<Memtable>
 
     public boolean isClean()
     {
-        return rows.isEmpty();
+        return partitions.isEmpty();
     }
 
     public boolean isCleanAfter(ReplayPosition position)
@@ -198,23 +206,23 @@ public class Memtable implements Comparable<Memtable>
      *
      * replayPosition should only be null if this is a secondary index, in 
which case it is *expected* to be null
      */
-    long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater 
indexer, OpOrder.Group opGroup)
+    long put(PartitionUpdate update, SecondaryIndexManager.Updater indexer, 
OpOrder.Group opGroup)
     {
-        AtomicBTreeColumns previous = rows.get(key);
+        AtomicBTreePartition previous = partitions.get(update.partitionKey());
 
         long initialSize = 0;
         if (previous == null)
         {
-            AtomicBTreeColumns empty = 
cf.cloneMeShallow(AtomicBTreeColumns.factory, false);
-            final DecoratedKey cloneKey = allocator.clone(key, opGroup);
+            final DecoratedKey cloneKey = 
allocator.clone(update.partitionKey(), opGroup);
+            AtomicBTreePartition empty = new 
AtomicBTreePartition(cfs.metadata, cloneKey, allocator);
             // We'll add the columns later. This avoids wasting works if we 
get beaten in the putIfAbsent
-            previous = rows.putIfAbsent(cloneKey, empty);
+            previous = partitions.putIfAbsent(cloneKey, empty);
             if (previous == null)
             {
                 previous = empty;
                 // allocate the row overhead after the fact; this saves over 
allocating and having to free after, but
                 // means we can overshoot our declared limit.
-                int overhead = (int) (key.getToken().getHeapSize() + 
ROW_OVERHEAD_HEAP_SIZE);
+                int overhead = (int) (cloneKey.getToken().getHeapSize() + 
ROW_OVERHEAD_HEAP_SIZE);
                 allocator.onHeap().allocate(overhead, opGroup);
                 initialSize = 8;
             }
@@ -224,28 +232,17 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, 
allocator, opGroup, indexer);
-        liveDataSize.addAndGet(initialSize + pair.left);
-        currentOperations.addAndGet(cf.getColumnCount() + 
(cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
-        return pair.right;
-    }
-
-    // for debugging
-    public String contents()
-    {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{");
-        for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : 
rows.entrySet())
-        {
-            builder.append(entry.getKey()).append(": 
").append(entry.getValue()).append(", ");
-        }
-        builder.append("}");
-        return builder.toString();
+        long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
+        liveDataSize.addAndGet(initialSize + pair[0]);
+        columnsCollector.update(update.columns());
+        statsCollector.update(update.stats());
+        currentOperations.addAndGet(update.operationCount());
+        return pair[1];
     }
 
     public int partitionCount()
     {
-        return rows.size();
+        return partitions.size();
     }
 
     public FlushRunnable flushRunnable()
@@ -259,55 +256,53 @@ public class Memtable implements Comparable<Memtable>
                              cfs.name, hashCode(), liveDataSize, 
currentOperations, 100 * allocator.onHeap().ownershipRatio(), 100 * 
allocator.offHeap().ownershipRatio());
     }
 
-    /**
-     * @param startWith Include data in the result from and including this key 
and to the end of the memtable
-     * @return An iterator of entries with the data from the start key
-     */
-    public Iterator<Map.Entry<DecoratedKey, ColumnFamily>> 
getEntryIterator(final RowPosition startWith, final RowPosition stopAt)
+    public UnfilteredPartitionIterator makePartitionIterator(final 
ColumnFilter columnFilter, final DataRange dataRange, final boolean isForThrift)
     {
-        return new Iterator<Map.Entry<DecoratedKey, ColumnFamily>>()
-        {
-            private Iterator<? extends Map.Entry<? extends RowPosition, 
AtomicBTreeColumns>> iter = stopAt.isMinimum()
-                    ? rows.tailMap(startWith).entrySet().iterator()
-                    : rows.subMap(startWith, true, stopAt, 
true).entrySet().iterator();
+        AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
+
+        boolean startIsMin = keyRange.left.isMinimum();
+        boolean stopIsMin = keyRange.right.isMinimum();
+
+        boolean isBound = keyRange instanceof Bounds;
+        boolean includeStart = isBound || keyRange instanceof 
IncludingExcludingBounds;
+        boolean includeStop = isBound || keyRange instanceof Range;
+        Map<PartitionPosition, AtomicBTreePartition> subMap;
+        if (startIsMin)
+            subMap = stopIsMin ? partitions : 
partitions.headMap(keyRange.right, includeStop);
+        else
+            subMap = stopIsMin
+                   ? partitions.tailMap(keyRange.left, includeStart)
+                   : partitions.subMap(keyRange.left, includeStart, 
keyRange.right, includeStop);
+
+        final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> 
iter = subMap.entrySet().iterator();
 
-            private Map.Entry<? extends RowPosition, ? extends ColumnFamily> 
currentEntry;
+        return new AbstractUnfilteredPartitionIterator()
+        {
+            public boolean isForThrift()
+            {
+                return isForThrift;
+            }
 
             public boolean hasNext()
             {
                 return iter.hasNext();
             }
 
-            public Map.Entry<DecoratedKey, ColumnFamily> next()
+            public UnfilteredRowIterator next()
             {
-                Map.Entry<? extends RowPosition, ? extends ColumnFamily> entry 
= iter.next();
+                Map.Entry<PartitionPosition, AtomicBTreePartition> entry = 
iter.next();
                 // Actual stored key should be true DecoratedKey
                 assert entry.getKey() instanceof DecoratedKey;
-                if (MEMORY_POOL.needToCopyOnHeap())
-                {
-                    DecoratedKey key = (DecoratedKey) entry.getKey();
-                    key = new BufferDecoratedKey(key.getToken(), 
HeapAllocator.instance.clone(key.getKey()));
-                    ColumnFamily cells = 
ArrayBackedSortedColumns.localCopy(entry.getValue(), HeapAllocator.instance);
-                    entry = new AbstractMap.SimpleImmutableEntry<>(key, cells);
-                }
-                // Store the reference to the current entry so that remove() 
can update the current size.
-                currentEntry = entry;
-                // Object cast is required since otherwise we can't turn 
RowPosition into DecoratedKey
-                return (Map.Entry<DecoratedKey, ColumnFamily>) entry;
-            }
-
-            public void remove()
-            {
-                iter.remove();
-                liveDataSize.addAndGet(-currentEntry.getValue().dataSize());
-                currentEntry = null;
+                DecoratedKey key = (DecoratedKey)entry.getKey();
+                ClusteringIndexFilter filter = 
dataRange.clusteringIndexFilter(key);
+                return filter.getUnfilteredRowIterator(columnFilter, 
entry.getValue());
             }
         };
     }
 
-    public ColumnFamily getColumnFamily(DecoratedKey key)
+    public Partition getPartition(DecoratedKey key)
     {
-        return rows.get(key);
+        return partitions.get(key);
     }
 
     public long creationTime()
@@ -320,12 +315,14 @@ public class Memtable implements Comparable<Memtable>
         private final ReplayPosition context;
         private final long estimatedSize;
 
+        private final boolean isBatchLogTable;
+
         FlushRunnable(ReplayPosition context)
         {
             this.context = context;
 
             long keySize = 0;
-            for (RowPosition key : rows.keySet())
+            for (PartitionPosition key : partitions.keySet())
             {
                 //  make sure we don't write non-sensical keys
                 assert key instanceof DecoratedKey;
@@ -335,6 +332,8 @@ public class Memtable implements Comparable<Memtable>
                                     + keySize // keys in data file
                                     + liveDataSize.get()) // data
                                     * 1.2); // bloom filter and row index 
overhead
+
+            this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHLOG) && 
cfs.keyspace.getName().equals(SystemKeyspace.NAME);
         }
 
         public long getExpectedWriteSize()
@@ -363,32 +362,32 @@ public class Memtable implements Comparable<Memtable>
 
             SSTableReader ssTable;
             // errors when creating the writer that may leave empty temp files.
-            try (SSTableWriter writer = 
createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
+            try (SSTableWriter writer = 
createFlushWriter(cfs.getTempSSTablePath(sstableDirectory), 
columnsCollector.get(), statsCollector.get()))
             {
                 boolean trackContention = logger.isDebugEnabled();
                 int heavilyContendedRowCount = 0;
                 // (we can't clear out the map as-we-go to free up memory,
                 //  since the memtable is being used for queries in the 
"pending flush" category)
-                for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : 
rows.entrySet())
+                for (AtomicBTreePartition partition : partitions.values())
                 {
-                    AtomicBTreeColumns cf = entry.getValue();
+                    // Each batchlog partition is a separate entry in the log. 
And for an entry, we only do 2
+                    // operations: 1) we insert the entry and 2) we delete it. 
Further, BL data is strictly local,
+                    // we don't need to preserve tombstones for repair. So if 
both operation are in this
+                    // memtable (which will almost always be the case if there 
is no ongoing failure), we can
+                    // just skip the entry (CASSANDRA-4667).
+                    if (isBatchLogTable && 
!partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                        continue;
+
+                    if (trackContention && partition.usePessimisticLocking())
+                        heavilyContendedRowCount++;
 
-                    if (cf.isMarkedForDelete() && cf.hasColumns())
+                    if (!partition.isEmpty())
                     {
-                        // When every node is up, there's no reason to write 
batchlog data out to sstables
-                        // (which in turn incurs cost like compaction) since 
the BL write + delete cancel each other out,
-                        // and BL data is strictly local, so we don't need to 
preserve tombstones for repair.
-                        // If we have a data row + row level tombstone, then 
writing it is effectively an expensive no-op so we skip it.
-                        // See CASSANDRA-4667.
-                        if (cfs.name.equals(SystemKeyspace.BATCHLOG) && 
cfs.keyspace.getName().equals(SystemKeyspace.NAME))
-                            continue;
+                        try (UnfilteredRowIterator iter = 
partition.unfilteredIterator())
+                        {
+                            writer.append(iter);
+                        }
                     }
-
-                    if (trackContention && cf.usePessimisticLocking())
-                        heavilyContendedRowCount++;
-
-                    if (!cf.isEmpty())
-                        writer.append((DecoratedKey)entry.getKey(), cf);
                 }
 
                 if (writer.getFilePointer() > 0)
@@ -406,17 +405,24 @@ public class Memtable implements Comparable<Memtable>
                 }
 
                 if (heavilyContendedRowCount > 0)
-                    logger.debug(String.format("High update contention in 
%d/%d partitions of %s ", heavilyContendedRowCount, rows.size(), 
Memtable.this.toString()));
+                    logger.debug(String.format("High update contention in 
%d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), 
Memtable.this.toString()));
 
                 return ssTable;
             }
         }
 
-        public SSTableWriter createFlushWriter(String filename)
+        public SSTableWriter createFlushWriter(String filename,
+                                               PartitionColumns columns,
+                                               RowStats stats)
         {
             MetadataCollector sstableMetadataCollector = new 
MetadataCollector(cfs.metadata.comparator).replayPosition(context);
-
-            return SSTableWriter.create(Descriptor.fromFilename(filename), 
(long) rows.size(), ActiveRepairService.UNREPAIRED_SSTABLE, cfs.metadata, 
cfs.partitioner, sstableMetadataCollector);
+            return SSTableWriter.create(Descriptor.fromFilename(filename),
+                                        (long)partitions.size(),
+                                        ActiveRepairService.UNREPAIRED_SSTABLE,
+                                        cfs.metadata,
+                                        cfs.partitioner,
+                                        sstableMetadataCollector,
+                                        new SerializationHeader(cfs.metadata, 
columns, stats));
         }
     }
 
@@ -427,17 +433,82 @@ public class Memtable implements Comparable<Memtable>
         {
             int rowOverhead;
             MemtableAllocator allocator = MEMORY_POOL.newAllocator();
-            ConcurrentNavigableMap<RowPosition, Object> rows = new 
ConcurrentSkipListMap<>();
+            ConcurrentNavigableMap<PartitionPosition, Object> partitions = new 
ConcurrentSkipListMap<>();
             final Object val = new Object();
-            for (int i = 0; i < count; i++)
-                rows.put(allocator.clone(new BufferDecoratedKey(new 
LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
-            double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
+            for (int i = 0 ; i < count ; i++)
+                partitions.put(allocator.clone(new BufferDecoratedKey(new 
LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
+            double avgSize = ObjectSizes.measureDeep(partitions) / (double) 
count;
             rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? 
Math.floor(avgSize) : Math.ceil(avgSize));
             rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
-            rowOverhead += AtomicBTreeColumns.EMPTY_SIZE;
+            rowOverhead += AtomicBTreePartition.EMPTY_SIZE;
             allocator.setDiscarding();
             allocator.setDiscarded();
             return rowOverhead;
         }
     }
+
+    private static class ColumnsCollector
+    {
+        private final HashMap<ColumnDefinition, AtomicBoolean> predefined = 
new HashMap<>();
+        private final ConcurrentSkipListSet<ColumnDefinition> extra = new 
ConcurrentSkipListSet<>();
+        ColumnsCollector(PartitionColumns columns)
+        {
+            for (ColumnDefinition def : columns.statics)
+                predefined.put(def, new AtomicBoolean());
+            for (ColumnDefinition def : columns.regulars)
+                predefined.put(def, new AtomicBoolean());
+        }
+
+        public void update(PartitionColumns columns)
+        {
+            for (ColumnDefinition s : columns.statics)
+                update(s);
+            for (ColumnDefinition r : columns.regulars)
+                update(r);
+        }
+
+        private void update(ColumnDefinition definition)
+        {
+            AtomicBoolean present = predefined.get(definition);
+            if (present != null)
+            {
+                if (!present.get())
+                    present.set(true);
+            }
+            else
+            {
+                extra.add(definition);
+            }
+        }
+
+        public PartitionColumns get()
+        {
+            PartitionColumns.Builder builder = PartitionColumns.builder();
+            for (Map.Entry<ColumnDefinition, AtomicBoolean> e : 
predefined.entrySet())
+                if (e.getValue().get())
+                    builder.add(e.getKey());
+            return builder.addAll(extra).build();
+        }
+    }
+
+    private static class StatsCollector
+    {
+        private final AtomicReference<RowStats> stats = new 
AtomicReference<>(RowStats.NO_STATS);
+
+        public void update(RowStats newStats)
+        {
+            while (true)
+            {
+                RowStats current = stats.get();
+                RowStats updated = current.mergeWith(newStats);
+                if (stats.compareAndSet(current, updated))
+                    return;
+            }
+        }
+
+        public RowStats get()
+        {
+            return stats.get();
+        }
+    }
 }

Reply via email to